View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HRegionLocation;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.KeyValueUtil;
49  import org.apache.hadoop.hbase.RegionLocations;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.classification.InterfaceStability;
54  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch;
56  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
57  import org.apache.hadoop.hbase.filter.BinaryComparator;
58  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
59  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
60  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
61  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
62  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
63  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
64  import org.apache.hadoop.hbase.protobuf.RequestConverter;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
70  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.Pair;
73  import org.apache.hadoop.hbase.util.Threads;
74  
75  import com.google.common.annotations.VisibleForTesting;
76  import com.google.protobuf.Descriptors;
77  import com.google.protobuf.InvalidProtocolBufferException;
78  import com.google.protobuf.Message;
79  import com.google.protobuf.Service;
80  import com.google.protobuf.ServiceException;
81  
82  /**
83   *
84   * HTable is no longer a client API. It is marked InterfaceAudience.Private indicating that
85   * this is an HBase-internal class as defined in
86   * https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html
87   * There are no guarantees for backwards source / binary compatibility and methods or class can
88   * change or go away without deprecation. Use {@link Connection#getTable(TableName)}
89   * to obtain an instance of {@link Table} instead of constructing an HTable directly.
90   * <p>An implementation of {@link Table}. Used to communicate with a single HBase table.
91   * Lightweight. Get as needed and just close when done.
92   * Instances of this class SHOULD NOT be constructed directly.
93   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
94   * class comment for an example of how.
95   *
96   * <p>This class is NOT thread safe for reads or writes.
97   * In the case of writes (Put, Delete), the underlying write buffer can
98   * be corrupted if multiple threads contend over a single HTable instance.
99   * In the case of reads, some fields used by a Scan are shared among all threads.
100  *
101  * @see Table
102  * @see Admin
103  * @see Connection
104  * @see ConnectionFactory
105  */
106 @InterfaceAudience.Private
107 @InterfaceStability.Stable
108 public class HTable implements HTableInterface, RegionLocator {
109   private static final Log LOG = LogFactory.getLog(HTable.class);
110   protected ClusterConnection connection;
111   private final TableName tableName;
112   private volatile Configuration configuration;
113   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
114   private long writeBufferSize;
115   private boolean clearBufferOnFail;
116   private boolean autoFlush;
117   protected long currentWriteBufferSize;
118   protected int scannerCaching;
119   private int maxKeyValueSize;
120   private ExecutorService pool;  // For Multi & Scan
121   private boolean closed;
122   private int operationTimeout;
123   private int retries;
124   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
125   private final boolean cleanupConnectionOnClose; // close the connection in close()
126   private Consistency defaultConsistency = Consistency.STRONG;
127   private int primaryCallTimeoutMicroSecond;
128   private int replicaCallTimeoutMicroSecondScan;
129 
130 
131   /** The Async process for puts with autoflush set to false or multiputs */
132   protected AsyncProcess ap;
133   /** The Async process for batch */
134   protected AsyncProcess multiAp;
135   private RpcRetryingCallerFactory rpcCallerFactory;
136   private RpcControllerFactory rpcControllerFactory;
137 
138   /**
139    * Creates an object to access a HBase table.
140    * @param conf Configuration object to use.
141    * @param tableName Name of the table.
142    * @throws IOException if a remote or network exception occurs
143    * @deprecated Constructing HTable objects manually has been deprecated. Please use
144    * {@link Connection} to instantiate a {@link Table} instead.
145    */
146   @Deprecated
147   public HTable(Configuration conf, final String tableName)
148   throws IOException {
149     this(conf, TableName.valueOf(tableName));
150   }
151 
152   /**
153    * Creates an object to access a HBase table.
154    * @param conf Configuration object to use.
155    * @param tableName Name of the table.
156    * @throws IOException if a remote or network exception occurs
157    * @deprecated Constructing HTable objects manually has been deprecated. Please use
158    * {@link Connection} to instantiate a {@link Table} instead.
159    */
160   @Deprecated
161   public HTable(Configuration conf, final byte[] tableName)
162   throws IOException {
163     this(conf, TableName.valueOf(tableName));
164   }
165 
166 
167 
168   /**
169    * Creates an object to access a HBase table.
170    * @param conf Configuration object to use.
171    * @param tableName table name pojo
172    * @throws IOException if a remote or network exception occurs
173    * @deprecated Constructing HTable objects manually has been deprecated. Please use
174    * {@link Connection} to instantiate a {@link Table} instead.
175    */
176   @Deprecated
177   public HTable(Configuration conf, final TableName tableName)
178   throws IOException {
179     this.tableName = tableName;
180     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
181     if (conf == null) {
182       this.connection = null;
183       return;
184     }
185     this.connection = ConnectionManager.getConnectionInternal(conf);
186     this.configuration = conf;
187 
188     this.pool = getDefaultExecutor(conf);
189     this.finishSetup();
190   }
191 
192   /**
193    * Creates an object to access a HBase table.
194    * @param tableName Name of the table.
195    * @param connection HConnection to be used.
196    * @throws IOException if a remote or network exception occurs
197    * @deprecated Do not use.
198    */
199   @Deprecated
200   public HTable(TableName tableName, Connection connection) throws IOException {
201     this.tableName = tableName;
202     this.cleanupPoolOnClose = true;
203     this.cleanupConnectionOnClose = false;
204     this.connection = (ClusterConnection)connection;
205     this.configuration = connection.getConfiguration();
206 
207     this.pool = getDefaultExecutor(this.configuration);
208     this.finishSetup();
209   }
210 
211   // Marked Private @since 1.0
212   @InterfaceAudience.Private
213   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
214     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
215     if (maxThreads == 0) {
216       maxThreads = 1; // is there a better default?
217     }
218     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
219 
220     // Using the "direct handoff" approach, new threads will only be created
221     // if it is necessary and will grow unbounded. This could be bad but in HCM
222     // we only create as many Runnables as there are region servers. It means
223     // it also scales when new region servers are added.
224     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
225         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
226     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
227     return pool;
228   }
229 
230   /**
231    * Creates an object to access a HBase table.
232    * @param conf Configuration object to use.
233    * @param tableName Name of the table.
234    * @param pool ExecutorService to be used.
235    * @throws IOException if a remote or network exception occurs
236    * @deprecated Constructing HTable objects manually has been deprecated. Please use
237    * {@link Connection} to instantiate a {@link Table} instead.
238    */
239   @Deprecated
240   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
241       throws IOException {
242     this(conf, TableName.valueOf(tableName), pool);
243   }
244 
245   /**
246    * Creates an object to access a HBase table.
247    * @param conf Configuration object to use.
248    * @param tableName Name of the table.
249    * @param pool ExecutorService to be used.
250    * @throws IOException if a remote or network exception occurs
251    * @deprecated Constructing HTable objects manually has been deprecated. Please use
252    * {@link Connection} to instantiate a {@link Table} instead.
253    */
254   @Deprecated
255   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
256       throws IOException {
257     this.connection = ConnectionManager.getConnectionInternal(conf);
258     this.configuration = conf;
259     this.pool = pool;
260     if (pool == null) {
261       this.pool = getDefaultExecutor(conf);
262       this.cleanupPoolOnClose = true;
263     } else {
264       this.cleanupPoolOnClose = false;
265     }
266     this.tableName = tableName;
267     this.cleanupConnectionOnClose = true;
268     this.finishSetup();
269   }
270 
271   /**
272    * Creates an object to access a HBase table.
273    * @param tableName Name of the table.
274    * @param connection HConnection to be used.
275    * @param pool ExecutorService to be used.
276    * @throws IOException if a remote or network exception occurs.
277    * @deprecated Do not use, internal ctor.
278    */
279   @Deprecated
280   public HTable(final byte[] tableName, final Connection connection,
281       final ExecutorService pool) throws IOException {
282     this(TableName.valueOf(tableName), connection, pool);
283   }
284 
285   /** @deprecated Do not use, internal ctor. */
286   @Deprecated
287   public HTable(TableName tableName, final Connection connection,
288       final ExecutorService pool) throws IOException {
289     this(tableName, (ClusterConnection)connection, pool);
290   }
291 
292   /**
293    * Creates an object to access a HBase table.
294    * @param tableName Name of the table.
295    * @param connection HConnection to be used.
296    * @param pool ExecutorService to be used.
297    * @throws IOException if a remote or network exception occurs
298    */
299   @InterfaceAudience.Private
300   public HTable(TableName tableName, final ClusterConnection connection,
301       final ExecutorService pool) throws IOException {
302     if (connection == null || connection.isClosed()) {
303       throw new IllegalArgumentException("Connection is null or closed.");
304     }
305     this.tableName = tableName;
306     this.cleanupConnectionOnClose = false;
307     this.connection = connection;
308     this.configuration = connection.getConfiguration();
309     this.pool = pool;
310     if (pool == null) {
311       this.pool = getDefaultExecutor(this.configuration);
312       this.cleanupPoolOnClose = true;
313     } else {
314       this.cleanupPoolOnClose = false;
315     }
316 
317     this.finishSetup();
318   }
319 
320   /**
321    * For internal testing.
322    */
323   @VisibleForTesting
324   protected HTable() {
325     tableName = null;
326     cleanupPoolOnClose = false;
327     cleanupConnectionOnClose = false;
328   }
329 
330   /**
331    * @return maxKeyValueSize from configuration.
332    */
333   public static int getMaxKeyValueSize(Configuration conf) {
334     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
335   }
336 
337   /**
338    * setup this HTable's parameter based on the passed configuration
339    */
340   private void finishSetup() throws IOException {
341     this.operationTimeout = tableName.isSystemTable() ?
342       this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
343         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
344       this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
345         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
346     this.writeBufferSize = this.configuration.getLong(
347         "hbase.client.write.buffer", 2097152);
348     this.clearBufferOnFail = true;
349     this.autoFlush = true;
350     this.currentWriteBufferSize = 0;
351     this.scannerCaching = this.configuration.getInt(
352         HConstants.HBASE_CLIENT_SCANNER_CACHING,
353         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
354     this.primaryCallTimeoutMicroSecond =
355         this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms
356     this.replicaCallTimeoutMicroSecondScan =
357         this.configuration.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
358     this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
359             HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
360 
361     this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
362     this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
363     // puts need to track errors globally due to how the APIs currently work.
364     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
365     multiAp = this.connection.getAsyncProcess();
366 
367     this.maxKeyValueSize = getMaxKeyValueSize(this.configuration);
368     this.closed = false;
369   }
370 
371   /**
372    * {@inheritDoc}
373    */
374   @Override
375   public Configuration getConfiguration() {
376     return configuration;
377   }
378 
379   /**
380    * Tells whether or not a table is enabled or not. This method creates a
381    * new HBase configuration, so it might make your unit tests fail due to
382    * incorrect ZK client port.
383    * @param tableName Name of table to check.
384    * @return {@code true} if table is online.
385    * @throws IOException if a remote or network exception occurs
386    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
387    */
388   @Deprecated
389   public static boolean isTableEnabled(String tableName) throws IOException {
390     return isTableEnabled(TableName.valueOf(tableName));
391   }
392 
393   /**
394    * Tells whether or not a table is enabled or not. This method creates a
395    * new HBase configuration, so it might make your unit tests fail due to
396    * incorrect ZK client port.
397    * @param tableName Name of table to check.
398    * @return {@code true} if table is online.
399    * @throws IOException if a remote or network exception occurs
400    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
401    */
402   @Deprecated
403   public static boolean isTableEnabled(byte[] tableName) throws IOException {
404     return isTableEnabled(TableName.valueOf(tableName));
405   }
406 
407   /**
408    * Tells whether or not a table is enabled or not. This method creates a
409    * new HBase configuration, so it might make your unit tests fail due to
410    * incorrect ZK client port.
411    * @param tableName Name of table to check.
412    * @return {@code true} if table is online.
413    * @throws IOException if a remote or network exception occurs
414    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
415    */
416   @Deprecated
417   public static boolean isTableEnabled(TableName tableName) throws IOException {
418     return isTableEnabled(HBaseConfiguration.create(), tableName);
419   }
420 
421   /**
422    * Tells whether or not a table is enabled or not.
423    * @param conf The Configuration object to use.
424    * @param tableName Name of table to check.
425    * @return {@code true} if table is online.
426    * @throws IOException if a remote or network exception occurs
427 	 * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
428    */
429   @Deprecated
430   public static boolean isTableEnabled(Configuration conf, String tableName)
431   throws IOException {
432     return isTableEnabled(conf, TableName.valueOf(tableName));
433   }
434 
435   /**
436    * Tells whether or not a table is enabled or not.
437    * @param conf The Configuration object to use.
438    * @param tableName Name of table to check.
439    * @return {@code true} if table is online.
440    * @throws IOException if a remote or network exception occurs
441 	 * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
442    */
443   @Deprecated
444   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
445   throws IOException {
446     return isTableEnabled(conf, TableName.valueOf(tableName));
447   }
448 
449   /**
450    * Tells whether or not a table is enabled or not.
451    * @param conf The Configuration object to use.
452    * @param tableName Name of table to check.
453    * @return {@code true} if table is online.
454    * @throws IOException if a remote or network exception occurs
455    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
456    */
457   @Deprecated
458   public static boolean isTableEnabled(Configuration conf,
459       final TableName tableName) throws IOException {
460     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
461       @Override
462       public Boolean connect(HConnection connection) throws IOException {
463         return connection.isTableEnabled(tableName);
464       }
465     });
466   }
467 
468   /**
469    * Find region location hosting passed row using cached info
470    * @param row Row to find.
471    * @return The location of the given row.
472    * @throws IOException if a remote or network exception occurs
473    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
474    */
475   @Deprecated
476   public HRegionLocation getRegionLocation(final String row)
477   throws IOException {
478     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
479   }
480 
481   /**
482    * {@inheritDoc}
483    */
484   @Override
485   public HRegionLocation getRegionLocation(final byte [] row)
486   throws IOException {
487     return connection.getRegionLocation(tableName, row, false);
488   }
489 
490   /**
491    * {@inheritDoc}
492    */
493   @Override
494   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
495   throws IOException {
496     return connection.getRegionLocation(tableName, row, reload);
497   }
498 
499   /**
500    * {@inheritDoc}
501    */
502   @Override
503   public byte [] getTableName() {
504     return this.tableName.getName();
505   }
506 
507   @Override
508   public TableName getName() {
509     return tableName;
510   }
511 
512   /**
513    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
514    * manipulations.
515    * @return An HConnection instance.
516    * @deprecated This method will be changed from public to package protected.
517    */
518   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
519   @Deprecated
520   @VisibleForTesting
521   public HConnection getConnection() {
522     return this.connection;
523   }
524 
525   /**
526    * Gets the number of rows that a scanner will fetch at once.
527    * <p>
528    * The default value comes from {@code hbase.client.scanner.caching}.
529    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
530    */
531   @Deprecated
532   public int getScannerCaching() {
533     return scannerCaching;
534   }
535 
536   /**
537    * Kept in 0.96 for backward compatibility
538    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
539    */
540   @Deprecated
541   public List<Row> getWriteBuffer() {
542     return writeAsyncBuffer;
543   }
544 
545   /**
546    * Sets the number of rows that a scanner will fetch at once.
547    * <p>
548    * This will override the value specified by
549    * {@code hbase.client.scanner.caching}.
550    * Increasing this value will reduce the amount of work needed each time
551    * {@code next()} is called on a scanner, at the expense of memory use
552    * (since more rows will need to be maintained in memory by the scanners).
553    * @param scannerCaching the number of rows a scanner will fetch at once.
554    * @deprecated Use {@link Scan#setCaching(int)}
555    */
556   @Deprecated
557   public void setScannerCaching(int scannerCaching) {
558     this.scannerCaching = scannerCaching;
559   }
560 
561   /**
562    * {@inheritDoc}
563    */
564   @Override
565   public HTableDescriptor getTableDescriptor() throws IOException {
566     return new UnmodifyableHTableDescriptor(
567       this.connection.getHTableDescriptor(this.tableName));
568   }
569 
570   /**
571    * {@inheritDoc}
572    */
573   @Override
574   public byte [][] getStartKeys() throws IOException {
575     return getStartEndKeys().getFirst();
576   }
577 
578   /**
579    * {@inheritDoc}
580    */
581   @Override
582   public byte[][] getEndKeys() throws IOException {
583     return getStartEndKeys().getSecond();
584   }
585 
586   /**
587    * {@inheritDoc}
588    */
589   @Override
590   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
591 
592     List<RegionLocations> regions = listRegionLocations();
593     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
594     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
595 
596     for (RegionLocations locations : regions) {
597       HRegionInfo region = locations.getRegionLocation().getRegionInfo();
598       startKeyList.add(region.getStartKey());
599       endKeyList.add(region.getEndKey());
600     }
601 
602     return new Pair<byte [][], byte [][]>(
603       startKeyList.toArray(new byte[startKeyList.size()][]),
604       endKeyList.toArray(new byte[endKeyList.size()][]));
605   }
606 
607   @VisibleForTesting
608   List<RegionLocations> listRegionLocations() throws IOException {
609     return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
610   }
611 
612   /**
613    * Gets all the regions and their address for this table.
614    * <p>
615    * This is mainly useful for the MapReduce integration.
616    * @return A map of HRegionInfo with it's server address
617    * @throws IOException if a remote or network exception occurs
618    * @deprecated This is no longer a public API.  Use {@link #getAllRegionLocations()} instead.
619    */
620   @Deprecated
621   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
622     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an HRegionLocation.
623     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName());
624   }
625 
626   /**
627    * Gets all the regions and their address for this table.
628    * <p>
629    * This is mainly useful for the MapReduce integration.
630    * @return A map of HRegionInfo with it's server address
631    * @throws IOException if a remote or network exception occurs
632    */
633   @Override
634   public List<HRegionLocation> getAllRegionLocations() throws IOException {
635     NavigableMap<HRegionInfo, ServerName> locations = getRegionLocations();
636     ArrayList<HRegionLocation> regions = new ArrayList<>(locations.size());
637     for (Entry<HRegionInfo, ServerName> entry : locations.entrySet()) {
638       regions.add(new HRegionLocation(entry.getKey(), entry.getValue()));
639     }
640     return regions;
641   }
642 
643   /**
644    * Get the corresponding regions for an arbitrary range of keys.
645    * <p>
646    * @param startKey Starting row in range, inclusive
647    * @param endKey Ending row in range, exclusive
648    * @return A list of HRegionLocations corresponding to the regions that
649    * contain the specified range
650    * @throws IOException if a remote or network exception occurs
651    * @deprecated This is no longer a public API
652    */
653   @Deprecated
654   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
655     final byte [] endKey) throws IOException {
656     return getRegionsInRange(startKey, endKey, false);
657   }
658 
659   /**
660    * Get the corresponding regions for an arbitrary range of keys.
661    * <p>
662    * @param startKey Starting row in range, inclusive
663    * @param endKey Ending row in range, exclusive
664    * @param reload true to reload information or false to use cached information
665    * @return A list of HRegionLocations corresponding to the regions that
666    * contain the specified range
667    * @throws IOException if a remote or network exception occurs
668    * @deprecated This is no longer a public API
669    */
670   @Deprecated
671   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
672       final byte [] endKey, final boolean reload) throws IOException {
673     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
674   }
675 
676   /**
677    * Get the corresponding start keys and regions for an arbitrary range of
678    * keys.
679    * <p>
680    * @param startKey Starting row in range, inclusive
681    * @param endKey Ending row in range
682    * @param includeEndKey true if endRow is inclusive, false if exclusive
683    * @return A pair of list of start keys and list of HRegionLocations that
684    *         contain the specified range
685    * @throws IOException if a remote or network exception occurs
686    * @deprecated This is no longer a public API
687    */
688   @Deprecated
689   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
690       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
691       throws IOException {
692     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
693   }
694 
695   /**
696    * Get the corresponding start keys and regions for an arbitrary range of
697    * keys.
698    * <p>
699    * @param startKey Starting row in range, inclusive
700    * @param endKey Ending row in range
701    * @param includeEndKey true if endRow is inclusive, false if exclusive
702    * @param reload true to reload information or false to use cached information
703    * @return A pair of list of start keys and list of HRegionLocations that
704    *         contain the specified range
705    * @throws IOException if a remote or network exception occurs
706    * @deprecated This is no longer a public API
707    */
708   @Deprecated
709   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
710       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
711       final boolean reload) throws IOException {
712     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
713     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
714       throw new IllegalArgumentException(
715         "Invalid range: " + Bytes.toStringBinary(startKey) +
716         " > " + Bytes.toStringBinary(endKey));
717     }
718     List<byte[]> keysInRange = new ArrayList<byte[]>();
719     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
720     byte[] currentKey = startKey;
721     do {
722       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
723       keysInRange.add(currentKey);
724       regionsInRange.add(regionLocation);
725       currentKey = regionLocation.getRegionInfo().getEndKey();
726     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
727         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
728             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
729     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
730         regionsInRange);
731   }
732 
733   /**
734    * {@inheritDoc}
735    * @deprecated Use reversed scan instead.
736    */
737    @Override
738    @Deprecated
739    public Result getRowOrBefore(final byte[] row, final byte[] family)
740        throws IOException {
741      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
742          tableName, row) {
743        @Override
744       public Result call(int callTimeout) throws IOException {
745          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
746          controller.setPriority(tableName);
747          controller.setCallTimeout(callTimeout);
748          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
749              getLocation().getRegionInfo().getRegionName(), row, family);
750          try {
751            ClientProtos.GetResponse response = getStub().get(controller, request);
752            if (!response.hasResult()) return null;
753            return ProtobufUtil.toResult(response.getResult());
754          } catch (ServiceException se) {
755            throw ProtobufUtil.getRemoteException(se);
756          }
757        }
758      };
759      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
760    }
761 
762   /**
763    * The underlying {@link HTable} must not be closed.
764    * {@link HTableInterface#getScanner(Scan)} has other usage details.
765    */
766   @Override
767   public ResultScanner getScanner(final Scan scan) throws IOException {
768     if (scan.getCaching() <= 0) {
769       scan.setCaching(getScannerCaching());
770     }
771 
772     if (scan.isReversed()) {
773       if (scan.isSmall()) {
774         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
775             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
776             pool, replicaCallTimeoutMicroSecondScan);
777       } else {
778         return new ReversedClientScanner(getConfiguration(), scan, getName(),
779             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
780             pool, replicaCallTimeoutMicroSecondScan);
781       }
782     }
783 
784     if (scan.isSmall()) {
785       return new ClientSmallScanner(getConfiguration(), scan, getName(),
786           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
787           pool, replicaCallTimeoutMicroSecondScan);
788     } else {
789       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
790           this.rpcCallerFactory, this.rpcControllerFactory,
791           pool, replicaCallTimeoutMicroSecondScan);
792     }
793   }
794 
795   /**
796    * The underlying {@link HTable} must not be closed.
797    * {@link HTableInterface#getScanner(byte[])} has other usage details.
798    */
799   @Override
800   public ResultScanner getScanner(byte [] family) throws IOException {
801     Scan scan = new Scan();
802     scan.addFamily(family);
803     return getScanner(scan);
804   }
805 
806   /**
807    * The underlying {@link HTable} must not be closed.
808    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
809    */
810   @Override
811   public ResultScanner getScanner(byte [] family, byte [] qualifier)
812   throws IOException {
813     Scan scan = new Scan();
814     scan.addColumn(family, qualifier);
815     return getScanner(scan);
816   }
817 
818   /**
819    * {@inheritDoc}
820    */
821   @Override
822   public Result get(final Get get) throws IOException {
823     if (get.getConsistency() == null){
824       get.setConsistency(defaultConsistency);
825     }
826 
827     if (get.getConsistency() == Consistency.STRONG) {
828       // Good old call.
829       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
830           getName(), get.getRow()) {
831         @Override
832         public Result call(int callTimeout) throws IOException {
833           ClientProtos.GetRequest request =
834               RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
835           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
836           controller.setPriority(tableName);
837           controller.setCallTimeout(callTimeout);
838           try {
839             ClientProtos.GetResponse response = getStub().get(controller, request);
840             if (response == null) return null;
841             return ProtobufUtil.toResult(response.getResult());
842           } catch (ServiceException se) {
843             throw ProtobufUtil.getRemoteException(se);
844           }
845         }
846       };
847       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
848     }
849 
850     // Call that takes into account the replica
851     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
852       rpcControllerFactory, tableName, this.connection, get, pool, retries,
853       operationTimeout, primaryCallTimeoutMicroSecond);
854     return callable.call();
855   }
856 
857 
858   /**
859    * {@inheritDoc}
860    */
861   @Override
862   public Result[] get(List<Get> gets) throws IOException {
863     if (gets.size() == 1) {
864       return new Result[]{get(gets.get(0))};
865     }
866     try {
867       Object [] r1 = batch((List)gets);
868 
869       // translate.
870       Result [] results = new Result[r1.length];
871       int i=0;
872       for (Object o : r1) {
873         // batch ensures if there is a failure we get an exception instead
874         results[i++] = (Result) o;
875       }
876 
877       return results;
878     } catch (InterruptedException e) {
879       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
880     }
881   }
882 
883   /**
884    * {@inheritDoc}
885    */
886   @Override
887   public void batch(final List<? extends Row> actions, final Object[] results)
888       throws InterruptedException, IOException {
889     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
890     ars.waitUntilDone();
891     if (ars.hasError()) {
892       throw ars.getErrors();
893     }
894   }
895 
896   /**
897    * {@inheritDoc}
898    * @deprecated If any exception is thrown by one of the actions, there is no way to
899    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
900    */
901   @Deprecated
902   @Override
903   public Object[] batch(final List<? extends Row> actions)
904      throws InterruptedException, IOException {
905     Object[] results = new Object[actions.size()];
906     batch(actions, results);
907     return results;
908   }
909 
910   /**
911    * {@inheritDoc}
912    */
913   @Override
914   public <R> void batchCallback(
915       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
916       throws IOException, InterruptedException {
917     connection.processBatchCallback(actions, tableName, pool, results, callback);
918   }
919 
920   /**
921    * {@inheritDoc}
922    * @deprecated If any exception is thrown by one of the actions, there is no way to
923    * retrieve the partially executed results. Use
924    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
925    * instead.
926    */
927   @Deprecated
928   @Override
929   public <R> Object[] batchCallback(
930     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
931       InterruptedException {
932     Object[] results = new Object[actions.size()];
933     batchCallback(actions, results, callback);
934     return results;
935   }
936 
937   /**
938    * {@inheritDoc}
939    */
940   @Override
941   public void delete(final Delete delete)
942   throws IOException {
943     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
944         tableName, delete.getRow()) {
945       @Override
946       public Boolean call(int callTimeout) throws IOException {
947         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
948         controller.setPriority(tableName);
949         controller.setCallTimeout(callTimeout);
950 
951         try {
952           MutateRequest request = RequestConverter.buildMutateRequest(
953             getLocation().getRegionInfo().getRegionName(), delete);
954           MutateResponse response = getStub().mutate(controller, request);
955           return Boolean.valueOf(response.getProcessed());
956         } catch (ServiceException se) {
957           throw ProtobufUtil.getRemoteException(se);
958         }
959       }
960     };
961     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
962   }
963 
964   /**
965    * {@inheritDoc}
966    */
967   @Override
968   public void delete(final List<Delete> deletes)
969   throws IOException {
970     Object[] results = new Object[deletes.size()];
971     try {
972       batch(deletes, results);
973     } catch (InterruptedException e) {
974       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
975     } finally {
976       // mutate list so that it is empty for complete success, or contains only failed records
977       // results are returned in the same order as the requests in list
978       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
979       for (int i = results.length - 1; i>=0; i--) {
980         // if result is not null, it succeeded
981         if (results[i] instanceof Result) {
982           deletes.remove(i);
983         }
984       }
985     }
986   }
987 
988   /**
989    * {@inheritDoc}
990    */
991   @Override
992   public void put(final Put put)
993       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
994     doPut(put);
995     if (autoFlush) {
996       flushCommits();
997     }
998   }
999 
1000   /**
1001    * {@inheritDoc}
1002    */
1003   @Override
1004   public void put(final List<Put> puts)
1005       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1006     for (Put put : puts) {
1007       doPut(put);
1008     }
1009     if (autoFlush) {
1010       flushCommits();
1011     }
1012   }
1013 
1014 
1015   /**
1016    * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
1017    *  cluster.
1018    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
1019    * @throws InterruptedIOException if we were interrupted.
1020    */
1021   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1022     // This behavior is highly non-intuitive... it does not protect us against
1023     // 94-incompatible behavior, which is a timing issue because hasError, the below code
1024     // and setter of hasError are not synchronized. Perhaps it should be removed.
1025     if (ap.hasError()) {
1026       writeAsyncBuffer.add(put);
1027       backgroundFlushCommits(true);
1028     }
1029 
1030     validatePut(put);
1031 
1032     currentWriteBufferSize += put.heapSize();
1033     writeAsyncBuffer.add(put);
1034 
1035     while (currentWriteBufferSize > writeBufferSize) {
1036       backgroundFlushCommits(false);
1037     }
1038   }
1039 
1040 
1041   /**
1042    * Send the operations in the buffer to the servers. Does not wait for the server's answer.
1043    * If the is an error (max retried reach from a previous flush or bad operation), it tries to
1044    * send all operations in the buffer and sends an exception.
1045    * @param synchronous - if true, sends all the writes and wait for all of them to finish before
1046    *                     returning.
1047    */
1048   private void backgroundFlushCommits(boolean synchronous) throws
1049       InterruptedIOException, RetriesExhaustedWithDetailsException {
1050 
1051     try {
1052       if (!synchronous) {
1053         ap.submit(tableName, writeAsyncBuffer, true, null, false);
1054         if (ap.hasError()) {
1055           LOG.debug(tableName + ": One or more of the operations have failed -" +
1056               " waiting for all operation in progress to finish (successfully or not)");
1057         }
1058       }
1059       if (synchronous || ap.hasError()) {
1060         while (!writeAsyncBuffer.isEmpty()) {
1061           ap.submit(tableName, writeAsyncBuffer, true, null, false);
1062         }
1063         List<Row> failedRows = clearBufferOnFail ? null : writeAsyncBuffer;
1064         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(failedRows);
1065         if (error != null) {
1066           throw error;
1067         }
1068       }
1069     } finally {
1070       currentWriteBufferSize = 0;
1071       for (Row mut : writeAsyncBuffer) {
1072         if (mut instanceof Mutation) {
1073           currentWriteBufferSize += ((Mutation) mut).heapSize();
1074         }
1075       }
1076     }
1077   }
1078 
1079   /**
1080    * {@inheritDoc}
1081    */
1082   @Override
1083   public void mutateRow(final RowMutations rm) throws IOException {
1084     RegionServerCallable<Void> callable =
1085         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1086       @Override
1087       public Void call(int callTimeout) throws IOException {
1088         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1089         controller.setPriority(tableName);
1090         controller.setCallTimeout(callTimeout);
1091         try {
1092           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1093             getLocation().getRegionInfo().getRegionName(), rm);
1094           regionMutationBuilder.setAtomic(true);
1095           MultiRequest request =
1096             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1097           getStub().multi(controller, request);
1098         } catch (ServiceException se) {
1099           throw ProtobufUtil.getRemoteException(se);
1100         }
1101         return null;
1102       }
1103     };
1104     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1105   }
1106 
1107   /**
1108    * {@inheritDoc}
1109    */
1110   @Override
1111   public Result append(final Append append) throws IOException {
1112     if (append.numFamilies() == 0) {
1113       throw new IOException(
1114           "Invalid arguments to append, no columns specified");
1115     }
1116 
1117     NonceGenerator ng = this.connection.getNonceGenerator();
1118     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1119     RegionServerCallable<Result> callable =
1120       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1121         @Override
1122         public Result call(int callTimeout) throws IOException {
1123           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1124           controller.setPriority(getTableName());
1125           controller.setCallTimeout(callTimeout);
1126           try {
1127             MutateRequest request = RequestConverter.buildMutateRequest(
1128               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1129             MutateResponse response = getStub().mutate(controller, request);
1130             if (!response.hasResult()) return null;
1131             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1132           } catch (ServiceException se) {
1133             throw ProtobufUtil.getRemoteException(se);
1134           }
1135         }
1136       };
1137     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1138   }
1139 
1140   /**
1141    * {@inheritDoc}
1142    */
1143   @Override
1144   public Result increment(final Increment increment) throws IOException {
1145     if (!increment.hasFamilies()) {
1146       throw new IOException(
1147           "Invalid arguments to increment, no columns specified");
1148     }
1149     NonceGenerator ng = this.connection.getNonceGenerator();
1150     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1151     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1152         getName(), increment.getRow()) {
1153       @Override
1154       public Result call(int callTimeout) throws IOException {
1155         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1156         controller.setPriority(getTableName());
1157         controller.setCallTimeout(callTimeout);
1158         try {
1159           MutateRequest request = RequestConverter.buildMutateRequest(
1160             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1161           MutateResponse response = getStub().mutate(controller, request);
1162           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1163         } catch (ServiceException se) {
1164           throw ProtobufUtil.getRemoteException(se);
1165         }
1166       }
1167     };
1168     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1169   }
1170 
1171   /**
1172    * {@inheritDoc}
1173    */
1174   @Override
1175   public long incrementColumnValue(final byte [] row, final byte [] family,
1176       final byte [] qualifier, final long amount)
1177   throws IOException {
1178     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1179   }
1180 
1181   /**
1182    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1183    */
1184   @Deprecated
1185   @Override
1186   public long incrementColumnValue(final byte [] row, final byte [] family,
1187       final byte [] qualifier, final long amount, final boolean writeToWAL)
1188   throws IOException {
1189     return incrementColumnValue(row, family, qualifier, amount,
1190       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1191   }
1192 
1193   /**
1194    * {@inheritDoc}
1195    */
1196   @Override
1197   public long incrementColumnValue(final byte [] row, final byte [] family,
1198       final byte [] qualifier, final long amount, final Durability durability)
1199   throws IOException {
1200     NullPointerException npe = null;
1201     if (row == null) {
1202       npe = new NullPointerException("row is null");
1203     } else if (family == null) {
1204       npe = new NullPointerException("family is null");
1205     } else if (qualifier == null) {
1206       npe = new NullPointerException("qualifier is null");
1207     }
1208     if (npe != null) {
1209       throw new IOException(
1210           "Invalid arguments to incrementColumnValue", npe);
1211     }
1212 
1213     NonceGenerator ng = this.connection.getNonceGenerator();
1214     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1215     RegionServerCallable<Long> callable =
1216       new RegionServerCallable<Long>(connection, getName(), row) {
1217         @Override
1218         public Long call(int callTimeout) throws IOException {
1219           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1220           controller.setPriority(getTableName());
1221           controller.setCallTimeout(callTimeout);
1222           try {
1223             MutateRequest request = RequestConverter.buildIncrementRequest(
1224               getLocation().getRegionInfo().getRegionName(), row, family,
1225               qualifier, amount, durability, nonceGroup, nonce);
1226             MutateResponse response = getStub().mutate(controller, request);
1227             Result result =
1228               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1229             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1230           } catch (ServiceException se) {
1231             throw ProtobufUtil.getRemoteException(se);
1232           }
1233         }
1234       };
1235     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1236   }
1237 
1238   /**
1239    * {@inheritDoc}
1240    */
1241   @Override
1242   public boolean checkAndPut(final byte [] row,
1243       final byte [] family, final byte [] qualifier, final byte [] value,
1244       final Put put)
1245   throws IOException {
1246     RegionServerCallable<Boolean> callable =
1247       new RegionServerCallable<Boolean>(connection, getName(), row) {
1248         @Override
1249         public Boolean call(int callTimeout) throws IOException {
1250           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1251           controller.setPriority(tableName);
1252           controller.setCallTimeout(callTimeout);
1253           try {
1254             MutateRequest request = RequestConverter.buildMutateRequest(
1255               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1256                 new BinaryComparator(value), CompareType.EQUAL, put);
1257             MutateResponse response = getStub().mutate(controller, request);
1258             return Boolean.valueOf(response.getProcessed());
1259           } catch (ServiceException se) {
1260             throw ProtobufUtil.getRemoteException(se);
1261           }
1262         }
1263       };
1264     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1265   }
1266 
1267   /**
1268    * {@inheritDoc}
1269    */
1270   @Override
1271   public boolean checkAndPut(final byte [] row, final byte [] family,
1272       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1273       final Put put)
1274   throws IOException {
1275     RegionServerCallable<Boolean> callable =
1276       new RegionServerCallable<Boolean>(connection, getName(), row) {
1277         @Override
1278         public Boolean call(int callTimeout) throws IOException {
1279           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1280           controller.setPriority(tableName);
1281           controller.setCallTimeout(callTimeout);
1282           try {
1283             CompareType compareType = CompareType.valueOf(compareOp.name());
1284             MutateRequest request = RequestConverter.buildMutateRequest(
1285               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1286                 new BinaryComparator(value), compareType, put);
1287             MutateResponse response = getStub().mutate(controller, request);
1288             return Boolean.valueOf(response.getProcessed());
1289           } catch (ServiceException se) {
1290             throw ProtobufUtil.getRemoteException(se);
1291           }
1292         }
1293       };
1294     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1295   }
1296 
1297   /**
1298    * {@inheritDoc}
1299    */
1300   @Override
1301   public boolean checkAndDelete(final byte [] row,
1302       final byte [] family, final byte [] qualifier, final byte [] value,
1303       final Delete delete)
1304   throws IOException {
1305     RegionServerCallable<Boolean> callable =
1306       new RegionServerCallable<Boolean>(connection, getName(), row) {
1307         @Override
1308         public Boolean call(int callTimeout) throws IOException {
1309           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1310           controller.setPriority(tableName);
1311           controller.setCallTimeout(callTimeout);
1312           try {
1313             MutateRequest request = RequestConverter.buildMutateRequest(
1314               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1315                 new BinaryComparator(value), CompareType.EQUAL, delete);
1316             MutateResponse response = getStub().mutate(controller, request);
1317             return Boolean.valueOf(response.getProcessed());
1318           } catch (ServiceException se) {
1319             throw ProtobufUtil.getRemoteException(se);
1320           }
1321         }
1322       };
1323     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1324   }
1325 
1326   /**
1327    * {@inheritDoc}
1328    */
1329   @Override
1330   public boolean checkAndDelete(final byte [] row, final byte [] family,
1331       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1332       final Delete delete)
1333   throws IOException {
1334     RegionServerCallable<Boolean> callable =
1335       new RegionServerCallable<Boolean>(connection, getName(), row) {
1336         @Override
1337         public Boolean call(int callTimeout) throws IOException {
1338           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1339           controller.setPriority(tableName);
1340           controller.setCallTimeout(callTimeout);
1341           try {
1342             CompareType compareType = CompareType.valueOf(compareOp.name());
1343             MutateRequest request = RequestConverter.buildMutateRequest(
1344               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1345                 new BinaryComparator(value), compareType, delete);
1346             MutateResponse response = getStub().mutate(controller, request);
1347             return Boolean.valueOf(response.getProcessed());
1348           } catch (ServiceException se) {
1349             throw ProtobufUtil.getRemoteException(se);
1350           }
1351         }
1352       };
1353     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1354   }
1355 
1356   /**
1357    * {@inheritDoc}
1358    */
1359   @Override
1360   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1361       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1362   throws IOException {
1363     RegionServerCallable<Boolean> callable =
1364         new RegionServerCallable<Boolean>(connection, getName(), row) {
1365           @Override
1366           public Boolean call(int callTimeout) throws IOException {
1367             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1368             controller.setPriority(tableName);
1369             controller.setCallTimeout(callTimeout);
1370             try {
1371               CompareType compareType = CompareType.valueOf(compareOp.name());
1372               MultiRequest request = RequestConverter.buildMutateRequest(
1373                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1374                   new BinaryComparator(value), compareType, rm);
1375               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1376               return Boolean.valueOf(response.getProcessed());
1377             } catch (ServiceException se) {
1378               throw ProtobufUtil.getRemoteException(se);
1379             }
1380           }
1381         };
1382     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1383   }
1384 
1385   /**
1386    * {@inheritDoc}
1387    */
1388   @Override
1389   public boolean exists(final Get get) throws IOException {
1390     get.setCheckExistenceOnly(true);
1391     Result r = get(get);
1392     assert r.getExists() != null;
1393     return r.getExists();
1394   }
1395 
1396   /**
1397    * {@inheritDoc}
1398    */
1399   @Override
1400   public boolean[] existsAll(final List<Get> gets) throws IOException {
1401     if (gets.isEmpty()) return new boolean[]{};
1402     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1403 
1404     for (Get g: gets){
1405       g.setCheckExistenceOnly(true);
1406     }
1407 
1408     Object[] r1;
1409     try {
1410       r1 = batch(gets);
1411     } catch (InterruptedException e) {
1412       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1413     }
1414 
1415     // translate.
1416     boolean[] results = new boolean[r1.length];
1417     int i = 0;
1418     for (Object o : r1) {
1419       // batch ensures if there is a failure we get an exception instead
1420       results[i++] = ((Result)o).getExists();
1421     }
1422 
1423     return results;
1424   }
1425 
1426   /**
1427    * {@inheritDoc}
1428    */
1429   @Override
1430   @Deprecated
1431   public Boolean[] exists(final List<Get> gets) throws IOException {
1432     boolean[] results = existsAll(gets);
1433     Boolean[] objectResults = new Boolean[results.length];
1434     for (int i = 0; i < results.length; ++i) {
1435       objectResults[i] = results[i];
1436     }
1437     return objectResults;
1438   }
1439 
1440   /**
1441    * {@inheritDoc}
1442    */
1443   @Override
1444   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1445     // As we can have an operation in progress even if the buffer is empty, we call
1446     //  backgroundFlushCommits at least one time.
1447     backgroundFlushCommits(true);
1448   }
1449 
1450   /**
1451    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1452    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1453    *
1454    * @param list The collection of actions.
1455    * @param results An empty array, same size as list. If an exception is thrown,
1456    * you can test here for partial results, and to determine which actions
1457    * processed successfully.
1458    * @throws IOException if there are problems talking to META. Per-item
1459    * exceptions are stored in the results array.
1460    */
1461   public <R> void processBatchCallback(
1462     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1463     throws IOException, InterruptedException {
1464     this.batchCallback(list, results, callback);
1465   }
1466 
1467 
1468   /**
1469    * Parameterized batch processing, allowing varying return types for different
1470    * {@link Row} implementations.
1471    */
1472   public void processBatch(final List<? extends Row> list, final Object[] results)
1473     throws IOException, InterruptedException {
1474     this.batch(list, results);
1475   }
1476 
1477 
1478   @Override
1479   public void close() throws IOException {
1480     if (this.closed) {
1481       return;
1482     }
1483     flushCommits();
1484     if (cleanupPoolOnClose) {
1485       this.pool.shutdown();
1486       try {
1487         boolean terminated = false;
1488         do {
1489           // wait until the pool has terminated
1490           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1491         } while (!terminated);
1492       } catch (InterruptedException e) {
1493         LOG.warn("waitForTermination interrupted");
1494       }
1495     }
1496     if (cleanupConnectionOnClose) {
1497       if (this.connection != null) {
1498         this.connection.close();
1499       }
1500     }
1501     this.closed = true;
1502   }
1503 
1504   // validate for well-formedness
1505   public void validatePut(final Put put) throws IllegalArgumentException {
1506     validatePut(put, maxKeyValueSize);
1507   }
1508 
1509   // validate for well-formedness
1510   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1511     if (put.isEmpty()) {
1512       throw new IllegalArgumentException("No columns to insert");
1513     }
1514     if (maxKeyValueSize > 0) {
1515       for (List<Cell> list : put.getFamilyCellMap().values()) {
1516         for (Cell cell : list) {
1517           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1518             throw new IllegalArgumentException("KeyValue size too large");
1519           }
1520         }
1521       }
1522     }
1523   }
1524 
1525   /**
1526    * {@inheritDoc}
1527    */
1528   @Override
1529   public boolean isAutoFlush() {
1530     return autoFlush;
1531   }
1532 
1533   /**
1534    * {@inheritDoc}
1535    */
1536   @Deprecated
1537   @Override
1538   public void setAutoFlush(boolean autoFlush) {
1539     setAutoFlush(autoFlush, autoFlush);
1540   }
1541 
1542   /**
1543    * {@inheritDoc}
1544    */
1545   @Override
1546   public void setAutoFlushTo(boolean autoFlush) {
1547     setAutoFlush(autoFlush, clearBufferOnFail);
1548   }
1549 
1550   /**
1551    * {@inheritDoc}
1552    */
1553   @Override
1554   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1555     this.autoFlush = autoFlush;
1556     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1557   }
1558 
1559   /**
1560    * Returns the maximum size in bytes of the write buffer for this HTable.
1561    * <p>
1562    * The default value comes from the configuration parameter
1563    * {@code hbase.client.write.buffer}.
1564    * @return The size of the write buffer in bytes.
1565    */
1566   @Override
1567   public long getWriteBufferSize() {
1568     return writeBufferSize;
1569   }
1570 
1571   /**
1572    * Sets the size of the buffer in bytes.
1573    * <p>
1574    * If the new size is less than the current amount of data in the
1575    * write buffer, the buffer gets flushed.
1576    * @param writeBufferSize The new write buffer size, in bytes.
1577    * @throws IOException if a remote or network exception occurs.
1578    */
1579   @Override
1580   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1581     this.writeBufferSize = writeBufferSize;
1582     if(currentWriteBufferSize > writeBufferSize) {
1583       flushCommits();
1584     }
1585   }
1586 
1587   /**
1588    * The pool is used for mutli requests for this HTable
1589    * @return the pool used for mutli
1590    */
1591   ExecutorService getPool() {
1592     return this.pool;
1593   }
1594 
1595   /**
1596    * Enable or disable region cache prefetch for the table. It will be
1597    * applied for the given table's all HTable instances who share the same
1598    * connection. By default, the cache prefetch is enabled.
1599    * @param tableName name of table to configure.
1600    * @param enable Set to true to enable region cache prefetch. Or set to
1601    * false to disable it.
1602    * @throws IOException
1603    * @deprecated does nothing since 0.99
1604    */
1605   @Deprecated
1606   public static void setRegionCachePrefetch(final byte[] tableName,
1607       final boolean enable)  throws IOException {
1608   }
1609 
1610   /**
1611    * @deprecated does nothing since 0.99
1612    */
1613   @Deprecated
1614   public static void setRegionCachePrefetch(
1615       final TableName tableName,
1616       final boolean enable) throws IOException {
1617   }
1618 
1619   /**
1620    * Enable or disable region cache prefetch for the table. It will be
1621    * applied for the given table's all HTable instances who share the same
1622    * connection. By default, the cache prefetch is enabled.
1623    * @param conf The Configuration object to use.
1624    * @param tableName name of table to configure.
1625    * @param enable Set to true to enable region cache prefetch. Or set to
1626    * false to disable it.
1627    * @throws IOException
1628    * @deprecated does nothing since 0.99
1629    */
1630   @Deprecated
1631   public static void setRegionCachePrefetch(final Configuration conf,
1632       final byte[] tableName, final boolean enable) throws IOException {
1633   }
1634 
1635   /**
1636    * @deprecated does nothing since 0.99
1637    */
1638   @Deprecated
1639   public static void setRegionCachePrefetch(final Configuration conf,
1640       final TableName tableName,
1641       final boolean enable) throws IOException {
1642   }
1643 
1644   /**
1645    * Check whether region cache prefetch is enabled or not for the table.
1646    * @param conf The Configuration object to use.
1647    * @param tableName name of table to check
1648    * @return true if table's region cache prefecth is enabled. Otherwise
1649    * it is disabled.
1650    * @throws IOException
1651    * @deprecated always return false since 0.99
1652    */
1653   @Deprecated
1654   public static boolean getRegionCachePrefetch(final Configuration conf,
1655       final byte[] tableName) throws IOException {
1656     return false;
1657   }
1658 
1659   /**
1660    * @deprecated always return false since 0.99
1661    */
1662   @Deprecated
1663   public static boolean getRegionCachePrefetch(final Configuration conf,
1664       final TableName tableName) throws IOException {
1665     return false;
1666   }
1667 
1668   /**
1669    * Check whether region cache prefetch is enabled or not for the table.
1670    * @param tableName name of table to check
1671    * @return true if table's region cache prefecth is enabled. Otherwise
1672    * it is disabled.
1673    * @throws IOException
1674    * @deprecated always return false since 0.99
1675    */
1676   @Deprecated
1677   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1678     return false;
1679   }
1680 
1681   /**
1682    * @deprecated always return false since 0.99
1683    */
1684   @Deprecated
1685   public static boolean getRegionCachePrefetch(
1686       final TableName tableName) throws IOException {
1687     return false;
1688   }
1689 
1690   /**
1691    * Explicitly clears the region cache to fetch the latest value from META.
1692    * This is a power user function: avoid unless you know the ramifications.
1693    */
1694   public void clearRegionCache() {
1695     this.connection.clearRegionCache();
1696   }
1697 
1698   /**
1699    * {@inheritDoc}
1700    */
1701   @Override
1702   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1703     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1704   }
1705 
1706   /**
1707    * {@inheritDoc}
1708    */
1709   @Override
1710   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1711       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1712       throws ServiceException, Throwable {
1713     final Map<byte[],R> results =  Collections.synchronizedMap(
1714         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1715     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1716       @Override
1717       public void update(byte[] region, byte[] row, R value) {
1718         if (region != null) {
1719           results.put(region, value);
1720         }
1721       }
1722     });
1723     return results;
1724   }
1725 
1726   /**
1727    * {@inheritDoc}
1728    */
1729   @Override
1730   public <T extends Service, R> void coprocessorService(final Class<T> service,
1731       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1732       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1733 
1734     // get regions covered by the row range
1735     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1736 
1737     Map<byte[],Future<R>> futures =
1738         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1739     for (final byte[] r : keys) {
1740       final RegionCoprocessorRpcChannel channel =
1741           new RegionCoprocessorRpcChannel(connection, tableName, r);
1742       Future<R> future = pool.submit(
1743           new Callable<R>() {
1744             @Override
1745             public R call() throws Exception {
1746               T instance = ProtobufUtil.newServiceStub(service, channel);
1747               R result = callable.call(instance);
1748               byte[] region = channel.getLastRegion();
1749               if (callback != null) {
1750                 callback.update(region, r, result);
1751               }
1752               return result;
1753             }
1754           });
1755       futures.put(r, future);
1756     }
1757     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1758       try {
1759         e.getValue().get();
1760       } catch (ExecutionException ee) {
1761         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1762             + Bytes.toStringBinary(e.getKey()), ee);
1763         throw ee.getCause();
1764       } catch (InterruptedException ie) {
1765         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1766             + " for row " + Bytes.toStringBinary(e.getKey()))
1767             .initCause(ie);
1768       }
1769     }
1770   }
1771 
1772   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1773   throws IOException {
1774     if (start == null) {
1775       start = HConstants.EMPTY_START_ROW;
1776     }
1777     if (end == null) {
1778       end = HConstants.EMPTY_END_ROW;
1779     }
1780     return getKeysAndRegionsInRange(start, end, true).getFirst();
1781   }
1782 
1783   public void setOperationTimeout(int operationTimeout) {
1784     this.operationTimeout = operationTimeout;
1785   }
1786 
1787   public int getOperationTimeout() {
1788     return operationTimeout;
1789   }
1790 
1791   @Override
1792   public String toString() {
1793     return tableName + ";" + connection;
1794   }
1795 
1796   /**
1797    * Run basic test.
1798    * @param args Pass table name and row and will get the content.
1799    * @throws IOException
1800    */
1801   public static void main(String[] args) throws IOException {
1802     Table t = new HTable(HBaseConfiguration.create(), args[0]);
1803     try {
1804       System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1805     } finally {
1806       t.close();
1807     }
1808   }
1809 
1810   /**
1811    * {@inheritDoc}
1812    */
1813   @Override
1814   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1815       Descriptors.MethodDescriptor methodDescriptor, Message request,
1816       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1817     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1818         Bytes.BYTES_COMPARATOR));
1819     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1820         new Callback<R>() {
1821 
1822           @Override
1823           public void update(byte[] region, byte[] row, R result) {
1824             if (region != null) {
1825               results.put(region, result);
1826             }
1827           }
1828         });
1829     return results;
1830   }
1831 
1832   /**
1833    * {@inheritDoc}
1834    */
1835   @Override
1836   public <R extends Message> void batchCoprocessorService(
1837       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1838       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1839       throws ServiceException, Throwable {
1840 
1841     // get regions covered by the row range
1842     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1843         getKeysAndRegionsInRange(startKey, endKey, true);
1844     List<byte[]> keys = keysAndRegions.getFirst();
1845     List<HRegionLocation> regions = keysAndRegions.getSecond();
1846 
1847     // check if we have any calls to make
1848     if (keys.isEmpty()) {
1849       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1850           ", end=" + Bytes.toStringBinary(endKey));
1851       return;
1852     }
1853 
1854     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1855     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1856         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1857     for (int i = 0; i < keys.size(); i++) {
1858       final byte[] rowKey = keys.get(i);
1859       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1860       RegionCoprocessorServiceExec exec =
1861           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1862       execs.add(exec);
1863       execsByRow.put(rowKey, exec);
1864     }
1865 
1866     // tracking for any possible deserialization errors on success callback
1867     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1868     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1869     final List<Row> callbackErrorActions = new ArrayList<Row>();
1870     final List<String> callbackErrorServers = new ArrayList<String>();
1871     Object[] results = new Object[execs.size()];
1872 
1873     AsyncProcess asyncProcess =
1874         new AsyncProcess(connection, configuration, pool,
1875             RpcRetryingCallerFactory.instantiate(configuration), true,
1876             RpcControllerFactory.instantiate(configuration));
1877     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1878         new Callback<ClientProtos.CoprocessorServiceResult>() {
1879           @Override
1880           public void update(byte[] region, byte[] row,
1881                               ClientProtos.CoprocessorServiceResult serviceResult) {
1882             if (LOG.isTraceEnabled()) {
1883               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1884                   ": region=" + Bytes.toStringBinary(region) +
1885                   ", row=" + Bytes.toStringBinary(row) +
1886                   ", value=" + serviceResult.getValue().getValue());
1887             }
1888             try {
1889               callback.update(region, row,
1890                   (R) responsePrototype.newBuilderForType().mergeFrom(
1891                       serviceResult.getValue().getValue()).build());
1892             } catch (InvalidProtocolBufferException e) {
1893               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1894                   e);
1895               callbackErrorExceptions.add(e);
1896               callbackErrorActions.add(execsByRow.get(row));
1897               callbackErrorServers.add("null");
1898             }
1899           }
1900         }, results);
1901 
1902     future.waitUntilDone();
1903 
1904     if (future.hasError()) {
1905       throw future.getErrors();
1906     } else if (!callbackErrorExceptions.isEmpty()) {
1907       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1908           callbackErrorServers);
1909     }
1910   }
1911 }