
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HRegionLocation;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.KeyValueUtil;
49  import org.apache.hadoop.hbase.RegionLocations;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.TableNotFoundException;
53  import org.apache.hadoop.hbase.classification.InterfaceAudience;
54  import org.apache.hadoop.hbase.classification.InterfaceStability;
55  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
56  import org.apache.hadoop.hbase.client.coprocessor.Batch;
57  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
58  import org.apache.hadoop.hbase.filter.BinaryComparator;
59  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
60  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
61  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
62  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
63  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
64  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
65  import org.apache.hadoop.hbase.protobuf.RequestConverter;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
71  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
72  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
73  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
74  import org.apache.hadoop.hbase.util.Bytes;
75  import org.apache.hadoop.hbase.util.Pair;
76  import org.apache.hadoop.hbase.util.Threads;
77  
78  import com.google.common.annotations.VisibleForTesting;
79  import com.google.protobuf.Descriptors;
80  import com.google.protobuf.InvalidProtocolBufferException;
81  import com.google.protobuf.Message;
82  import com.google.protobuf.Service;
83  import com.google.protobuf.ServiceException;
84  
85  /**
86   * An implementation of {@link Table}. Used to communicate with a single HBase table.
87   * Lightweight. Get as needed and just close when done.
88   * Instances of this class SHOULD NOT be constructed directly.
89   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
90   * class comment for an example of how.
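     *
     * <p>A minimal usage sketch (not from the original source; the table, row, and column names
     * below are illustrative and {@code conf} is assumed to be an existing {@link Configuration}):
     * <pre>{@code
     * try (Connection connection = ConnectionFactory.createConnection(conf);
     *      Table table = connection.getTable(TableName.valueOf("myTable"))) {
     *   Result result = table.get(new Get(Bytes.toBytes("row1")));
     *   byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));
     * }
     * }</pre>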
91   *
92   * <p>This class is NOT thread safe for reads or writes.
93   * In the case of writes (Put, Delete), the underlying write buffer can
94   * be corrupted if multiple threads contend over a single HTable instance.
95   * In the case of reads, some fields used by a Scan are shared among all threads.
96   *
97   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
98   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
99   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
100  * Interface Classification</a>.
101  * There are no guarantees of backwards source or binary compatibility, and methods or the class
102  * can change or go away without deprecation.
103  *
104  * @see Table
105  * @see Admin
106  * @see Connection
107  * @see ConnectionFactory
108  */
109 @InterfaceAudience.Private
110 @InterfaceStability.Stable
111 public class HTable implements HTableInterface, RegionLocator {
112   private static final Log LOG = LogFactory.getLog(HTable.class);
113   protected ClusterConnection connection;
114   private final TableName tableName;
115   private volatile Configuration configuration;
116   private TableConfiguration tableConfiguration;
117   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
118   private long writeBufferSize;
119   private boolean autoFlush = true;
120   protected long currentWriteBufferSize = 0;
121   private boolean closed = false;
122   protected int scannerCaching;
123   private ExecutorService pool;  // For Multi & Scan
124   private int operationTimeout;
125   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
126   private final boolean cleanupConnectionOnClose; // close the connection in close()
127   private Consistency defaultConsistency = Consistency.STRONG;
128 
129   /** The Async process for puts with autoflush set to false or multiputs */
130   protected AsyncProcess ap;
131   /** The Async process for batch */
132   protected AsyncProcess multiAp;
133   private RpcRetryingCallerFactory rpcCallerFactory;
134   private RpcControllerFactory rpcControllerFactory;
135 
136   /**
137    * Creates an object to access an HBase table.
138    * @param conf Configuration object to use.
139    * @param tableName Name of the table.
140    * @throws IOException if a remote or network exception occurs
141    * @deprecated Constructing HTable objects manually has been deprecated. Please use
142    * {@link Connection} to instantiate a {@link Table} instead.
143    */
144   @Deprecated
145   public HTable(Configuration conf, final String tableName)
146   throws IOException {
147     this(conf, TableName.valueOf(tableName));
148   }
149 
150   /**
151    * Creates an object to access an HBase table.
152    * @param conf Configuration object to use.
153    * @param tableName Name of the table.
154    * @throws IOException if a remote or network exception occurs
155    * @deprecated Constructing HTable objects manually has been deprecated. Please use
156    * {@link Connection} to instantiate a {@link Table} instead.
157    */
158   @Deprecated
159   public HTable(Configuration conf, final byte[] tableName)
160   throws IOException {
161     this(conf, TableName.valueOf(tableName));
162   }
163 
164   /**
165    * Creates an object to access an HBase table.
166    * @param conf Configuration object to use.
167    * @param tableName table name pojo
168    * @throws IOException if a remote or network exception occurs
169    * @deprecated Constructing HTable objects manually has been deprecated. Please use
170    * {@link Connection} to instantiate a {@link Table} instead.
171    */
172   @Deprecated
173   public HTable(Configuration conf, final TableName tableName)
174   throws IOException {
175     this.tableName = tableName;
176     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
177     if (conf == null) {
178       this.connection = null;
179       return;
180     }
181     this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
182     this.configuration = conf;
183 
184     this.pool = getDefaultExecutor(conf);
185     this.finishSetup();
186   }
187 
188   /**
189    * Creates an object to access an HBase table.
190    * @param tableName Name of the table.
191    * @param connection HConnection to be used.
192    * @throws IOException if a remote or network exception occurs
193    * @deprecated Do not use.
194    */
195   @Deprecated
196   public HTable(TableName tableName, Connection connection) throws IOException {
197     this.tableName = tableName;
198     this.cleanupPoolOnClose = true;
199     this.cleanupConnectionOnClose = false;
200     this.connection = (ClusterConnection)connection;
201     this.configuration = connection.getConfiguration();
202 
203     this.pool = getDefaultExecutor(this.configuration);
204     this.finishSetup();
205   }
206 
207   // Marked Private @since 1.0
208   @InterfaceAudience.Private
209   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
210     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
211     if (maxThreads == 0) {
212       maxThreads = 1; // is there a better default?
213     }
214     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
215 
216     // Using the "direct handoff" approach, new threads will only be created
217     // when necessary, and the pool can grow unbounded. This could be bad but in HCM
218     // we only create as many Runnables as there are region servers. It means
219     // it also scales when new region servers are added.
220     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
221         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
222     pool.allowCoreThreadTimeOut(true);
223     return pool;
224   }
225 
226   /**
227    * Creates an object to access an HBase table.
228    * @param conf Configuration object to use.
229    * @param tableName Name of the table.
230    * @param pool ExecutorService to be used.
231    * @throws IOException if a remote or network exception occurs
232    * @deprecated Constructing HTable objects manually has been deprecated. Please use
233    * {@link Connection} to instantiate a {@link Table} instead.
234    */
235   @Deprecated
236   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
237       throws IOException {
238     this(conf, TableName.valueOf(tableName), pool);
239   }
240 
241   /**
242    * Creates an object to access an HBase table.
243    * @param conf Configuration object to use.
244    * @param tableName Name of the table.
245    * @param pool ExecutorService to be used.
246    * @throws IOException if a remote or network exception occurs
247    * @deprecated Constructing HTable objects manually has been deprecated. Please use
248    * {@link Connection} to instantiate a {@link Table} instead.
249    */
250   @Deprecated
251   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
252       throws IOException {
253     this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
254     this.configuration = conf;
255     this.pool = pool;
256     if (pool == null) {
257       this.pool = getDefaultExecutor(conf);
258       this.cleanupPoolOnClose = true;
259     } else {
260       this.cleanupPoolOnClose = false;
261     }
262     this.tableName = tableName;
263     this.cleanupConnectionOnClose = true;
264     this.finishSetup();
265   }
266 
267   /**
268    * Creates an object to access an HBase table.
269    * @param tableName Name of the table.
270    * @param connection HConnection to be used.
271    * @param pool ExecutorService to be used.
272    * @throws IOException if a remote or network exception occurs.
273    * @deprecated Do not use, internal ctor.
274    */
275   @Deprecated
276   public HTable(final byte[] tableName, final Connection connection,
277       final ExecutorService pool) throws IOException {
278     this(TableName.valueOf(tableName), connection, pool);
279   }
280 
281   /** @deprecated Do not use, internal ctor. */
282   @Deprecated
283   public HTable(TableName tableName, final Connection connection,
284       final ExecutorService pool) throws IOException {
285     this(tableName, (ClusterConnection)connection, null, null, null, pool);
286   }
287 
288   /**
289    * Creates an object to access an HBase table.
290    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
291    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
292    * @param tableName Name of the table.
293    * @param connection HConnection to be used.
294    * @param pool ExecutorService to be used.
295    * @throws IOException if a remote or network exception occurs
296    */
297   @InterfaceAudience.Private
298   public HTable(TableName tableName, final ClusterConnection connection,
299       final TableConfiguration tableConfig,
300       final RpcRetryingCallerFactory rpcCallerFactory,
301       final RpcControllerFactory rpcControllerFactory,
302       final ExecutorService pool) throws IOException {
303     if (connection == null || connection.isClosed()) {
304       throw new IllegalArgumentException("Connection is null or closed.");
305     }
306     this.tableName = tableName;
307     this.cleanupConnectionOnClose = false;
308     this.connection = connection;
309     this.configuration = connection.getConfiguration();
310     this.tableConfiguration = tableConfig;
311     this.pool = pool;
312     if (pool == null) {
313       this.pool = getDefaultExecutor(this.configuration);
314       this.cleanupPoolOnClose = true;
315     } else {
316       this.cleanupPoolOnClose = false;
317     }
318 
319     this.rpcCallerFactory = rpcCallerFactory;
320     this.rpcControllerFactory = rpcControllerFactory;
321 
322     this.finishSetup();
323   }
324 
325   /**
326    * For internal testing.
327    * @throws IOException
328    */
329   @VisibleForTesting
330   protected HTable() throws IOException {
331     tableName = null;
332     tableConfiguration = new TableConfiguration();
333     cleanupPoolOnClose = false;
334     cleanupConnectionOnClose = false;
335   }
336 
337   /**
338    * @return maxKeyValueSize from configuration.
339    */
340   public static int getMaxKeyValueSize(Configuration conf) {
341     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
342   }
343 
344   /**
345    * Set up this HTable's parameters based on the passed configuration.
346    */
347   private void finishSetup() throws IOException {
348     if (tableConfiguration == null) {
349       tableConfiguration = new TableConfiguration(configuration);
350     }
351 
352     this.operationTimeout = tableName.isSystemTable() ?
353         tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
354     this.writeBufferSize = tableConfiguration.getWriteBufferSize();
355     this.scannerCaching = tableConfiguration.getScannerCaching();
356 
357     if (this.rpcCallerFactory == null) {
358       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
359     }
360     if (this.rpcControllerFactory == null) {
361       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
362     }
363 
364     // puts need to track errors globally due to how the APIs currently work.
365     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
366     multiAp = this.connection.getAsyncProcess();
367   }
368 
369   /**
370    * {@inheritDoc}
371    */
372   @Override
373   public Configuration getConfiguration() {
374     return configuration;
375   }
376 
377   /**
378    * Tells whether or not a table is enabled. This method creates a
379    * new HBase configuration, so it might make your unit tests fail due to
380    * incorrect ZK client port.
381    * @param tableName Name of table to check.
382    * @return {@code true} if table is online.
383    * @throws IOException if a remote or network exception occurs
384    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
385    */
386   @Deprecated
387   public static boolean isTableEnabled(String tableName) throws IOException {
388     return isTableEnabled(TableName.valueOf(tableName));
389   }
390 
391   /**
392    * Tells whether or not a table is enabled. This method creates a
393    * new HBase configuration, so it might make your unit tests fail due to
394    * incorrect ZK client port.
395    * @param tableName Name of table to check.
396    * @return {@code true} if table is online.
397    * @throws IOException if a remote or network exception occurs
398    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
399    */
400   @Deprecated
401   public static boolean isTableEnabled(byte[] tableName) throws IOException {
402     return isTableEnabled(TableName.valueOf(tableName));
403   }
404 
405   /**
406    * Tells whether or not a table is enabled. This method creates a
407    * new HBase configuration, so it might make your unit tests fail due to
408    * incorrect ZK client port.
409    * @param tableName Name of table to check.
410    * @return {@code true} if table is online.
411    * @throws IOException if a remote or network exception occurs
412    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
413    */
414   @Deprecated
415   public static boolean isTableEnabled(TableName tableName) throws IOException {
416     return isTableEnabled(HBaseConfiguration.create(), tableName);
417   }
418 
419   /**
420    * Tells whether or not a table is enabled.
421    * @param conf The Configuration object to use.
422    * @param tableName Name of table to check.
423    * @return {@code true} if table is online.
424    * @throws IOException if a remote or network exception occurs
425    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
426    */
427   @Deprecated
428   public static boolean isTableEnabled(Configuration conf, String tableName)
429   throws IOException {
430     return isTableEnabled(conf, TableName.valueOf(tableName));
431   }
432 
433   /**
434    * Tells whether or not a table is enabled.
435    * @param conf The Configuration object to use.
436    * @param tableName Name of table to check.
437    * @return {@code true} if table is online.
438    * @throws IOException if a remote or network exception occurs
439    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
440    */
441   @Deprecated
442   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
443   throws IOException {
444     return isTableEnabled(conf, TableName.valueOf(tableName));
445   }
446 
447   /**
448    * Tells whether or not a table is enabled.
449    * @param conf The Configuration object to use.
450    * @param tableName Name of table to check.
451    * @return {@code true} if table is online.
452    * @throws IOException if a remote or network exception occurs
453    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName)}
454    */
455   @Deprecated
456   public static boolean isTableEnabled(Configuration conf,
457       final TableName tableName) throws IOException {
458     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
459       @Override
460       public Boolean connect(HConnection connection) throws IOException {
461         return connection.isTableEnabled(tableName);
462       }
463     });
464   }
465 
466   /**
467    * Find region location hosting passed row using cached info
468    * @param row Row to find.
469    * @return The location of the given row.
470    * @throws IOException if a remote or network exception occurs
471    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
472    */
473   @Deprecated
474   public HRegionLocation getRegionLocation(final String row)
475   throws IOException {
476     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
477   }
478 
479   /**
480    * {@inheritDoc}
481    */
482   @Override
483   public HRegionLocation getRegionLocation(final byte [] row)
484   throws IOException {
485     return connection.getRegionLocation(tableName, row, false);
486   }
487 
488   /**
489    * {@inheritDoc}
490    */
491   @Override
492   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
493   throws IOException {
494     return connection.getRegionLocation(tableName, row, reload);
495   }
496 
497   /**
498    * {@inheritDoc}
499    */
500   @Override
501   public byte [] getTableName() {
502     return this.tableName.getName();
503   }
504 
505   @Override
506   public TableName getName() {
507     return tableName;
508   }
509 
510   /**
511    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
512    * manipulations.
513    * @return An HConnection instance.
514    * @deprecated This method will be changed from public to package protected.
515    */
516   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
517   @Deprecated
518   @VisibleForTesting
519   public HConnection getConnection() {
520     return this.connection;
521   }
522 
523   /**
524    * Gets the number of rows that a scanner will fetch at once.
525    * <p>
526    * The default value comes from {@code hbase.client.scanner.caching}.
527    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
528    */
529   @Deprecated
530   public int getScannerCaching() {
531     return scannerCaching;
532   }
533 
534   /**
535    * Kept in 0.96 for backward compatibility
536   * @deprecated since 0.96. This is an internal buffer that should not be read from nor written to.
537    */
538   @Deprecated
539   public List<Row> getWriteBuffer() {
540     return writeAsyncBuffer;
541   }
542 
543   /**
544    * Sets the number of rows that a scanner will fetch at once.
545    * <p>
546    * This will override the value specified by
547    * {@code hbase.client.scanner.caching}.
548    * Increasing this value will reduce the amount of work needed each time
549    * {@code next()} is called on a scanner, at the expense of memory use
550    * (since more rows will need to be maintained in memory by the scanners).
551    * @param scannerCaching the number of rows a scanner will fetch at once.
552    * @deprecated Use {@link Scan#setCaching(int)}
553    */
554   @Deprecated
555   public void setScannerCaching(int scannerCaching) {
556     this.scannerCaching = scannerCaching;
557   }
558 
559   /**
560    * {@inheritDoc}
561    */
562   @Override
563   public HTableDescriptor getTableDescriptor() throws IOException {
564     // TODO: This is the same as HBaseAdmin.getTableDescriptor(). Only keep one.
565     if (tableName == null) return null;
566     if (tableName.equals(TableName.META_TABLE_NAME)) {
567       return HTableDescriptor.META_TABLEDESC;
568     }
569     HTableDescriptor htd = executeMasterCallable(
570       new MasterCallable<HTableDescriptor>(getConnection()) {
571       @Override
572       public HTableDescriptor call(int callTimeout) throws ServiceException {
573         GetTableDescriptorsResponse htds;
574         GetTableDescriptorsRequest req =
575             RequestConverter.buildGetTableDescriptorsRequest(tableName);
576         htds = master.getTableDescriptors(null, req);
577 
578         if (!htds.getTableSchemaList().isEmpty()) {
579           return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
580         }
581         return null;
582       }
583     });
584     if (htd != null) {
585       return new UnmodifyableHTableDescriptor(htd);
586     }
587     throw new TableNotFoundException(tableName.getNameAsString());
588   }
589 
590   private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException {
591     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
592     try {
593       return caller.callWithRetries(callable, operationTimeout);
594     } finally {
595       callable.close();
596     }
597   }
598 
599   /**
600    * {@inheritDoc}
601    */
602   @Override
603   public byte [][] getStartKeys() throws IOException {
604     return getStartEndKeys().getFirst();
605   }
606 
607   /**
608    * {@inheritDoc}
609    */
610   @Override
611   public byte[][] getEndKeys() throws IOException {
612     return getStartEndKeys().getSecond();
613   }
614 
615   /**
616    * {@inheritDoc}
617    */
618   @Override
619   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
620 
621     List<RegionLocations> regions = listRegionLocations();
622     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
623     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
624 
625     for (RegionLocations locations : regions) {
626       HRegionInfo region = locations.getRegionLocation().getRegionInfo();
627       startKeyList.add(region.getStartKey());
628       endKeyList.add(region.getEndKey());
629     }
630 
631     return new Pair<byte [][], byte [][]>(
632       startKeyList.toArray(new byte[startKeyList.size()][]),
633       endKeyList.toArray(new byte[endKeyList.size()][]));
634   }
635 
636   @VisibleForTesting
637   List<RegionLocations> listRegionLocations() throws IOException {
638     return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
639   }
640 
641   /**
642    * Gets all the regions and their addresses for this table.
643    * <p>
644    * This is mainly useful for the MapReduce integration.
645    * @return A map of HRegionInfo with its server address
646    * @throws IOException if a remote or network exception occurs
647    * @deprecated This is no longer a public API.  Use {@link #getAllRegionLocations()} instead.
648    */
649   @Deprecated
650   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
651     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocation, singular, returns an HRegionLocation.
652     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName());
653   }
654 
655   /**
656    * Gets all the regions and their addresses for this table.
657    * <p>
658    * This is mainly useful for the MapReduce integration.
659    * @return A map of HRegionInfo with its server address
660    * @throws IOException if a remote or network exception occurs
661    */
662   @Override
663   public List<HRegionLocation> getAllRegionLocations() throws IOException {
664     NavigableMap<HRegionInfo, ServerName> locations = getRegionLocations();
665     ArrayList<HRegionLocation> regions = new ArrayList<>(locations.size());
666     for (Entry<HRegionInfo, ServerName> entry : locations.entrySet()) {
667       regions.add(new HRegionLocation(entry.getKey(), entry.getValue()));
668     }
669     return regions;
670   }
671 
672   /**
673    * Get the corresponding regions for an arbitrary range of keys.
674    * <p>
675    * @param startKey Starting row in range, inclusive
676    * @param endKey Ending row in range, exclusive
677    * @return A list of HRegionLocations corresponding to the regions that
678    * contain the specified range
679    * @throws IOException if a remote or network exception occurs
680    * @deprecated This is no longer a public API
681    */
682   @Deprecated
683   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
684     final byte [] endKey) throws IOException {
685     return getRegionsInRange(startKey, endKey, false);
686   }
687 
688   /**
689    * Get the corresponding regions for an arbitrary range of keys.
690    * <p>
691    * @param startKey Starting row in range, inclusive
692    * @param endKey Ending row in range, exclusive
693    * @param reload true to reload information or false to use cached information
694    * @return A list of HRegionLocations corresponding to the regions that
695    * contain the specified range
696    * @throws IOException if a remote or network exception occurs
697    * @deprecated This is no longer a public API
698    */
699   @Deprecated
700   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
701       final byte [] endKey, final boolean reload) throws IOException {
702     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
703   }
704 
705   /**
706    * Get the corresponding start keys and regions for an arbitrary range of
707    * keys.
708    * <p>
709    * @param startKey Starting row in range, inclusive
710    * @param endKey Ending row in range
711    * @param includeEndKey true if endKey is inclusive, false if exclusive
712    * @return A pair of list of start keys and list of HRegionLocations that
713    *         contain the specified range
714    * @throws IOException if a remote or network exception occurs
715    * @deprecated This is no longer a public API
716    */
717   @Deprecated
718   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
719       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
720       throws IOException {
721     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
722   }
723 
724   /**
725    * Get the corresponding start keys and regions for an arbitrary range of
726    * keys.
727    * <p>
728    * @param startKey Starting row in range, inclusive
729    * @param endKey Ending row in range
730    * @param includeEndKey true if endKey is inclusive, false if exclusive
731    * @param reload true to reload information or false to use cached information
732    * @return A pair of list of start keys and list of HRegionLocations that
733    *         contain the specified range
734    * @throws IOException if a remote or network exception occurs
735    * @deprecated This is no longer a public API
736    */
737   @Deprecated
738   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
739       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
740       final boolean reload) throws IOException {
741     final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
742     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
743       throw new IllegalArgumentException(
744         "Invalid range: " + Bytes.toStringBinary(startKey) +
745         " > " + Bytes.toStringBinary(endKey));
746     }
747     List<byte[]> keysInRange = new ArrayList<byte[]>();
748     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
749     byte[] currentKey = startKey;
750     do {
751       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
752       keysInRange.add(currentKey);
753       regionsInRange.add(regionLocation);
754       currentKey = regionLocation.getRegionInfo().getEndKey();
755     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
756         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
757             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
758     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
759         regionsInRange);
760   }
761 
762   /**
763    * {@inheritDoc}
764    * @deprecated Use reversed scan instead.
765    */
766    @Override
767    @Deprecated
768    public Result getRowOrBefore(final byte[] row, final byte[] family)
769        throws IOException {
770      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
771          tableName, row) {
772        @Override
773       public Result call(int callTimeout) throws IOException {
774          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
775          controller.setPriority(tableName);
776          controller.setCallTimeout(callTimeout);
777          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
778              getLocation().getRegionInfo().getRegionName(), row, family);
779          try {
780            ClientProtos.GetResponse response = getStub().get(controller, request);
781            if (!response.hasResult()) return null;
782            return ProtobufUtil.toResult(response.getResult());
783          } catch (ServiceException se) {
784            throw ProtobufUtil.getRemoteException(se);
785          }
786        }
787      };
788      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
789    }
790 
791   /**
792    * The underlying {@link HTable} must not be closed.
793    * {@link HTableInterface#getScanner(Scan)} has other usage details.
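       *
       * <p>A minimal scan sketch (not from the original source; the column family name is
       * illustrative and {@code table} is assumed to be an open {@link Table}):
       * <pre>{@code
       * Scan scan = new Scan();
       * scan.addFamily(Bytes.toBytes("cf"));
       * try (ResultScanner scanner = table.getScanner(scan)) {
       *   for (Result result : scanner) {
       *     // process each Result
       *   }
       * }
       * }</pre>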
794    */
795   @Override
796   public ResultScanner getScanner(final Scan scan) throws IOException {
797     if (scan.getCaching() <= 0) {
798       scan.setCaching(getScannerCaching());
799     }
800 
801     if (scan.isReversed()) {
802       if (scan.isSmall()) {
803         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
804             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
805             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
806       } else {
807         return new ReversedClientScanner(getConfiguration(), scan, getName(),
808             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
809             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
810       }
811     }
812 
813     if (scan.isSmall()) {
814       return new ClientSmallScanner(getConfiguration(), scan, getName(),
815           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
816           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
817     } else {
818       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
819           this.rpcCallerFactory, this.rpcControllerFactory,
820           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
821     }
822   }
823 
824   /**
825    * The underlying {@link HTable} must not be closed.
826    * {@link HTableInterface#getScanner(byte[])} has other usage details.
827    */
828   @Override
829   public ResultScanner getScanner(byte [] family) throws IOException {
830     Scan scan = new Scan();
831     scan.addFamily(family);
832     return getScanner(scan);
833   }
834 
835   /**
836    * The underlying {@link HTable} must not be closed.
837    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
838    */
839   @Override
840   public ResultScanner getScanner(byte [] family, byte [] qualifier)
841   throws IOException {
842     Scan scan = new Scan();
843     scan.addColumn(family, qualifier);
844     return getScanner(scan);
845   }
846 
847   /**
848    * {@inheritDoc}
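      *
      * <p>Sketch of a timeline-consistent read (not from the original source), which this method
      * may serve from a secondary region replica; row and column names are illustrative:
      * <pre>{@code
      * Get get = new Get(Bytes.toBytes("row1"));
      * get.setConsistency(Consistency.TIMELINE);
      * Result result = table.get(get);
      * }</pre>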
849    */
850   @Override
851   public Result get(final Get get) throws IOException {
852     if (get.getConsistency() == null){
853       get.setConsistency(defaultConsistency);
854     }
855 
856     if (get.getConsistency() == Consistency.STRONG) {
857       // Good old call.
858       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
859           getName(), get.getRow()) {
860         @Override
861         public Result call(int callTimeout) throws IOException {
862           ClientProtos.GetRequest request =
863               RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
864           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
865           controller.setPriority(tableName);
866           controller.setCallTimeout(callTimeout);
867           try {
868             ClientProtos.GetResponse response = getStub().get(controller, request);
869             if (response == null) return null;
870             return ProtobufUtil.toResult(response.getResult());
871           } catch (ServiceException se) {
872             throw ProtobufUtil.getRemoteException(se);
873           }
874         }
875       };
876       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
877     }
878 
879     // Call that takes into account the replica
880     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
881       rpcControllerFactory, tableName, this.connection, get, pool,
882       tableConfiguration.getRetriesNumber(),
883       operationTimeout,
884       tableConfiguration.getPrimaryCallTimeoutMicroSecond());
885     return callable.call();
886   }
887 
888 
889   /**
890    * {@inheritDoc}
891    */
892   @Override
893   public Result[] get(List<Get> gets) throws IOException {
894     if (gets.size() == 1) {
895       return new Result[]{get(gets.get(0))};
896     }
897     try {
898       Object [] r1 = batch((List)gets);
899 
900       // translate.
901       Result [] results = new Result[r1.length];
902       int i=0;
903       for (Object o : r1) {
904         // batch ensures if there is a failure we get an exception instead
905         results[i++] = (Result) o;
906       }
907 
908       return results;
909     } catch (InterruptedException e) {
910       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
911     }
912   }
913 
914   /**
915    * {@inheritDoc}
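      *
      * <p>Sketch of a heterogeneous batch (not from the original source; rows are illustrative
      * and {@code table} is an open {@link Table}):
      * <pre>{@code
      * List<Row> actions = new ArrayList<Row>();
      * actions.add(new Get(Bytes.toBytes("row1")));
      * actions.add(new Delete(Bytes.toBytes("row2")));
      * Object[] results = new Object[actions.size()];
      * table.batch(actions, results);
      * }</pre>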
916    */
917   @Override
918   public void batch(final List<? extends Row> actions, final Object[] results)
919       throws InterruptedException, IOException {
920     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
921     ars.waitUntilDone();
922     if (ars.hasError()) {
923       throw ars.getErrors();
924     }
925   }
926 
927   /**
928    * {@inheritDoc}
929    * @deprecated If any exception is thrown by one of the actions, there is no way to
930    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
931    */
932   @Deprecated
933   @Override
934   public Object[] batch(final List<? extends Row> actions)
935      throws InterruptedException, IOException {
936     Object[] results = new Object[actions.size()];
937     batch(actions, results);
938     return results;
939   }
940 
941   /**
942    * {@inheritDoc}
943    */
944   @Override
945   public <R> void batchCallback(
946       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
947       throws IOException, InterruptedException {
948     connection.processBatchCallback(actions, tableName, pool, results, callback);
949   }
950 
951   /**
952    * {@inheritDoc}
953    * @deprecated If any exception is thrown by one of the actions, there is no way to
954    * retrieve the partially executed results. Use
955    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
956    * instead.
957    */
958   @Deprecated
959   @Override
960   public <R> Object[] batchCallback(
961     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
962       InterruptedException {
963     Object[] results = new Object[actions.size()];
964     batchCallback(actions, results, callback);
965     return results;
966   }
967 
968   /**
969    * {@inheritDoc}
970    */
971   @Override
972   public void delete(final Delete delete)
973   throws IOException {
974     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
975         tableName, delete.getRow()) {
976       @Override
977       public Boolean call(int callTimeout) throws IOException {
978         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
979         controller.setPriority(tableName);
980         controller.setCallTimeout(callTimeout);
981 
982         try {
983           MutateRequest request = RequestConverter.buildMutateRequest(
984             getLocation().getRegionInfo().getRegionName(), delete);
985           MutateResponse response = getStub().mutate(controller, request);
986           return Boolean.valueOf(response.getProcessed());
987         } catch (ServiceException se) {
988           throw ProtobufUtil.getRemoteException(se);
989         }
990       }
991     };
992     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
993   }
994 
995   /**
996    * {@inheritDoc}
997    */
998   @Override
999   public void delete(final List<Delete> deletes)
1000   throws IOException {
1001     Object[] results = new Object[deletes.size()];
1002     try {
1003       batch(deletes, results);
1004     } catch (InterruptedException e) {
1005       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1006     } finally {
1007       // mutate list so that it is empty for complete success, or contains only failed records
1008       // results are returned in the same order as the requests in list
1009       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
1010       for (int i = results.length - 1; i>=0; i--) {
1011         // if result is not null, it succeeded
1012         if (results[i] instanceof Result) {
1013           deletes.remove(i);
1014         }
1015       }
1016     }
1017   }
1018 
1019   /**
1020    * {@inheritDoc}
1021    */
1022   @Override
1023   public void put(final Put put)
1024       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1025     doPut(put);
1026     if (autoFlush) {
1027       flushCommits();
1028     }
1029   }
1030 
1031   /**
1032    * {@inheritDoc}
1033    */
1034   @Override
1035   public void put(final List<Put> puts)
1036       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1037     for (Put put : puts) {
1038       doPut(put);
1039     }
1040     if (autoFlush) {
1041       flushCommits();
1042     }
1043   }
1044 
1045 
1046   /**
1047    * Add the put to the buffer. If the buffer is already too large, send the buffer to the
1048    * cluster.
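        *
        * <p>Sketch of how this buffering is typically exercised through the deprecated
        * client-side write buffer (method names are from this class; {@code put1} and
        * {@code put2} are illustrative):
        * <pre>{@code
        * htable.setAutoFlush(false);
        * htable.put(put1);       // buffered locally
        * htable.put(put2);       // buffered; a flush is triggered once writeBufferSize is exceeded
        * htable.flushCommits();  // push anything still buffered
        * }</pre>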
1049    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
1050    * @throws InterruptedIOException if we were interrupted.
1051    */
1052   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1053     // This behavior is highly non-intuitive... it does not protect us against
1054     // 94-incompatible behavior, which is a timing issue because hasError, the below code
1055     // and setter of hasError are not synchronized. Perhaps it should be removed.
1056     if (ap.hasError()) {
1057       writeAsyncBuffer.add(put);
1058       backgroundFlushCommits(true);
1059     }
1060 
1061     validatePut(put);
1062 
1063     currentWriteBufferSize += put.heapSize();
1064     writeAsyncBuffer.add(put);
1065 
1066     while (currentWriteBufferSize > writeBufferSize) {
1067       backgroundFlushCommits(false);
1068     }
1069   }
1070 
1071 
1072   /**
1073    * Send the operations in the buffer to the servers. Does not wait for the server's answer.
1074    * If there is an error (max retries reached from a previous flush or a bad operation), it tries
1075    * to send all operations in the buffer and throws an exception.
1076    * @param synchronous - if true, sends all the writes and waits for all of them to finish before
1077    *                     returning.
1078    */
1079   private void backgroundFlushCommits(boolean synchronous) throws
1080       InterruptedIOException, RetriesExhaustedWithDetailsException {
1081 
1082     try {
1083       if (!synchronous) {
1084         ap.submit(tableName, writeAsyncBuffer, true, null, false);
1085         if (ap.hasError()) {
1086           LOG.debug(tableName + ": One or more of the operations have failed -" +
1087               " waiting for all operations in progress to finish (successfully or not)");
1088         }
1089       }
1090       if (synchronous || ap.hasError()) {
1091         while (!writeAsyncBuffer.isEmpty()) {
1092           ap.submit(tableName, writeAsyncBuffer, true, null, false);
1093         }
1094         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
1095         if (error != null) {
1096           throw error;
1097         }
1098       }
1099     } finally {
1100       currentWriteBufferSize = 0;
1101       for (Row mut : writeAsyncBuffer) {
1102         if (mut instanceof Mutation) {
1103           currentWriteBufferSize += ((Mutation) mut).heapSize();
1104         }
1105       }
1106     }
1107   }
1108 
1109   /**
1110    * {@inheritDoc}
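        *
        * <p>Sketch of an atomic multi-mutation on one row (not from the original source;
        * {@code put} and {@code delete} are assumed to target the same row as {@code row}):
        * <pre>{@code
        * RowMutations mutations = new RowMutations(row);
        * mutations.add(put);
        * mutations.add(delete);
        * table.mutateRow(mutations);
        * }</pre>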
1111    */
1112   @Override
1113   public void mutateRow(final RowMutations rm) throws IOException {
1114     RegionServerCallable<Void> callable =
1115         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1116       @Override
1117       public Void call(int callTimeout) throws IOException {
1118         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1119         controller.setPriority(tableName);
1120         controller.setCallTimeout(callTimeout);
1121         try {
1122           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1123             getLocation().getRegionInfo().getRegionName(), rm);
1124           regionMutationBuilder.setAtomic(true);
1125           MultiRequest request =
1126             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1127           getStub().multi(controller, request);
1128         } catch (ServiceException se) {
1129           throw ProtobufUtil.getRemoteException(se);
1130         }
1131         return null;
1132       }
1133     };
1134     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1135   }
1136 
1137   /**
1138    * {@inheritDoc}
1139    */
1140   @Override
1141   public Result append(final Append append) throws IOException {
1142     if (append.numFamilies() == 0) {
1143       throw new IOException(
1144           "Invalid arguments to append, no columns specified");
1145     }
1146 
1147     NonceGenerator ng = this.connection.getNonceGenerator();
1148     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1149     RegionServerCallable<Result> callable =
1150       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1151         @Override
1152         public Result call(int callTimeout) throws IOException {
1153           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1154           controller.setPriority(getTableName());
1155           controller.setCallTimeout(callTimeout);
1156           try {
1157             MutateRequest request = RequestConverter.buildMutateRequest(
1158               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1159             MutateResponse response = getStub().mutate(controller, request);
1160             if (!response.hasResult()) return null;
1161             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1162           } catch (ServiceException se) {
1163             throw ProtobufUtil.getRemoteException(se);
1164           }
1165         }
1166       };
1167     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1168   }
1169 
1170   /**
1171    * {@inheritDoc}
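        *
        * <p>Sketch (not from the original source; row and column names are illustrative):
        * <pre>{@code
        * Increment inc = new Increment(Bytes.toBytes("row1"));
        * inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("hits"), 1L);
        * Result result = table.increment(inc);
        * }</pre>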
1172    */
1173   @Override
1174   public Result increment(final Increment increment) throws IOException {
1175     if (!increment.hasFamilies()) {
1176       throw new IOException(
1177           "Invalid arguments to increment, no columns specified");
1178     }
1179     NonceGenerator ng = this.connection.getNonceGenerator();
1180     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1181     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1182         getName(), increment.getRow()) {
1183       @Override
1184       public Result call(int callTimeout) throws IOException {
1185         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1186         controller.setPriority(getTableName());
1187         controller.setCallTimeout(callTimeout);
1188         try {
1189           MutateRequest request = RequestConverter.buildMutateRequest(
1190             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1191           MutateResponse response = getStub().mutate(controller, request);
1192           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1193         } catch (ServiceException se) {
1194           throw ProtobufUtil.getRemoteException(se);
1195         }
1196       }
1197     };
1198     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1199   }
1200 
1201   /**
1202    * {@inheritDoc}
1203    */
1204   @Override
1205   public long incrementColumnValue(final byte [] row, final byte [] family,
1206       final byte [] qualifier, final long amount)
1207   throws IOException {
1208     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1209   }
1210 
1211   /**
1212    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1213    */
1214   @Deprecated
1215   @Override
1216   public long incrementColumnValue(final byte [] row, final byte [] family,
1217       final byte [] qualifier, final long amount, final boolean writeToWAL)
1218   throws IOException {
1219     return incrementColumnValue(row, family, qualifier, amount,
1220       writeToWAL? Durability.SYNC_WAL: Durability.SKIP_WAL);
1221   }
1222 
1223   /**
1224    * {@inheritDoc}
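        *
        * <p>Sketch (not from the original source; row and column names are illustrative):
        * <pre>{@code
        * long newValue = table.incrementColumnValue(Bytes.toBytes("row1"),
        *     Bytes.toBytes("cf"), Bytes.toBytes("hits"), 1L, Durability.SYNC_WAL);
        * }</pre>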
1225    */
1226   @Override
1227   public long incrementColumnValue(final byte [] row, final byte [] family,
1228       final byte [] qualifier, final long amount, final Durability durability)
1229   throws IOException {
1230     NullPointerException npe = null;
1231     if (row == null) {
1232       npe = new NullPointerException("row is null");
1233     } else if (family == null) {
1234       npe = new NullPointerException("family is null");
1235     } else if (qualifier == null) {
1236       npe = new NullPointerException("qualifier is null");
1237     }
1238     if (npe != null) {
1239       throw new IOException(
1240           "Invalid arguments to incrementColumnValue", npe);
1241     }
1242 
1243     NonceGenerator ng = this.connection.getNonceGenerator();
1244     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1245     RegionServerCallable<Long> callable =
1246       new RegionServerCallable<Long>(connection, getName(), row) {
1247         @Override
1248         public Long call(int callTimeout) throws IOException {
1249           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1250           controller.setPriority(getTableName());
1251           controller.setCallTimeout(callTimeout);
1252           try {
1253             MutateRequest request = RequestConverter.buildIncrementRequest(
1254               getLocation().getRegionInfo().getRegionName(), row, family,
1255               qualifier, amount, durability, nonceGroup, nonce);
1256             MutateResponse response = getStub().mutate(controller, request);
1257             Result result =
1258               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1259             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1260           } catch (ServiceException se) {
1261             throw ProtobufUtil.getRemoteException(se);
1262           }
1263         }
1264       };
1265     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1266   }
1267 
1268   /**
1269    * {@inheritDoc}
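        *
        * <p>Sketch (not from the original source): apply {@code put} only if the current value of
        * the checked cell equals the expected bytes; names are illustrative:
        * <pre>{@code
        * boolean applied = table.checkAndPut(row, Bytes.toBytes("cf"), Bytes.toBytes("q"),
        *     Bytes.toBytes("expected"), put);
        * }</pre>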
1270    */
1271   @Override
1272   public boolean checkAndPut(final byte [] row,
1273       final byte [] family, final byte [] qualifier, final byte [] value,
1274       final Put put)
1275   throws IOException {
1276     RegionServerCallable<Boolean> callable =
1277       new RegionServerCallable<Boolean>(connection, getName(), row) {
1278         @Override
1279         public Boolean call(int callTimeout) throws IOException {
1280           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1281           controller.setPriority(tableName);
1282           controller.setCallTimeout(callTimeout);
1283           try {
1284             MutateRequest request = RequestConverter.buildMutateRequest(
1285               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1286                 new BinaryComparator(value), CompareType.EQUAL, put);
1287             MutateResponse response = getStub().mutate(controller, request);
1288             return Boolean.valueOf(response.getProcessed());
1289           } catch (ServiceException se) {
1290             throw ProtobufUtil.getRemoteException(se);
1291           }
1292         }
1293       };
1294     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1295   }
1296 
1297   /**
1298    * {@inheritDoc}
1299    */
1300   @Override
1301   public boolean checkAndPut(final byte [] row, final byte [] family,
1302       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1303       final Put put)
1304   throws IOException {
1305     RegionServerCallable<Boolean> callable =
1306       new RegionServerCallable<Boolean>(connection, getName(), row) {
1307         @Override
1308         public Boolean call(int callTimeout) throws IOException {
1309           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1310           controller.setPriority(tableName);
1311           controller.setCallTimeout(callTimeout);
1312           try {
1313             CompareType compareType = CompareType.valueOf(compareOp.name());
1314             MutateRequest request = RequestConverter.buildMutateRequest(
1315               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1316                 new BinaryComparator(value), compareType, put);
1317             MutateResponse response = getStub().mutate(controller, request);
1318             return Boolean.valueOf(response.getProcessed());
1319           } catch (ServiceException se) {
1320             throw ProtobufUtil.getRemoteException(se);
1321           }
1322         }
1323       };
1324     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1325   }
1326 
1327   /**
1328    * {@inheritDoc}
1329    */
1330   @Override
1331   public boolean checkAndDelete(final byte [] row,
1332       final byte [] family, final byte [] qualifier, final byte [] value,
1333       final Delete delete)
1334   throws IOException {
1335     RegionServerCallable<Boolean> callable =
1336       new RegionServerCallable<Boolean>(connection, getName(), row) {
1337         @Override
1338         public Boolean call(int callTimeout) throws IOException {
1339           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1340           controller.setPriority(tableName);
1341           controller.setCallTimeout(callTimeout);
1342           try {
1343             MutateRequest request = RequestConverter.buildMutateRequest(
1344               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1345                 new BinaryComparator(value), CompareType.EQUAL, delete);
1346             MutateResponse response = getStub().mutate(controller, request);
1347             return Boolean.valueOf(response.getProcessed());
1348           } catch (ServiceException se) {
1349             throw ProtobufUtil.getRemoteException(se);
1350           }
1351         }
1352       };
1353     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1354   }
1355 
1356   /**
1357    * {@inheritDoc}
1358    */
1359   @Override
1360   public boolean checkAndDelete(final byte [] row, final byte [] family,
1361       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1362       final Delete delete)
1363   throws IOException {
1364     RegionServerCallable<Boolean> callable =
1365       new RegionServerCallable<Boolean>(connection, getName(), row) {
1366         @Override
1367         public Boolean call(int callTimeout) throws IOException {
1368           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1369           controller.setPriority(tableName);
1370           controller.setCallTimeout(callTimeout);
1371           try {
1372             CompareType compareType = CompareType.valueOf(compareOp.name());
1373             MutateRequest request = RequestConverter.buildMutateRequest(
1374               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1375                 new BinaryComparator(value), compareType, delete);
1376             MutateResponse response = getStub().mutate(controller, request);
1377             return Boolean.valueOf(response.getProcessed());
1378           } catch (ServiceException se) {
1379             throw ProtobufUtil.getRemoteException(se);
1380           }
1381         }
1382       };
1383     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1384   }
1385 
1386   /**
1387    * {@inheritDoc}
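        * <p>
        * A hedged sketch of an atomic multi-mutation guarded by a value check
        * (again assuming an {@code HTable} instance {@code table}; names are
        * placeholders):
        * <pre>{@code
        * byte[] row = Bytes.toBytes("row1");
        * byte[] cf = Bytes.toBytes("cf");
        * RowMutations mutations = new RowMutations(row);
        * Put put = new Put(row);
        * put.addColumn(cf, Bytes.toBytes("state"), Bytes.toBytes("DONE"));
        * mutations.add(put);
        * Delete delete = new Delete(row);
        * delete.addColumns(cf, Bytes.toBytes("pending"));
        * mutations.add(delete);
        * // Both mutations are applied atomically, but only if cf:state currently
        * // equals "IN_PROGRESS".
        * boolean applied = table.checkAndMutate(row, cf, Bytes.toBytes("state"),
        *     CompareOp.EQUAL, Bytes.toBytes("IN_PROGRESS"), mutations);
        * }</pre>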
1388    */
1389   @Override
1390   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1391       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1392   throws IOException {
1393     RegionServerCallable<Boolean> callable =
1394         new RegionServerCallable<Boolean>(connection, getName(), row) {
1395           @Override
1396           public Boolean call(int callTimeout) throws IOException {
1397             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1398             controller.setPriority(tableName);
1399             controller.setCallTimeout(callTimeout);
1400             try {
1401               CompareType compareType = CompareType.valueOf(compareOp.name());
1402               MultiRequest request = RequestConverter.buildMutateRequest(
1403                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1404                   new BinaryComparator(value), compareType, rm);
1405               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1406               return Boolean.valueOf(response.getProcessed());
1407             } catch (ServiceException se) {
1408               throw ProtobufUtil.getRemoteException(se);
1409             }
1410           }
1411         };
1412     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1413   }
1414 
1415   /**
1416    * {@inheritDoc}
1417    */
1418   @Override
1419   public boolean exists(final Get get) throws IOException {
1420     get.setCheckExistenceOnly(true);
1421     Result r = get(get);
1422     assert r.getExists() != null;
1423     return r.getExists();
1424   }
1425 
1426   /**
1427    * {@inheritDoc}
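        * <p>
        * A hedged sketch of testing several rows for existence without fetching
        * their data (row names are placeholders):
        * <pre>{@code
        * List<Get> gets = new ArrayList<Get>();
        * gets.add(new Get(Bytes.toBytes("row1")));
        * gets.add(new Get(Bytes.toBytes("row2")));
        * boolean[] found = table.existsAll(gets);
        * // found[i] is true when gets.get(i) matched at least one cell
        * }</pre>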
1428    */
1429   @Override
1430   public boolean[] existsAll(final List<Get> gets) throws IOException {
1431     if (gets.isEmpty()) return new boolean[]{};
1432     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1433 
1434     for (Get g: gets){
1435       g.setCheckExistenceOnly(true);
1436     }
1437 
1438     Object[] r1;
1439     try {
1440       r1 = batch(gets);
1441     } catch (InterruptedException e) {
1442       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1443     }
1444 
1445     // translate.
1446     boolean[] results = new boolean[r1.length];
1447     int i = 0;
1448     for (Object o : r1) {
1449       // batch() guarantees that a failure surfaces as an exception rather than a null result
1450       results[i++] = ((Result)o).getExists();
1451     }
1452 
1453     return results;
1454   }
1455 
1456   /**
1457    * {@inheritDoc}
1458    */
1459   @Override
1460   @Deprecated
1461   public Boolean[] exists(final List<Get> gets) throws IOException {
1462     boolean[] results = existsAll(gets);
1463     Boolean[] objectResults = new Boolean[results.length];
1464     for (int i = 0; i < results.length; ++i) {
1465       objectResults[i] = results[i];
1466     }
1467     return objectResults;
1468   }
1469 
1470   /**
1471    * {@inheritDoc}
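        * <p>
        * A hedged sketch of buffered writes followed by an explicit flush (assumes
        * auto-flush has been turned off on this instance; names are placeholders):
        * <pre>{@code
        * table.setAutoFlushTo(false);           // buffer Puts on the client side
        * for (int i = 0; i < 1000; i++) {
        *   Put put = new Put(Bytes.toBytes("row-" + i));
        *   put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(i));
        *   table.put(put);                      // may flush on its own once the buffer fills
        * }
        * table.flushCommits();                  // push whatever is still buffered
        * }</pre>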
1472    */
1473   @Override
1474   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1475     // An operation can still be in progress even when the buffer is empty, so we always
1476     // call backgroundFlushCommits at least once.
1477     backgroundFlushCommits(true);
1478   }
1479 
1480   /**
1481    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1482    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1483    *
1484    * @param list The collection of actions.
1485    * @param results An empty array of the same size as list. If an exception is
1486    * thrown, this array can be examined for partial results and used to determine
1487    * which actions processed successfully.
1488    * @throws IOException if there are problems talking to META. Per-item
1489    * exceptions are stored in the results array.
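        * <p>
        * A hedged sketch of a mixed batch with a per-result callback (row and
        * column names are placeholders):
        * <pre>{@code
        * List<Row> actions = new ArrayList<Row>();
        * actions.add(new Get(Bytes.toBytes("row1")));
        * Put put = new Put(Bytes.toBytes("row2"));
        * put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        * actions.add(put);
        * Object[] results = new Object[actions.size()];
        * table.processBatchCallback(actions, results, new Batch.Callback<Object>() {
        *   public void update(byte[] region, byte[] row, Object result) {
        *     // invoked once per action as its result arrives
        *   }
        * });
        * }</pre>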
1490    */
1491   public <R> void processBatchCallback(
1492     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1493     throws IOException, InterruptedException {
1494     this.batchCallback(list, results, callback);
1495   }
1496 
1497 
1498   /**
1499    * Parameterized batch processing, allowing varying return types for different
1500    * {@link Row} implementations.
1501    */
1502   public void processBatch(final List<? extends Row> list, final Object[] results)
1503     throws IOException, InterruptedException {
1504     this.batch(list, results);
1505   }
1506 
1507 
1508   @Override
1509   public void close() throws IOException {
1510     if (this.closed) {
1511       return;
1512     }
1513     flushCommits();
1514     if (cleanupPoolOnClose) {
1515       this.pool.shutdown();
1516       try {
1517         boolean terminated = false;
1518         do {
1519           // wait until the pool has terminated
1520           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1521         } while (!terminated);
1522       } catch (InterruptedException e) {
1523         LOG.warn("waitForTermination interrupted");
1524       }
1525     }
1526     if (cleanupConnectionOnClose) {
1527       if (this.connection != null) {
1528         this.connection.close();
1529       }
1530     }
1531     this.closed = true;
1532   }
1533 
1534   // validate for well-formedness
1535   public void validatePut(final Put put) throws IllegalArgumentException {
1536     validatePut(put, tableConfiguration.getMaxKeyValueSize());
1537   }
1538 
1539   // validate for well-formedness
1540   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1541     if (put.isEmpty()) {
1542       throw new IllegalArgumentException("No columns to insert");
1543     }
1544     if (maxKeyValueSize > 0) {
1545       for (List<Cell> list : put.getFamilyCellMap().values()) {
1546         for (Cell cell : list) {
1547           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1548             throw new IllegalArgumentException("KeyValue size too large");
1549           }
1550         }
1551       }
1552     }
1553   }
1554 
1555   /**
1556    * {@inheritDoc}
1557    */
1558   @Override
1559   public boolean isAutoFlush() {
1560     return autoFlush;
1561   }
1562 
1563   /**
1564    * {@inheritDoc}
1565    */
1566   @Deprecated
1567   @Override
1568   public void setAutoFlush(boolean autoFlush) {
1569     this.autoFlush = autoFlush;
1570   }
1571 
1572   /**
1573    * {@inheritDoc}
1574    */
1575   @Override
1576   public void setAutoFlushTo(boolean autoFlush) {
1577     this.autoFlush = autoFlush;
1578   }
1579 
1580   /**
1581    * {@inheritDoc}
1582    */
1583   @Override
1584   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1585     this.autoFlush = autoFlush;
1586   }
1587 
1588   /**
1589    * Returns the maximum size in bytes of the write buffer for this HTable.
1590    * <p>
1591    * The default value comes from the configuration parameter
1592    * {@code hbase.client.write.buffer}.
1593    * @return The size of the write buffer in bytes.
1594    */
1595   @Override
1596   public long getWriteBufferSize() {
1597     return writeBufferSize;
1598   }
1599 
1600   /**
1601    * Sets the size of the write buffer in bytes.
1602    * <p>
1603    * If the new size is less than the current amount of data in the
1604    * write buffer, the buffer gets flushed.
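        * <p>
        * For example, a hedged sketch of shrinking the buffer (the 2 MB figure is
        * arbitrary):
        * <pre>{@code
        * table.setWriteBufferSize(2 * 1024 * 1024);  // flushes now if more than 2 MB is buffered
        * }</pre>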
1605    * @param writeBufferSize The new write buffer size, in bytes.
1606    * @throws IOException if a remote or network exception occurs.
1607    */
1608   @Override
1609   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1610     this.writeBufferSize = writeBufferSize;
1611     if (currentWriteBufferSize > writeBufferSize) {
1612       flushCommits();
1613     }
1614   }
1615 
1616   /**
1617    * The pool used to run multi requests for this HTable.
1618    * @return the pool used for multi requests
1619    */
1620   ExecutorService getPool() {
1621     return this.pool;
1622   }
1623 
1624   /**
1625    * Enable or disable region cache prefetch for the table. The setting is
1626    * applied to all HTable instances of the given table that share the same
1627    * connection. By default, region cache prefetch is enabled.
1628    * @param tableName name of table to configure.
1629    * @param enable Set to true to enable region cache prefetch, or false to
1630    * disable it.
1631    * @throws IOException
1632    * @deprecated does nothing since 0.99
1633    */
1634   @Deprecated
1635   public static void setRegionCachePrefetch(final byte[] tableName,
1636       final boolean enable)  throws IOException {
1637   }
1638 
1639   /**
1640    * @deprecated does nothing since 0.99
1641    */
1642   @Deprecated
1643   public static void setRegionCachePrefetch(
1644       final TableName tableName,
1645       final boolean enable) throws IOException {
1646   }
1647 
1648   /**
1649    * Enable or disable region cache prefetch for the table. The setting is
1650    * applied to all HTable instances of the given table that share the same
1651    * connection. By default, region cache prefetch is enabled.
1652    * @param conf The Configuration object to use.
1653    * @param tableName name of table to configure.
1654    * @param enable Set to true to enable region cache prefetch, or false to
1655    * disable it.
1656    * @throws IOException
1657    * @deprecated does nothing since 0.99
1658    */
1659   @Deprecated
1660   public static void setRegionCachePrefetch(final Configuration conf,
1661       final byte[] tableName, final boolean enable) throws IOException {
1662   }
1663 
1664   /**
1665    * @deprecated does nothing since 0.99
1666    */
1667   @Deprecated
1668   public static void setRegionCachePrefetch(final Configuration conf,
1669       final TableName tableName,
1670       final boolean enable) throws IOException {
1671   }
1672 
1673   /**
1674    * Check whether region cache prefetch is enabled or not for the table.
1675    * @param conf The Configuration object to use.
1676    * @param tableName name of table to check
1677    * @return true if the table's region cache prefetch is enabled, false
1678    * otherwise.
1679    * @throws IOException
1680    * @deprecated always returns false since 0.99
1681    */
1682   @Deprecated
1683   public static boolean getRegionCachePrefetch(final Configuration conf,
1684       final byte[] tableName) throws IOException {
1685     return false;
1686   }
1687 
1688   /**
1689    * @deprecated always returns false since 0.99
1690    */
1691   @Deprecated
1692   public static boolean getRegionCachePrefetch(final Configuration conf,
1693       final TableName tableName) throws IOException {
1694     return false;
1695   }
1696 
1697   /**
1698    * Check whether region cache prefetch is enabled or not for the table.
1699    * @param tableName name of table to check
1700    * @return true if the table's region cache prefetch is enabled, false
1701    * otherwise.
1702    * @throws IOException
1703    * @deprecated always returns false since 0.99
1704    */
1705   @Deprecated
1706   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1707     return false;
1708   }
1709 
1710   /**
1711    * @deprecated always returns false since 0.99
1712    */
1713   @Deprecated
1714   public static boolean getRegionCachePrefetch(
1715       final TableName tableName) throws IOException {
1716     return false;
1717   }
1718 
1719   /**
1720    * Explicitly clears the region cache to fetch the latest value from META.
1721    * This is a power user function: avoid unless you know the ramifications.
1722    */
1723   public void clearRegionCache() {
1724     this.connection.clearRegionCache();
1725   }
1726 
1727   /**
1728    * {@inheritDoc}
1729    */
1730   @Override
1731   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1732     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1733   }
1734 
1735   /**
1736    * {@inheritDoc}
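        * <p>
        * A hedged sketch of calling a coprocessor endpoint on every region of the
        * table. {@code RowCountService}, {@code CountRequest} and
        * {@code CountResponse} stand in for classes generated from your own
        * endpoint's .proto definition; the controller and callback helpers are the
        * ones typically paired with generated stubs in the
        * {@code org.apache.hadoop.hbase.ipc} package.
        * <pre>{@code
        * Map<byte[], Long> counts = table.coprocessorService(RowCountService.class,
        *     null, null,   // null start/end keys cover every region
        *     new Batch.Call<RowCountService, Long>() {
        *       public Long call(RowCountService stub) throws IOException {
        *         ServerRpcController controller = new ServerRpcController();
        *         BlockingRpcCallback<CountResponse> callback =
        *             new BlockingRpcCallback<CountResponse>();
        *         stub.getRowCount(controller, CountRequest.getDefaultInstance(), callback);
        *         if (controller.failedOnException()) {
        *           throw controller.getFailedOn();
        *         }
        *         return callback.get().getCount();
        *       }
        *     });
        * // counts is keyed by region name, one entry per region that was called
        * }</pre>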
1737    */
1738   @Override
1739   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1740       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1741       throws ServiceException, Throwable {
1742     final Map<byte[],R> results =  Collections.synchronizedMap(
1743         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1744     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1745       @Override
1746       public void update(byte[] region, byte[] row, R value) {
1747         if (region != null) {
1748           results.put(region, value);
1749         }
1750       }
1751     });
1752     return results;
1753   }
1754 
1755   /**
1756    * {@inheritDoc}
1757    */
1758   @Override
1759   public <T extends Service, R> void coprocessorService(final Class<T> service,
1760       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1761       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1762 
1763     // get regions covered by the row range
1764     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1765 
1766     Map<byte[],Future<R>> futures =
1767         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1768     for (final byte[] r : keys) {
1769       final RegionCoprocessorRpcChannel channel =
1770           new RegionCoprocessorRpcChannel(connection, tableName, r);
1771       Future<R> future = pool.submit(
1772           new Callable<R>() {
1773             @Override
1774             public R call() throws Exception {
1775               T instance = ProtobufUtil.newServiceStub(service, channel);
1776               R result = callable.call(instance);
1777               byte[] region = channel.getLastRegion();
1778               if (callback != null) {
1779                 callback.update(region, r, result);
1780               }
1781               return result;
1782             }
1783           });
1784       futures.put(r, future);
1785     }
1786     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1787       try {
1788         e.getValue().get();
1789       } catch (ExecutionException ee) {
1790         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1791             + Bytes.toStringBinary(e.getKey()), ee);
1792         throw ee.getCause();
1793       } catch (InterruptedException ie) {
1794         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1795             + " for row " + Bytes.toStringBinary(e.getKey()))
1796             .initCause(ie);
1797       }
1798     }
1799   }
1800 
1801   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1802   throws IOException {
1803     if (start == null) {
1804       start = HConstants.EMPTY_START_ROW;
1805     }
1806     if (end == null) {
1807       end = HConstants.EMPTY_END_ROW;
1808     }
1809     return getKeysAndRegionsInRange(start, end, true).getFirst();
1810   }
1811 
1812   public void setOperationTimeout(int operationTimeout) {
1813     this.operationTimeout = operationTimeout;
1814   }
1815 
1816   public int getOperationTimeout() {
1817     return operationTimeout;
1818   }
1819 
1820   @Override
1821   public String toString() {
1822     return tableName + ";" + connection;
1823   }
1824 
1825   /**
1826    * {@inheritDoc}
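        * <p>
        * A hedged sketch, reusing the hypothetical row-count endpoint from the
        * coprocessorService example above; the same request is sent to every region
        * and the responses are collected per region:
        * <pre>{@code
        * Map<byte[], CountResponse> counts = table.batchCoprocessorService(
        *     RowCountService.getDescriptor().findMethodByName("getRowCount"),
        *     CountRequest.getDefaultInstance(),
        *     null, null,                            // whole table
        *     CountResponse.getDefaultInstance());   // prototype used to parse each response
        * }</pre>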
1827    */
1828   @Override
1829   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1830       Descriptors.MethodDescriptor methodDescriptor, Message request,
1831       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1832     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1833         Bytes.BYTES_COMPARATOR));
1834     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1835         new Callback<R>() {
1836 
1837           @Override
1838           public void update(byte[] region, byte[] row, R result) {
1839             if (region != null) {
1840               results.put(region, result);
1841             }
1842           }
1843         });
1844     return results;
1845   }
1846 
1847   /**
1848    * {@inheritDoc}
1849    */
1850   @Override
1851   public <R extends Message> void batchCoprocessorService(
1852       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1853       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1854       throws ServiceException, Throwable {
1855 
1856     // get regions covered by the row range
1857     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1858         getKeysAndRegionsInRange(startKey, endKey, true);
1859     List<byte[]> keys = keysAndRegions.getFirst();
1860     List<HRegionLocation> regions = keysAndRegions.getSecond();
1861 
1862     // check if we have any calls to make
1863     if (keys.isEmpty()) {
1864       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1865           ", end=" + Bytes.toStringBinary(endKey));
1866       return;
1867     }
1868 
1869     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1870     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1871         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1872     for (int i = 0; i < keys.size(); i++) {
1873       final byte[] rowKey = keys.get(i);
1874       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1875       RegionCoprocessorServiceExec exec =
1876           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1877       execs.add(exec);
1878       execsByRow.put(rowKey, exec);
1879     }
1880 
1881     // tracking for any possible deserialization errors on success callback
1882     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1883     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1884     final List<Row> callbackErrorActions = new ArrayList<Row>();
1885     final List<String> callbackErrorServers = new ArrayList<String>();
1886     Object[] results = new Object[execs.size()];
1887 
1888     AsyncProcess asyncProcess =
1889         new AsyncProcess(connection, configuration, pool,
1890             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1891             true, RpcControllerFactory.instantiate(configuration));
1892 
1893     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1894         new Callback<ClientProtos.CoprocessorServiceResult>() {
1895           @Override
1896           public void update(byte[] region, byte[] row,
1897                               ClientProtos.CoprocessorServiceResult serviceResult) {
1898             if (LOG.isTraceEnabled()) {
1899               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1900                   ": region=" + Bytes.toStringBinary(region) +
1901                   ", row=" + Bytes.toStringBinary(row) +
1902                   ", value=" + serviceResult.getValue().getValue());
1903             }
1904             try {
1905               callback.update(region, row,
1906                   (R) responsePrototype.newBuilderForType().mergeFrom(
1907                       serviceResult.getValue().getValue()).build());
1908             } catch (InvalidProtocolBufferException e) {
1909               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1910                   e);
1911               callbackErrorExceptions.add(e);
1912               callbackErrorActions.add(execsByRow.get(row));
1913               callbackErrorServers.add("null");
1914             }
1915           }
1916         }, results);
1917 
1918     future.waitUntilDone();
1919 
1920     if (future.hasError()) {
1921       throw future.getErrors();
1922     } else if (!callbackErrorExceptions.isEmpty()) {
1923       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1924           callbackErrorServers);
1925     }
1926   }
1927 }