View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  import java.util.TreeMap;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.SynchronousQueue;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HRegionLocation;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.KeyValueUtil;
47  import org.apache.hadoop.hbase.MetaTableAccessor;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.TableNotFoundException;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.classification.InterfaceStability;
53  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
54  import org.apache.hadoop.hbase.client.coprocessor.Batch;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
56  import org.apache.hadoop.hbase.filter.BinaryComparator;
57  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
58  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
59  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
60  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
61  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
62  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
63  import org.apache.hadoop.hbase.protobuf.RequestConverter;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
69  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
70  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
71  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
72  import org.apache.hadoop.hbase.util.Bytes;
73  import org.apache.hadoop.hbase.util.Pair;
74  import org.apache.hadoop.hbase.util.Threads;
75  
76  import com.google.common.annotations.VisibleForTesting;
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.InvalidProtocolBufferException;
79  import com.google.protobuf.Message;
80  import com.google.protobuf.Service;
81  import com.google.protobuf.ServiceException;
82  
83  /**
84   * An implementation of {@link Table}. Used to communicate with a single HBase table.
85   * Lightweight. Get as needed and just close when done.
86   * Instances of this class SHOULD NOT be constructed directly.
87   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
88   * class comment for an example of how.
89   *
90   * <p>This class is NOT thread safe for reads nor writes.
91   * In the case of writes (Put, Delete), the underlying write buffer can
92   * be corrupted if multiple threads contend over a single HTable instance.
93   * In the case of reads, some fields used by a Scan are shared among all threads.
94   *
95   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
96   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
97   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
98   * Interface Classification</a>
99   * There are no guarantees for backwards source / binary compatibility and methods or class can
100  * change or go away without deprecation.
101  *
102  * @see Table
103  * @see Admin
104  * @see Connection
105  * @see ConnectionFactory
106  */
107 @InterfaceAudience.Private
108 @InterfaceStability.Stable
109 public class HTable implements HTableInterface {
110   private static final Log LOG = LogFactory.getLog(HTable.class);
111   protected ClusterConnection connection;
112   private final TableName tableName;
113   private volatile Configuration configuration;
114   private TableConfiguration tableConfiguration;
115   protected BufferedMutatorImpl mutator;
116   private boolean autoFlush = true;
117   private boolean closed = false;
118   protected int scannerCaching;
119   private ExecutorService pool;  // For Multi & Scan
120   private int operationTimeout;
121   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
122   private final boolean cleanupConnectionOnClose; // close the connection in close()
123   private Consistency defaultConsistency = Consistency.STRONG;
124   private HRegionLocator locator;
125 
126   /** The Async process for batch */
127   protected AsyncProcess multiAp;
128   private RpcRetryingCallerFactory rpcCallerFactory;
129   private RpcControllerFactory rpcControllerFactory;
130 
131   /**
132    * Creates an object to access a HBase table.
133    * @param conf Configuration object to use.
134    * @param tableName Name of the table.
135    * @throws IOException if a remote or network exception occurs
136    * @deprecated Constructing HTable objects manually has been deprecated. Please use
137    * {@link Connection} to instantiate a {@link Table} instead.
138    */
139   @Deprecated
140   public HTable(Configuration conf, final String tableName)
141   throws IOException {
142     this(conf, TableName.valueOf(tableName));
143   }
144 
145   /**
146    * Creates an object to access a HBase table.
147    * @param conf Configuration object to use.
148    * @param tableName Name of the table.
149    * @throws IOException if a remote or network exception occurs
150    * @deprecated Constructing HTable objects manually has been deprecated. Please use
151    * {@link Connection} to instantiate a {@link Table} instead.
152    */
153   @Deprecated
154   public HTable(Configuration conf, final byte[] tableName)
155   throws IOException {
156     this(conf, TableName.valueOf(tableName));
157   }
158 
159   /**
160    * Creates an object to access a HBase table.
161    * @param conf Configuration object to use.
162    * @param tableName table name pojo
163    * @throws IOException if a remote or network exception occurs
164    * @deprecated Constructing HTable objects manually has been deprecated. Please use
165    * {@link Connection} to instantiate a {@link Table} instead.
166    */
167   @Deprecated
168   public HTable(Configuration conf, final TableName tableName)
169   throws IOException {
170     this.tableName = tableName;
171     this.cleanupPoolOnClose = true;
172     this.cleanupConnectionOnClose = true;
173     if (conf == null) {
174       this.connection = null;
175       return;
176     }
177     this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
178     this.configuration = conf;
179 
180     this.pool = getDefaultExecutor(conf);
181     this.finishSetup();
182   }
183 
184   /**
185    * Creates an object to access a HBase table.
186    * @param tableName Name of the table.
187    * @param connection HConnection to be used.
188    * @throws IOException if a remote or network exception occurs
189    * @deprecated Do not use.
190    */
191   @Deprecated
192   public HTable(TableName tableName, Connection connection) throws IOException {
193     this.tableName = tableName;
194     this.cleanupPoolOnClose = true;
195     this.cleanupConnectionOnClose = false;
196     this.connection = (ClusterConnection)connection;
197     this.configuration = connection.getConfiguration();
198 
199     this.pool = getDefaultExecutor(this.configuration);
200     this.finishSetup();
201   }
202 
203   // Marked Private @since 1.0
204   @InterfaceAudience.Private
205   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
206     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
207     if (maxThreads == 0) {
208       maxThreads = 1; // is there a better default?
209     }
210     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
211 
212     // Using the "direct handoff" approach, new threads will only be created
213     // if it is necessary and will grow unbounded. This could be bad but in HCM
214     // we only create as many Runnables as there are region servers. It means
215     // it also scales when new region servers are added.
216     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
217         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
218     pool.allowCoreThreadTimeOut(true);
219     return pool;
220   }
221 
222   /**
223    * Creates an object to access a HBase table.
224    * @param conf Configuration object to use.
225    * @param tableName Name of the table.
226    * @param pool ExecutorService to be used.
227    * @throws IOException if a remote or network exception occurs
228    * @deprecated Constructing HTable objects manually has been deprecated. Please use
229    * {@link Connection} to instantiate a {@link Table} instead.
230    */
231   @Deprecated
232   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
233       throws IOException {
234     this(conf, TableName.valueOf(tableName), pool);
235   }
236 
237   /**
238    * Creates an object to access a HBase table.
239    * @param conf Configuration object to use.
240    * @param tableName Name of the table.
241    * @param pool ExecutorService to be used.
242    * @throws IOException if a remote or network exception occurs
243    * @deprecated Constructing HTable objects manually has been deprecated. Please use
244    * {@link Connection} to instantiate a {@link Table} instead.
245    */
246   @Deprecated
247   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
248       throws IOException {
249     this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
250     this.configuration = conf;
251     this.pool = pool;
252     if (pool == null) {
253       this.pool = getDefaultExecutor(conf);
254       this.cleanupPoolOnClose = true;
255     } else {
256       this.cleanupPoolOnClose = false;
257     }
258     this.tableName = tableName;
259     this.cleanupConnectionOnClose = true;
260     this.finishSetup();
261   }
262 
263   /**
264    * Creates an object to access a HBase table.
265    * @param tableName Name of the table.
266    * @param connection HConnection to be used.
267    * @param pool ExecutorService to be used.
268    * @throws IOException if a remote or network exception occurs.
269    * @deprecated Do not use, internal ctor.
270    */
271   @Deprecated
272   public HTable(final byte[] tableName, final Connection connection,
273       final ExecutorService pool) throws IOException {
274     this(TableName.valueOf(tableName), connection, pool);
275   }
276 
277   /** @deprecated Do not use, internal ctor. */
278   @Deprecated
279   public HTable(TableName tableName, final Connection connection,
280       final ExecutorService pool) throws IOException {
281     this(tableName, (ClusterConnection)connection, null, null, null, pool);
282   }
283 
284   /**
285    * Creates an object to access a HBase table.
286    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
287    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
288    * @param tableName Name of the table.
289    * @param connection HConnection to be used.
290    * @param pool ExecutorService to be used.
291    * @throws IOException if a remote or network exception occurs
292    */
293   @InterfaceAudience.Private
294   public HTable(TableName tableName, final ClusterConnection connection,
295       final TableConfiguration tableConfig,
296       final RpcRetryingCallerFactory rpcCallerFactory,
297       final RpcControllerFactory rpcControllerFactory,
298       final ExecutorService pool) throws IOException {
299     if (connection == null || connection.isClosed()) {
300       throw new IllegalArgumentException("Connection is null or closed.");
301     }
302     this.tableName = tableName;
303     this.cleanupConnectionOnClose = false;
304     this.connection = connection;
305     this.configuration = connection.getConfiguration();
306     this.tableConfiguration = tableConfig;
307     this.pool = pool;
308     if (pool == null) {
309       this.pool = getDefaultExecutor(this.configuration);
310       this.cleanupPoolOnClose = true;
311     } else {
312       this.cleanupPoolOnClose = false;
313     }
314 
315     this.rpcCallerFactory = rpcCallerFactory;
316     this.rpcControllerFactory = rpcControllerFactory;
317 
318     this.finishSetup();
319   }
320 
321   /**
322    * For internal testing. Uses Connection provided in {@code params}.
323    * @throws IOException
324    */
325   @VisibleForTesting
326   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
327     connection = conn;
328     tableName = params.getTableName();
329     tableConfiguration = new TableConfiguration(connection.getConfiguration());
330     cleanupPoolOnClose = false;
331     cleanupConnectionOnClose = false;
332     // used from tests, don't trust the connection is real
333     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
334   }
335 
336   /**
337    * @return maxKeyValueSize from configuration.
338    */
339   public static int getMaxKeyValueSize(Configuration conf) {
340     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
341   }
342 
343   /**
344    * setup this HTable's parameter based on the passed configuration
345    */
346   private void finishSetup() throws IOException {
347     if (tableConfiguration == null) {
348       tableConfiguration = new TableConfiguration(configuration);
349     }
350 
351     this.operationTimeout = tableName.isSystemTable() ?
352         tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
353     this.scannerCaching = tableConfiguration.getScannerCaching();
354 
355     if (this.rpcCallerFactory == null) {
356       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
357     }
358     if (this.rpcControllerFactory == null) {
359       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
360     }
361 
362     // puts need to track errors globally due to how the APIs currently work.
363     multiAp = this.connection.getAsyncProcess();
364     this.locator = new HRegionLocator(getName(), connection);
365   }
366 
367   /**
368    * {@inheritDoc}
369    */
370   @Override
371   public Configuration getConfiguration() {
372     return configuration;
373   }
374 
375   /**
376    * Tells whether or not a table is enabled or not. This method creates a
377    * new HBase configuration, so it might make your unit tests fail due to
378    * incorrect ZK client port.
379    * @param tableName Name of table to check.
380    * @return {@code true} if table is online.
381    * @throws IOException if a remote or network exception occurs
382    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
383    */
384   @Deprecated
385   public static boolean isTableEnabled(String tableName) throws IOException {
386     return isTableEnabled(TableName.valueOf(tableName));
387   }
388 
389   /**
390    * Tells whether or not a table is enabled or not. This method creates a
391    * new HBase configuration, so it might make your unit tests fail due to
392    * incorrect ZK client port.
393    * @param tableName Name of table to check.
394    * @return {@code true} if table is online.
395    * @throws IOException if a remote or network exception occurs
396    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
397    */
398   @Deprecated
399   public static boolean isTableEnabled(byte[] tableName) throws IOException {
400     return isTableEnabled(TableName.valueOf(tableName));
401   }
402 
403   /**
404    * Tells whether or not a table is enabled or not. This method creates a
405    * new HBase configuration, so it might make your unit tests fail due to
406    * incorrect ZK client port.
407    * @param tableName Name of table to check.
408    * @return {@code true} if table is online.
409    * @throws IOException if a remote or network exception occurs
410    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
411    */
412   @Deprecated
413   public static boolean isTableEnabled(TableName tableName) throws IOException {
414     return isTableEnabled(HBaseConfiguration.create(), tableName);
415   }
416 
417   /**
418    * Tells whether or not a table is enabled or not.
419    * @param conf The Configuration object to use.
420    * @param tableName Name of table to check.
421    * @return {@code true} if table is online.
422    * @throws IOException if a remote or network exception occurs
423    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
424    */
425   @Deprecated
426   public static boolean isTableEnabled(Configuration conf, String tableName)
427   throws IOException {
428     return isTableEnabled(conf, TableName.valueOf(tableName));
429   }
430 
431   /**
432    * Tells whether or not a table is enabled or not.
433    * @param conf The Configuration object to use.
434    * @param tableName Name of table to check.
435    * @return {@code true} if table is online.
436    * @throws IOException if a remote or network exception occurs
437    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
438    */
439   @Deprecated
440   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
441   throws IOException {
442     return isTableEnabled(conf, TableName.valueOf(tableName));
443   }
444 
445   /**
446    * Tells whether or not a table is enabled or not.
447    * @param conf The Configuration object to use.
448    * @param tableName Name of table to check.
449    * @return {@code true} if table is online.
450    * @throws IOException if a remote or network exception occurs
451    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
452    */
453   @Deprecated
454   public static boolean isTableEnabled(Configuration conf,
455       final TableName tableName) throws IOException {
456     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
457       @Override
458       public Boolean connect(HConnection connection) throws IOException {
459         return connection.isTableEnabled(tableName);
460       }
461     });
462   }
463 
464   /**
465    * Find region location hosting passed row using cached info
466    * @param row Row to find.
467    * @return The location of the given row.
468    * @throws IOException if a remote or network exception occurs
469    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
470    */
471   @Deprecated
472   public HRegionLocation getRegionLocation(final String row)
473   throws IOException {
474     return getRegionLocation(Bytes.toBytes(row), false);
475   }
476 
477   /**
478    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
479    */
480   @Deprecated
481   public HRegionLocation getRegionLocation(final byte [] row)
482   throws IOException {
483     return locator.getRegionLocation(row);
484   }
485 
486   /**
487    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
488    */
489   @Deprecated
490   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
491   throws IOException {
492     return locator.getRegionLocation(row, reload);
493   }
494 
495   /**
496    * {@inheritDoc}
497    */
498   @Override
499   public byte [] getTableName() {
500     return this.tableName.getName();
501   }
502 
503   @Override
504   public TableName getName() {
505     return tableName;
506   }
507 
508   /**
509    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
510    * manipulations.
511    * @return An HConnection instance.
512    * @deprecated This method will be changed from public to package protected.
513    */
514   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
515   @Deprecated
516   @VisibleForTesting
517   public HConnection getConnection() {
518     return this.connection;
519   }
520 
521   /**
522    * Gets the number of rows that a scanner will fetch at once.
523    * <p>
524    * The default value comes from {@code hbase.client.scanner.caching}.
525    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
526    */
527   @Deprecated
528   public int getScannerCaching() {
529     return scannerCaching;
530   }
531 
532   /**
533    * Kept in 0.96 for backward compatibility
534    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
535    */
536   @Deprecated
537   public List<Row> getWriteBuffer() {
538     return mutator == null ? null : mutator.getWriteBuffer();
539   }
540 
541   /**
542    * Sets the number of rows that a scanner will fetch at once.
543    * <p>
544    * This will override the value specified by
545    * {@code hbase.client.scanner.caching}.
546    * Increasing this value will reduce the amount of work needed each time
547    * {@code next()} is called on a scanner, at the expense of memory use
548    * (since more rows will need to be maintained in memory by the scanners).
549    * @param scannerCaching the number of rows a scanner will fetch at once.
550    * @deprecated Use {@link Scan#setCaching(int)}
551    */
552   @Deprecated
553   public void setScannerCaching(int scannerCaching) {
554     this.scannerCaching = scannerCaching;
555   }
556 
557   /**
558    * {@inheritDoc}
559    */
560   @Override
561   public HTableDescriptor getTableDescriptor() throws IOException {
562     // TODO: This is the same as HBaseAdmin.getTableDescriptor(). Only keep one.
563     if (tableName == null) return null;
564     if (tableName.equals(TableName.META_TABLE_NAME)) {
565       return HTableDescriptor.META_TABLEDESC;
566     }
567     HTableDescriptor htd = executeMasterCallable(
568       new MasterCallable<HTableDescriptor>(getConnection()) {
569       @Override
570       public HTableDescriptor call(int callTimeout) throws ServiceException {
571         GetTableDescriptorsResponse htds;
572         GetTableDescriptorsRequest req =
573             RequestConverter.buildGetTableDescriptorsRequest(tableName);
574         htds = master.getTableDescriptors(null, req);
575 
576         if (!htds.getTableSchemaList().isEmpty()) {
577           return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
578         }
579         return null;
580       }
581     });
582     if (htd != null) {
583       return new UnmodifyableHTableDescriptor(htd);
584     }
585     throw new TableNotFoundException(tableName.getNameAsString());
586   }
587 
588   private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException {
589     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
590     try {
591       return caller.callWithRetries(callable, operationTimeout);
592     } finally {
593       callable.close();
594     }
595   }
596 
597   /**
598    * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
599    */
600   @Deprecated
601   public byte [][] getStartKeys() throws IOException {
602     return locator.getStartKeys();
603   }
604 
605   /**
606    * @deprecated Use {@link RegionLocator#getEndKeys()} instead;
607    */
608   @Deprecated
609   public byte[][] getEndKeys() throws IOException {
610     return locator.getEndKeys();
611   }
612 
613   /**
614    * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
615    */
616   @Deprecated
617   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
618     return locator.getStartEndKeys();
619   }
620 
621   /**
622    * Gets all the regions and their address for this table.
623    * <p>
624    * This is mainly useful for the MapReduce integration.
625    * @return A map of HRegionInfo with it's server address
626    * @throws IOException if a remote or network exception occurs
627    * @deprecated This is no longer a public API.  Use {@link #getAllRegionLocations()} instead.
628    */
629   @SuppressWarnings("deprecation")
630   @Deprecated
631   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
632     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular,
633     // returns an HRegionLocation.
634     return MetaTableAccessor.allTableRegions(this.connection, getName());
635   }
636 
637   /**
638    * Gets all the regions and their address for this table.
639    * <p>
640    * This is mainly useful for the MapReduce integration.
641    * @return A map of HRegionInfo with it's server address
642    * @throws IOException if a remote or network exception occurs
643    *
644    * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
645    */
646   @Deprecated
647   public List<HRegionLocation> getAllRegionLocations() throws IOException {
648     return locator.getAllRegionLocations();
649   }
650 
651   /**
652    * Get the corresponding regions for an arbitrary range of keys.
653    * <p>
654    * @param startKey Starting row in range, inclusive
655    * @param endKey Ending row in range, exclusive
656    * @return A list of HRegionLocations corresponding to the regions that
657    * contain the specified range
658    * @throws IOException if a remote or network exception occurs
659    * @deprecated This is no longer a public API
660    */
661   @Deprecated
662   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
663     final byte [] endKey) throws IOException {
664     return getRegionsInRange(startKey, endKey, false);
665   }
666 
667   /**
668    * Get the corresponding regions for an arbitrary range of keys.
669    * <p>
670    * @param startKey Starting row in range, inclusive
671    * @param endKey Ending row in range, exclusive
672    * @param reload true to reload information or false to use cached information
673    * @return A list of HRegionLocations corresponding to the regions that
674    * contain the specified range
675    * @throws IOException if a remote or network exception occurs
676    * @deprecated This is no longer a public API
677    */
678   @Deprecated
679   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
680       final byte [] endKey, final boolean reload) throws IOException {
681     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
682   }
683 
684   /**
685    * Get the corresponding start keys and regions for an arbitrary range of
686    * keys.
687    * <p>
688    * @param startKey Starting row in range, inclusive
689    * @param endKey Ending row in range
690    * @param includeEndKey true if endRow is inclusive, false if exclusive
691    * @return A pair of list of start keys and list of HRegionLocations that
692    *         contain the specified range
693    * @throws IOException if a remote or network exception occurs
694    * @deprecated This is no longer a public API
695    */
696   @Deprecated
697   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
698       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
699       throws IOException {
700     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
701   }
702 
703   /**
704    * Get the corresponding start keys and regions for an arbitrary range of
705    * keys.
706    * <p>
707    * @param startKey Starting row in range, inclusive
708    * @param endKey Ending row in range
709    * @param includeEndKey true if endRow is inclusive, false if exclusive
710    * @param reload true to reload information or false to use cached information
711    * @return A pair of list of start keys and list of HRegionLocations that
712    *         contain the specified range
713    * @throws IOException if a remote or network exception occurs
714    * @deprecated This is no longer a public API
715    */
716   @Deprecated
717   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
718       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
719       final boolean reload) throws IOException {
720     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
721     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
722       throw new IllegalArgumentException(
723         "Invalid range: " + Bytes.toStringBinary(startKey) +
724         " > " + Bytes.toStringBinary(endKey));
725     }
726     List<byte[]> keysInRange = new ArrayList<byte[]>();
727     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
728     byte[] currentKey = startKey;
729     do {
730       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
731       keysInRange.add(currentKey);
732       regionsInRange.add(regionLocation);
733       currentKey = regionLocation.getRegionInfo().getEndKey();
734     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
735         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
736             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
737     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
738         regionsInRange);
739   }
740 
741   /**
742    * {@inheritDoc}
743    * @deprecated Use reversed scan instead.
744    */
745    @Override
746    @Deprecated
747    public Result getRowOrBefore(final byte[] row, final byte[] family)
748        throws IOException {
749      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
750          tableName, row) {
751        @Override
752       public Result call(int callTimeout) throws IOException {
753          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
754          controller.setPriority(tableName);
755          controller.setCallTimeout(callTimeout);
756          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
757              getLocation().getRegionInfo().getRegionName(), row, family);
758          try {
759            ClientProtos.GetResponse response = getStub().get(controller, request);
760            if (!response.hasResult()) return null;
761            return ProtobufUtil.toResult(response.getResult());
762          } catch (ServiceException se) {
763            throw ProtobufUtil.getRemoteException(se);
764          }
765        }
766      };
767      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
768    }
769 
770   /**
771    * The underlying {@link HTable} must not be closed.
772    * {@link HTableInterface#getScanner(Scan)} has other usage details.
773    */
774   @Override
775   public ResultScanner getScanner(final Scan scan) throws IOException {
776     if (scan.getBatch() > 0 && scan.isSmall()) {
777       throw new IllegalArgumentException("Small scan should not be used with batching");
778     }
779     if (scan.getCaching() <= 0) {
780       scan.setCaching(getScannerCaching());
781     }
782 
783     if (scan.isReversed()) {
784       if (scan.isSmall()) {
785         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
786             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
787             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
788       } else {
789         return new ReversedClientScanner(getConfiguration(), scan, getName(),
790             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
791             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
792       }
793     }
794 
795     if (scan.isSmall()) {
796       return new ClientSmallScanner(getConfiguration(), scan, getName(),
797           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
798           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
799     } else {
800       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
801           this.rpcCallerFactory, this.rpcControllerFactory,
802           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
803     }
804   }
805 
806   /**
807    * The underlying {@link HTable} must not be closed.
808    * {@link HTableInterface#getScanner(byte[])} has other usage details.
809    */
810   @Override
811   public ResultScanner getScanner(byte [] family) throws IOException {
812     Scan scan = new Scan();
813     scan.addFamily(family);
814     return getScanner(scan);
815   }
816 
817   /**
818    * The underlying {@link HTable} must not be closed.
819    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
820    */
821   @Override
822   public ResultScanner getScanner(byte [] family, byte [] qualifier)
823   throws IOException {
824     Scan scan = new Scan();
825     scan.addColumn(family, qualifier);
826     return getScanner(scan);
827   }
828 
829   /**
830    * {@inheritDoc}
831    */
832   @Override
833   public Result get(final Get get) throws IOException {
834     if (get.getConsistency() == null){
835       get.setConsistency(defaultConsistency);
836     }
837 
838     if (get.getConsistency() == Consistency.STRONG) {
839       // Good old call.
840       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
841           getName(), get.getRow()) {
842         @Override
843         public Result call(int callTimeout) throws IOException {
844           ClientProtos.GetRequest request =
845               RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
846           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
847           controller.setPriority(tableName);
848           controller.setCallTimeout(callTimeout);
849           try {
850             ClientProtos.GetResponse response = getStub().get(controller, request);
851             if (response == null) return null;
852             return ProtobufUtil.toResult(response.getResult());
853           } catch (ServiceException se) {
854             throw ProtobufUtil.getRemoteException(se);
855           }
856         }
857       };
858       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
859     }
860 
861     // Call that takes into account the replica
862     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
863       rpcControllerFactory, tableName, this.connection, get, pool,
864       tableConfiguration.getRetriesNumber(),
865       operationTimeout,
866       tableConfiguration.getPrimaryCallTimeoutMicroSecond());
867     return callable.call();
868   }
869 
870 
871   /**
872    * {@inheritDoc}
873    */
874   @Override
875   public Result[] get(List<Get> gets) throws IOException {
876     if (gets.size() == 1) {
877       return new Result[]{get(gets.get(0))};
878     }
879     try {
880       Object [] r1 = batch((List)gets);
881 
882       // translate.
883       Result [] results = new Result[r1.length];
884       int i=0;
885       for (Object o : r1) {
886         // batch ensures if there is a failure we get an exception instead
887         results[i++] = (Result) o;
888       }
889 
890       return results;
891     } catch (InterruptedException e) {
892       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
893     }
894   }
895 
896   /**
897    * {@inheritDoc}
898    */
899   @Override
900   public void batch(final List<? extends Row> actions, final Object[] results)
901       throws InterruptedException, IOException {
902     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
903     ars.waitUntilDone();
904     if (ars.hasError()) {
905       throw ars.getErrors();
906     }
907   }
908 
909   /**
910    * {@inheritDoc}
911    * @deprecated If any exception is thrown by one of the actions, there is no way to
912    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
913    */
914   @Deprecated
915   @Override
916   public Object[] batch(final List<? extends Row> actions)
917      throws InterruptedException, IOException {
918     Object[] results = new Object[actions.size()];
919     batch(actions, results);
920     return results;
921   }
922 
923   /**
924    * {@inheritDoc}
925    */
926   @Override
927   public <R> void batchCallback(
928       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
929       throws IOException, InterruptedException {
930     connection.processBatchCallback(actions, tableName, pool, results, callback);
931   }
932 
933   /**
934    * {@inheritDoc}
935    * @deprecated If any exception is thrown by one of the actions, there is no way to
936    * retrieve the partially executed results. Use
937    * {@link #batchCallback(List, Object[], Batch.Callback)}
938    * instead.
939    */
940   @Deprecated
941   @Override
942   public <R> Object[] batchCallback(
943     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
944       InterruptedException {
945     Object[] results = new Object[actions.size()];
946     batchCallback(actions, results, callback);
947     return results;
948   }
949 
950   /**
951    * {@inheritDoc}
952    */
953   @Override
954   public void delete(final Delete delete)
955   throws IOException {
956     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
957         tableName, delete.getRow()) {
958       @Override
959       public Boolean call(int callTimeout) throws IOException {
960         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
961         controller.setPriority(tableName);
962         controller.setCallTimeout(callTimeout);
963 
964         try {
965           MutateRequest request = RequestConverter.buildMutateRequest(
966             getLocation().getRegionInfo().getRegionName(), delete);
967           MutateResponse response = getStub().mutate(controller, request);
968           return Boolean.valueOf(response.getProcessed());
969         } catch (ServiceException se) {
970           throw ProtobufUtil.getRemoteException(se);
971         }
972       }
973     };
974     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
975   }
976 
977   /**
978    * {@inheritDoc}
979    */
980   @Override
981   public void delete(final List<Delete> deletes)
982   throws IOException {
983     Object[] results = new Object[deletes.size()];
984     try {
985       batch(deletes, results);
986     } catch (InterruptedException e) {
987       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
988     } finally {
989       // mutate list so that it is empty for complete success, or contains only failed records
990       // results are returned in the same order as the requests in list walk the list backwards,
991       // so we can remove from list without impacting the indexes of earlier members
992       for (int i = results.length - 1; i>=0; i--) {
993         // if result is not null, it succeeded
994         if (results[i] instanceof Result) {
995           deletes.remove(i);
996         }
997       }
998     }
999   }
1000 
1001   /**
1002    * {@inheritDoc}
1003    * @throws IOException
1004    */
1005   @Override
1006   public void put(final Put put) throws IOException {
1007     getBufferedMutator().mutate(put);
1008     if (autoFlush) {
1009       flushCommits();
1010     }
1011   }
1012 
1013   /**
1014    * {@inheritDoc}
1015    * @throws IOException
1016    */
1017   @Override
1018   public void put(final List<Put> puts) throws IOException {
1019     getBufferedMutator().mutate(puts);
1020     if (autoFlush) {
1021       flushCommits();
1022     }
1023   }
1024 
1025   /**
1026    * {@inheritDoc}
1027    */
1028   @Override
1029   public void mutateRow(final RowMutations rm) throws IOException {
1030     RegionServerCallable<Void> callable =
1031         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1032       @Override
1033       public Void call(int callTimeout) throws IOException {
1034         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1035         controller.setPriority(tableName);
1036         controller.setCallTimeout(callTimeout);
1037         try {
1038           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1039             getLocation().getRegionInfo().getRegionName(), rm);
1040           regionMutationBuilder.setAtomic(true);
1041           MultiRequest request =
1042             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1043           ClientProtos.MultiResponse response = getStub().multi(controller, request);
1044           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1045           if (res.hasException()) {
1046             Throwable ex = ProtobufUtil.toException(res.getException());
1047             if(ex instanceof IOException) {
1048               throw (IOException)ex;
1049             }
1050             throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1051           }
1052         } catch (ServiceException se) {
1053           throw ProtobufUtil.getRemoteException(se);
1054         }
1055         return null;
1056       }
1057     };
1058     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1059   }
1060 
1061   /**
1062    * {@inheritDoc}
1063    */
1064   @Override
1065   public Result append(final Append append) throws IOException {
1066     if (append.numFamilies() == 0) {
1067       throw new IOException(
1068           "Invalid arguments to append, no columns specified");
1069     }
1070 
1071     NonceGenerator ng = this.connection.getNonceGenerator();
1072     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1073     RegionServerCallable<Result> callable =
1074       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1075         @Override
1076         public Result call(int callTimeout) throws IOException {
1077           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1078           controller.setPriority(getTableName());
1079           controller.setCallTimeout(callTimeout);
1080           try {
1081             MutateRequest request = RequestConverter.buildMutateRequest(
1082               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1083             MutateResponse response = getStub().mutate(controller, request);
1084             if (!response.hasResult()) return null;
1085             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1086           } catch (ServiceException se) {
1087             throw ProtobufUtil.getRemoteException(se);
1088           }
1089         }
1090       };
1091     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1092   }
1093 
1094   /**
1095    * {@inheritDoc}
1096    */
1097   @Override
1098   public Result increment(final Increment increment) throws IOException {
1099     if (!increment.hasFamilies()) {
1100       throw new IOException(
1101           "Invalid arguments to increment, no columns specified");
1102     }
1103     NonceGenerator ng = this.connection.getNonceGenerator();
1104     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1105     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1106         getName(), increment.getRow()) {
1107       @Override
1108       public Result call(int callTimeout) throws IOException {
1109         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1110         controller.setPriority(getTableName());
1111         controller.setCallTimeout(callTimeout);
1112         try {
1113           MutateRequest request = RequestConverter.buildMutateRequest(
1114             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1115           MutateResponse response = getStub().mutate(controller, request);
1116           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1117         } catch (ServiceException se) {
1118           throw ProtobufUtil.getRemoteException(se);
1119         }
1120       }
1121     };
1122     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1123   }
1124 
1125   /**
1126    * {@inheritDoc}
1127    */
1128   @Override
1129   public long incrementColumnValue(final byte [] row, final byte [] family,
1130       final byte [] qualifier, final long amount)
1131   throws IOException {
1132     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1133   }
1134 
1135   /**
1136    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1137    */
1138   @Deprecated
1139   @Override
1140   public long incrementColumnValue(final byte [] row, final byte [] family,
1141       final byte [] qualifier, final long amount, final boolean writeToWAL)
1142   throws IOException {
1143     return incrementColumnValue(row, family, qualifier, amount,
1144       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1145   }
1146 
1147   /**
1148    * {@inheritDoc}
1149    */
1150   @Override
1151   public long incrementColumnValue(final byte [] row, final byte [] family,
1152       final byte [] qualifier, final long amount, final Durability durability)
1153   throws IOException {
1154     NullPointerException npe = null;
1155     if (row == null) {
1156       npe = new NullPointerException("row is null");
1157     } else if (family == null) {
1158       npe = new NullPointerException("family is null");
1159     } else if (qualifier == null) {
1160       npe = new NullPointerException("qualifier is null");
1161     }
1162     if (npe != null) {
1163       throw new IOException(
1164           "Invalid arguments to incrementColumnValue", npe);
1165     }
1166 
1167     NonceGenerator ng = this.connection.getNonceGenerator();
1168     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1169     RegionServerCallable<Long> callable =
1170       new RegionServerCallable<Long>(connection, getName(), row) {
1171         @Override
1172         public Long call(int callTimeout) throws IOException {
1173           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1174           controller.setPriority(getTableName());
1175           controller.setCallTimeout(callTimeout);
1176           try {
1177             MutateRequest request = RequestConverter.buildIncrementRequest(
1178               getLocation().getRegionInfo().getRegionName(), row, family,
1179               qualifier, amount, durability, nonceGroup, nonce);
1180             MutateResponse response = getStub().mutate(controller, request);
1181             Result result =
1182               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1183             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1184           } catch (ServiceException se) {
1185             throw ProtobufUtil.getRemoteException(se);
1186           }
1187         }
1188       };
1189     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1190   }
1191 
1192   /**
1193    * {@inheritDoc}
1194    */
1195   @Override
1196   public boolean checkAndPut(final byte [] row,
1197       final byte [] family, final byte [] qualifier, final byte [] value,
1198       final Put put)
1199   throws IOException {
1200     RegionServerCallable<Boolean> callable =
1201       new RegionServerCallable<Boolean>(connection, getName(), row) {
1202         @Override
1203         public Boolean call(int callTimeout) throws IOException {
1204           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1205           controller.setPriority(tableName);
1206           controller.setCallTimeout(callTimeout);
1207           try {
1208             MutateRequest request = RequestConverter.buildMutateRequest(
1209                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1210                 new BinaryComparator(value), CompareType.EQUAL, put);
1211             MutateResponse response = getStub().mutate(controller, request);
1212             return Boolean.valueOf(response.getProcessed());
1213           } catch (ServiceException se) {
1214             throw ProtobufUtil.getRemoteException(se);
1215           }
1216         }
1217       };
1218     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1219   }
1220 
1221   /**
1222    * {@inheritDoc}
1223    */
1224   @Override
1225   public boolean checkAndPut(final byte [] row, final byte [] family,
1226       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1227       final Put put)
1228   throws IOException {
1229     RegionServerCallable<Boolean> callable =
1230       new RegionServerCallable<Boolean>(connection, getName(), row) {
1231         @Override
1232         public Boolean call(int callTimeout) throws IOException {
1233           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1234           controller.setPriority(tableName);
1235           controller.setCallTimeout(callTimeout);
1236           try {
1237             CompareType compareType = CompareType.valueOf(compareOp.name());
1238             MutateRequest request = RequestConverter.buildMutateRequest(
1239               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1240                 new BinaryComparator(value), compareType, put);
1241             MutateResponse response = getStub().mutate(controller, request);
1242             return Boolean.valueOf(response.getProcessed());
1243           } catch (ServiceException se) {
1244             throw ProtobufUtil.getRemoteException(se);
1245           }
1246         }
1247       };
1248     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1249   }
1250 
1251   /**
1252    * {@inheritDoc}
1253    */
1254   @Override
1255   public boolean checkAndDelete(final byte [] row,
1256       final byte [] family, final byte [] qualifier, final byte [] value,
1257       final Delete delete)
1258   throws IOException {
1259     RegionServerCallable<Boolean> callable =
1260       new RegionServerCallable<Boolean>(connection, getName(), row) {
1261         @Override
1262         public Boolean call(int callTimeout) throws IOException {
1263           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1264           controller.setPriority(tableName);
1265           controller.setCallTimeout(callTimeout);
1266           try {
1267             MutateRequest request = RequestConverter.buildMutateRequest(
1268               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1269                 new BinaryComparator(value), CompareType.EQUAL, delete);
1270             MutateResponse response = getStub().mutate(controller, request);
1271             return Boolean.valueOf(response.getProcessed());
1272           } catch (ServiceException se) {
1273             throw ProtobufUtil.getRemoteException(se);
1274           }
1275         }
1276       };
1277     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1278   }
1279 
1280   /**
1281    * {@inheritDoc}
1282    */
1283   @Override
1284   public boolean checkAndDelete(final byte [] row, final byte [] family,
1285       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1286       final Delete delete)
1287   throws IOException {
1288     RegionServerCallable<Boolean> callable =
1289       new RegionServerCallable<Boolean>(connection, getName(), row) {
1290         @Override
1291         public Boolean call(int callTimeout) throws IOException {
1292           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1293           controller.setPriority(tableName);
1294           controller.setCallTimeout(callTimeout);
1295           try {
1296             CompareType compareType = CompareType.valueOf(compareOp.name());
1297             MutateRequest request = RequestConverter.buildMutateRequest(
1298               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1299                 new BinaryComparator(value), compareType, delete);
1300             MutateResponse response = getStub().mutate(controller, request);
1301             return Boolean.valueOf(response.getProcessed());
1302           } catch (ServiceException se) {
1303             throw ProtobufUtil.getRemoteException(se);
1304           }
1305         }
1306       };
1307     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1308   }
1309 
1310   /**
1311    * {@inheritDoc}
1312    */
1313   @Override
1314   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1315       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1316   throws IOException {
1317     RegionServerCallable<Boolean> callable =
1318         new RegionServerCallable<Boolean>(connection, getName(), row) {
1319           @Override
1320           public Boolean call(int callTimeout) throws IOException {
1321             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1322             controller.setPriority(tableName);
1323             controller.setCallTimeout(callTimeout);
1324             try {
1325               CompareType compareType = CompareType.valueOf(compareOp.name());
1326               MultiRequest request = RequestConverter.buildMutateRequest(
1327                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1328                   new BinaryComparator(value), compareType, rm);
1329               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1330               ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1331               if (res.hasException()) {
1332                 Throwable ex = ProtobufUtil.toException(res.getException());
1333                 if(ex instanceof IOException) {
1334                   throw (IOException)ex;
1335                 }
1336                 throw new IOException("Failed to checkAndMutate row: "+
1337                     Bytes.toStringBinary(rm.getRow()), ex);
1338               }
1339               return Boolean.valueOf(response.getProcessed());
1340             } catch (ServiceException se) {
1341               throw ProtobufUtil.getRemoteException(se);
1342             }
1343           }
1344         };
1345     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1346   }
1347 
1348   /**
1349    * {@inheritDoc}
1350    */
1351   @Override
1352   public boolean exists(final Get get) throws IOException {
1353     get.setCheckExistenceOnly(true);
1354     Result r = get(get);
1355     assert r.getExists() != null;
1356     return r.getExists();
1357   }
1358 
1359   /**
1360    * {@inheritDoc}
1361    */
1362   @Override
1363   public boolean[] existsAll(final List<Get> gets) throws IOException {
1364     if (gets.isEmpty()) return new boolean[]{};
1365     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1366 
1367     for (Get g: gets){
1368       g.setCheckExistenceOnly(true);
1369     }
1370 
1371     Object[] r1;
1372     try {
1373       r1 = batch(gets);
1374     } catch (InterruptedException e) {
1375       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1376     }
1377 
1378     // translate.
1379     boolean[] results = new boolean[r1.length];
1380     int i = 0;
1381     for (Object o : r1) {
1382       // batch ensures if there is a failure we get an exception instead
1383       results[i++] = ((Result)o).getExists();
1384     }
1385 
1386     return results;
1387   }
1388 
1389   /**
1390    * {@inheritDoc}
1391    * @deprecated Use {@link #existsAll(java.util.List)}  instead.
1392    */
1393   @Override
1394   @Deprecated
1395   public Boolean[] exists(final List<Get> gets) throws IOException {
1396     boolean[] results = existsAll(gets);
1397     Boolean[] objectResults = new Boolean[results.length];
1398     for (int i = 0; i < results.length; ++i) {
1399       objectResults[i] = results[i];
1400     }
1401     return objectResults;
1402   }
1403 
1404   /**
1405    * {@inheritDoc}
1406    * @throws IOException
1407    */
1408   @Override
1409   public void flushCommits() throws IOException {
1410     if (mutator == null) {
1411       // nothing to flush if there's no mutator; don't bother creating one.
1412       return;
1413     }
1414     getBufferedMutator().flush();
1415   }
1416 
1417   /**
1418    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1419    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1420    *
1421    * @param list The collection of actions.
1422    * @param results An empty array, same size as list. If an exception is thrown,
1423    * you can test here for partial results, and to determine which actions
1424    * processed successfully.
1425    * @throws IOException if there are problems talking to META. Per-item
1426    * exceptions are stored in the results array.
1427    */
1428   public <R> void processBatchCallback(
1429     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1430     throws IOException, InterruptedException {
1431     this.batchCallback(list, results, callback);
1432   }
1433 
1434 
1435   /**
1436    * Parameterized batch processing, allowing varying return types for different
1437    * {@link Row} implementations.
1438    */
1439   public void processBatch(final List<? extends Row> list, final Object[] results)
1440     throws IOException, InterruptedException {
1441     this.batch(list, results);
1442   }
1443 
1444 
1445   @Override
1446   public void close() throws IOException {
1447     if (this.closed) {
1448       return;
1449     }
1450     flushCommits();
1451     if (cleanupPoolOnClose) {
1452       this.pool.shutdown();
1453       try {
1454         boolean terminated = false;
1455         do {
1456           // wait until the pool has terminated
1457           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1458         } while (!terminated);
1459       } catch (InterruptedException e) {
1460         this.pool.shutdownNow();
1461         LOG.warn("waitForTermination interrupted");
1462       }
1463     }
1464     if (cleanupConnectionOnClose) {
1465       if (this.connection != null) {
1466         this.connection.close();
1467       }
1468     }
1469     this.closed = true;
1470   }
1471 
1472   // validate for well-formedness
1473   public void validatePut(final Put put) throws IllegalArgumentException {
1474     validatePut(put, tableConfiguration.getMaxKeyValueSize());
1475   }
1476 
1477   // validate for well-formedness
1478   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1479     if (put.isEmpty()) {
1480       throw new IllegalArgumentException("No columns to insert");
1481     }
1482     if (maxKeyValueSize > 0) {
1483       for (List<Cell> list : put.getFamilyCellMap().values()) {
1484         for (Cell cell : list) {
1485           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1486             throw new IllegalArgumentException("KeyValue size too large");
1487           }
1488         }
1489       }
1490     }
1491   }
1492 
1493   /**
1494    * {@inheritDoc}
1495    */
1496   @Override
1497   public boolean isAutoFlush() {
1498     return autoFlush;
1499   }
1500 
1501   /**
1502    * {@inheritDoc}
1503    * @deprecated in 0.96. When called with setAutoFlush(false), this function also
1504    *  set clearBufferOnFail to true, which is unexpected but kept for historical reasons.
1505    *  Replace it with setAutoFlush(false, false) if this is exactly what you want, or by
1506    *  {@link #setAutoFlushTo(boolean)} for all other cases.
1507    */
1508   @Deprecated
1509   @Override
1510   public void setAutoFlush(boolean autoFlush) {
1511     this.autoFlush = autoFlush;
1512   }
1513 
1514   /**
1515    * {@inheritDoc}
1516    */
1517   @Override
1518   public void setAutoFlushTo(boolean autoFlush) {
1519     this.autoFlush = autoFlush;
1520   }
1521 
1522   /**
1523    * {@inheritDoc}
1524    */
1525   @Override
1526   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1527     this.autoFlush = autoFlush;
1528   }
1529 
1530   /**
1531    * Returns the maximum size in bytes of the write buffer for this HTable.
1532    * <p>
1533    * The default value comes from the configuration parameter
1534    * {@code hbase.client.write.buffer}.
1535    * @return The size of the write buffer in bytes.
1536    */
1537   @Override
1538   public long getWriteBufferSize() {
1539     if (mutator == null) {
1540       return tableConfiguration.getWriteBufferSize();
1541     } else {
1542       return mutator.getWriteBufferSize();
1543     }
1544   }
1545 
1546   /**
1547    * Sets the size of the buffer in bytes.
1548    * <p>
1549    * If the new size is less than the current amount of data in the
1550    * write buffer, the buffer gets flushed.
1551    * @param writeBufferSize The new write buffer size, in bytes.
1552    * @throws IOException if a remote or network exception occurs.
1553    */
1554   @Override
1555   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1556     getBufferedMutator();
1557     mutator.setWriteBufferSize(writeBufferSize);
1558   }
1559 
1560   /**
1561    * The pool is used for mutli requests for this HTable
1562    * @return the pool used for mutli
1563    */
1564   ExecutorService getPool() {
1565     return this.pool;
1566   }
1567 
1568   /**
1569    * Enable or disable region cache prefetch for the table. It will be
1570    * applied for the given table's all HTable instances who share the same
1571    * connection. By default, the cache prefetch is enabled.
1572    * @param tableName name of table to configure.
1573    * @param enable Set to true to enable region cache prefetch. Or set to
1574    * false to disable it.
1575    * @throws IOException
1576    * @deprecated does nothing since 0.99
1577    */
1578   @Deprecated
1579   public static void setRegionCachePrefetch(final byte[] tableName,
1580       final boolean enable)  throws IOException {
1581   }
1582 
1583   /**
1584    * @deprecated does nothing since 0.99
1585    */
1586   @Deprecated
1587   public static void setRegionCachePrefetch(
1588       final TableName tableName,
1589       final boolean enable) throws IOException {
1590   }
1591 
1592   /**
1593    * Enable or disable region cache prefetch for the table. It will be
1594    * applied for the given table's all HTable instances who share the same
1595    * connection. By default, the cache prefetch is enabled.
1596    * @param conf The Configuration object to use.
1597    * @param tableName name of table to configure.
1598    * @param enable Set to true to enable region cache prefetch. Or set to
1599    * false to disable it.
1600    * @throws IOException
1601    * @deprecated does nothing since 0.99
1602    */
1603   @Deprecated
1604   public static void setRegionCachePrefetch(final Configuration conf,
1605       final byte[] tableName, final boolean enable) throws IOException {
1606   }
1607 
1608   /**
1609    * @deprecated does nothing since 0.99
1610    */
1611   @Deprecated
1612   public static void setRegionCachePrefetch(final Configuration conf,
1613       final TableName tableName,
1614       final boolean enable) throws IOException {
1615   }
1616 
1617   /**
1618    * Check whether region cache prefetch is enabled or not for the table.
1619    * @param conf The Configuration object to use.
1620    * @param tableName name of table to check
1621    * @return true if table's region cache prefecth is enabled. Otherwise
1622    * it is disabled.
1623    * @throws IOException
1624    * @deprecated always return false since 0.99
1625    */
1626   @Deprecated
1627   public static boolean getRegionCachePrefetch(final Configuration conf,
1628       final byte[] tableName) throws IOException {
1629     return false;
1630   }
1631 
1632   /**
1633    * @deprecated always return false since 0.99
1634    */
1635   @Deprecated
1636   public static boolean getRegionCachePrefetch(final Configuration conf,
1637       final TableName tableName) throws IOException {
1638     return false;
1639   }
1640 
1641   /**
1642    * Check whether region cache prefetch is enabled or not for the table.
1643    * @param tableName name of table to check
1644    * @return true if table's region cache prefecth is enabled. Otherwise
1645    * it is disabled.
1646    * @throws IOException
1647    * @deprecated always return false since 0.99
1648    */
1649   @Deprecated
1650   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1651     return false;
1652   }
1653 
1654   /**
1655    * @deprecated always return false since 0.99
1656    */
1657   @Deprecated
1658   public static boolean getRegionCachePrefetch(
1659       final TableName tableName) throws IOException {
1660     return false;
1661   }
1662 
1663   /**
1664    * Explicitly clears the region cache to fetch the latest value from META.
1665    * This is a power user function: avoid unless you know the ramifications.
1666    */
1667   public void clearRegionCache() {
1668     this.connection.clearRegionCache();
1669   }
1670 
1671   /**
1672    * {@inheritDoc}
1673    */
1674   @Override
1675   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1676     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1677   }
1678 
1679   /**
1680    * {@inheritDoc}
1681    */
1682   @Override
1683   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1684       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1685       throws ServiceException, Throwable {
1686     final Map<byte[],R> results =  Collections.synchronizedMap(
1687         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1688     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1689       @Override
1690       public void update(byte[] region, byte[] row, R value) {
1691         if (region != null) {
1692           results.put(region, value);
1693         }
1694       }
1695     });
1696     return results;
1697   }
1698 
1699   /**
1700    * {@inheritDoc}
1701    */
1702   @Override
1703   public <T extends Service, R> void coprocessorService(final Class<T> service,
1704       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1705       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1706 
1707     // get regions covered by the row range
1708     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1709 
1710     Map<byte[],Future<R>> futures =
1711         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1712     for (final byte[] r : keys) {
1713       final RegionCoprocessorRpcChannel channel =
1714           new RegionCoprocessorRpcChannel(connection, tableName, r);
1715       Future<R> future = pool.submit(
1716           new Callable<R>() {
1717             @Override
1718             public R call() throws Exception {
1719               T instance = ProtobufUtil.newServiceStub(service, channel);
1720               R result = callable.call(instance);
1721               byte[] region = channel.getLastRegion();
1722               if (callback != null) {
1723                 callback.update(region, r, result);
1724               }
1725               return result;
1726             }
1727           });
1728       futures.put(r, future);
1729     }
1730     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1731       try {
1732         e.getValue().get();
1733       } catch (ExecutionException ee) {
1734         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1735             + Bytes.toStringBinary(e.getKey()), ee);
1736         throw ee.getCause();
1737       } catch (InterruptedException ie) {
1738         throw new InterruptedIOException("Interrupted calling coprocessor service "
1739             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1740       }
1741     }
1742   }
1743 
1744   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1745   throws IOException {
1746     if (start == null) {
1747       start = HConstants.EMPTY_START_ROW;
1748     }
1749     if (end == null) {
1750       end = HConstants.EMPTY_END_ROW;
1751     }
1752     return getKeysAndRegionsInRange(start, end, true).getFirst();
1753   }
1754 
1755   public void setOperationTimeout(int operationTimeout) {
1756     this.operationTimeout = operationTimeout;
1757   }
1758 
1759   public int getOperationTimeout() {
1760     return operationTimeout;
1761   }
1762 
1763   @Override
1764   public String toString() {
1765     return tableName + ";" + connection;
1766   }
1767 
1768   /**
1769    * {@inheritDoc}
1770    */
1771   @Override
1772   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1773       Descriptors.MethodDescriptor methodDescriptor, Message request,
1774       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1775     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1776         Bytes.BYTES_COMPARATOR));
1777     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1778         new Callback<R>() {
1779 
1780           @Override
1781           public void update(byte[] region, byte[] row, R result) {
1782             if (region != null) {
1783               results.put(region, result);
1784             }
1785           }
1786         });
1787     return results;
1788   }
1789 
1790   /**
1791    * {@inheritDoc}
1792    */
1793   @Override
1794   public <R extends Message> void batchCoprocessorService(
1795       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1796       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1797       throws ServiceException, Throwable {
1798 
1799     // get regions covered by the row range
1800     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1801         getKeysAndRegionsInRange(startKey, endKey, true);
1802     List<byte[]> keys = keysAndRegions.getFirst();
1803     List<HRegionLocation> regions = keysAndRegions.getSecond();
1804 
1805     // check if we have any calls to make
1806     if (keys.isEmpty()) {
1807       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1808           ", end=" + Bytes.toStringBinary(endKey));
1809       return;
1810     }
1811 
1812     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1813     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1814         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1815     for (int i = 0; i < keys.size(); i++) {
1816       final byte[] rowKey = keys.get(i);
1817       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1818       RegionCoprocessorServiceExec exec =
1819           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1820       execs.add(exec);
1821       execsByRow.put(rowKey, exec);
1822     }
1823 
1824     // tracking for any possible deserialization errors on success callback
1825     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1826     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1827     final List<Row> callbackErrorActions = new ArrayList<Row>();
1828     final List<String> callbackErrorServers = new ArrayList<String>();
1829     Object[] results = new Object[execs.size()];
1830 
1831     AsyncProcess asyncProcess =
1832         new AsyncProcess(connection, configuration, pool,
1833             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1834             true, RpcControllerFactory.instantiate(configuration));
1835 
1836     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1837         new Callback<ClientProtos.CoprocessorServiceResult>() {
1838           @Override
1839           public void update(byte[] region, byte[] row,
1840                               ClientProtos.CoprocessorServiceResult serviceResult) {
1841             if (LOG.isTraceEnabled()) {
1842               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1843                   ": region=" + Bytes.toStringBinary(region) +
1844                   ", row=" + Bytes.toStringBinary(row) +
1845                   ", value=" + serviceResult.getValue().getValue());
1846             }
1847             try {
1848               callback.update(region, row,
1849                   (R) responsePrototype.newBuilderForType().mergeFrom(
1850                       serviceResult.getValue().getValue()).build());
1851             } catch (InvalidProtocolBufferException e) {
1852               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1853                   e);
1854               callbackErrorExceptions.add(e);
1855               callbackErrorActions.add(execsByRow.get(row));
1856               callbackErrorServers.add("null");
1857             }
1858           }
1859         }, results);
1860 
1861     future.waitUntilDone();
1862 
1863     if (future.hasError()) {
1864       throw future.getErrors();
1865     } else if (!callbackErrorExceptions.isEmpty()) {
1866       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1867           callbackErrorServers);
1868     }
1869   }
1870 
1871   public RegionLocator getRegionLocator() {
1872     return this.locator;
1873   }
1874 
1875   @VisibleForTesting
1876   BufferedMutator getBufferedMutator() throws IOException {
1877     if (mutator == null) {
1878       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1879           new BufferedMutatorParams(tableName)
1880               .pool(pool)
1881               .writeBufferSize(tableConfiguration.getWriteBufferSize())
1882               .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize())
1883       );
1884     }
1885     return mutator;
1886   }
1887 }