View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  import java.util.TreeMap;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.SynchronousQueue;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.classification.InterfaceStability;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HRegionLocation;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.KeyValueUtil;
49  import org.apache.hadoop.hbase.ServerName;
50  import org.apache.hadoop.hbase.TableName;
51  import org.apache.hadoop.hbase.TableNotFoundException;
52  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53  import org.apache.hadoop.hbase.client.coprocessor.Batch;
54  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
55  import org.apache.hadoop.hbase.filter.BinaryComparator;
56  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
57  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
58  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
59  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
60  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
61  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62  import org.apache.hadoop.hbase.protobuf.RequestConverter;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
68  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
69  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.Pair;
73  import org.apache.hadoop.hbase.util.ReflectionUtils;
74  import org.apache.hadoop.hbase.util.Threads;
75  
76  import com.google.common.annotations.VisibleForTesting;
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.Message;
79  import com.google.protobuf.Service;
80  import com.google.protobuf.ServiceException;
81  
82  /**
83   * An implementation of {@link Table}. Used to communicate with a single HBase table.
84   * Lightweight. Get as needed and just close when done.
85   * Instances of this class SHOULD NOT be constructed directly.
86   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
87   * class comment for an example of how.
88   *
89   * <p>This class is NOT thread safe for reads nor writes.
90   * In the case of writes (Put, Delete), the underlying write buffer can
91   * be corrupted if multiple threads contend over a single HTable instance.
92   * In the case of reads, some fields used by a Scan are shared among all threads.
93   *
94   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
95   * InterfaceAudience.Private as of hbase-1.0.0 indicating that this is an
96   * HBase-internal class as defined in
97   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
98   * Interface Classification</a>. There are no guarantees for backwards
99   * source / binary compatibility and methods or the class can
100  * change or go away without deprecation.
101  * <p>Near all methods of this * class made it out to the new {@link Table}
102  * Interface or were * instantiations of methods defined in {@link HTableInterface}.
103  * A few did not. Namely, the {@link #getStartEndKeys}, {@link #getEndKeys},
104  * and {@link #getStartKeys} methods. These three methods are available
105  * in {@link RegionLocator} as of 1.0.0 but were NOT marked as
106  * deprecated when we released 1.0.0. In spite of this oversight on our
107  * part, these methods will be removed in 2.0.0.
108  *
109  * @see Table
110  * @see Admin
111  * @see Connection
112  * @see ConnectionFactory
113  */
114 @InterfaceAudience.Private
115 @InterfaceStability.Stable
116 public class HTable implements HTableInterface, RegionLocator {
117   private static final Log LOG = LogFactory.getLog(HTable.class);
118   protected ClusterConnection connection;
119   private final TableName tableName;
120   private volatile Configuration configuration;
121   private ConnectionConfiguration connConfiguration;
122   protected BufferedMutatorImpl mutator;
123   private boolean autoFlush = true;
124   private boolean closed = false;
125   protected int scannerCaching;
126   protected long scannerMaxResultSize;
127   private ExecutorService pool;  // For Multi & Scan
128   private int operationTimeout; // global timeout for each blocking method with retrying rpc
129   private int rpcTimeout; // timeout for each rpc request
130   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
131   private final boolean cleanupConnectionOnClose; // close the connection in close()
132   private Consistency defaultConsistency = Consistency.STRONG;
133   private HRegionLocator locator;
134 
135   /** The Async process for batch */
136   protected AsyncProcess multiAp;
137   private RpcRetryingCallerFactory rpcCallerFactory;
138   private RpcControllerFactory rpcControllerFactory;
139 
140   /**
141    * Creates an object to access a HBase table.
142    * @param conf Configuration object to use.
143    * @param tableName Name of the table.
144    * @throws IOException if a remote or network exception occurs
145    * @deprecated Constructing HTable objects manually has been deprecated. Please use
146    * {@link Connection} to instantiate a {@link Table} instead.
147    */
148   @Deprecated
149   public HTable(Configuration conf, final String tableName)
150   throws IOException {
151     this(conf, TableName.valueOf(tableName));
152   }
153 
154   /**
155    * Creates an object to access a HBase table.
156    * @param conf Configuration object to use.
157    * @param tableName Name of the table.
158    * @throws IOException if a remote or network exception occurs
159    * @deprecated Constructing HTable objects manually has been deprecated. Please use
160    * {@link Connection} to instantiate a {@link Table} instead.
161    */
162   @Deprecated
163   public HTable(Configuration conf, final byte[] tableName)
164   throws IOException {
165     this(conf, TableName.valueOf(tableName));
166   }
167 
168   /**
169    * Creates an object to access a HBase table.
170    * @param conf Configuration object to use.
171    * @param tableName table name pojo
172    * @throws IOException if a remote or network exception occurs
173    * @deprecated Constructing HTable objects manually has been deprecated. Please use
174    * {@link Connection} to instantiate a {@link Table} instead.
175    */
176   @Deprecated
177   public HTable(Configuration conf, final TableName tableName)
178   throws IOException {
179     this.tableName = tableName;
180     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
181     if (conf == null) {
182       this.connection = null;
183       return;
184     }
185     this.connection = ConnectionManager.getConnectionInternal(conf);
186     this.configuration = conf;
187 
188     this.pool = getDefaultExecutor(conf);
189     this.finishSetup();
190   }
191 
192   /**
193    * Creates an object to access a HBase table.
194    * @param tableName Name of the table.
195    * @param connection HConnection to be used.
196    * @throws IOException if a remote or network exception occurs
197    * @deprecated Do not use.
198    */
199   @Deprecated
200   public HTable(TableName tableName, Connection connection) throws IOException {
201     this.tableName = tableName;
202     this.cleanupPoolOnClose = true;
203     this.cleanupConnectionOnClose = false;
204     this.connection = (ClusterConnection)connection;
205     this.configuration = connection.getConfiguration();
206 
207     this.pool = getDefaultExecutor(this.configuration);
208     this.finishSetup();
209   }
210 
211   // Marked Private @since 1.0
212   @InterfaceAudience.Private
213   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
214     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
215     if (maxThreads == 0) {
216       maxThreads = 1; // is there a better default?
217     }
218     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
219 
220     // Using the "direct handoff" approach, new threads will only be created
221     // if it is necessary and will grow unbounded. This could be bad but in HCM
222     // we only create as many Runnables as there are region servers. It means
223     // it also scales when new region servers are added.
224     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
225         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
226     pool.allowCoreThreadTimeOut(true);
227     return pool;
228   }
229 
230   /**
231    * Creates an object to access a HBase table.
232    * @param conf Configuration object to use.
233    * @param tableName Name of the table.
234    * @param pool ExecutorService to be used.
235    * @throws IOException if a remote or network exception occurs
236    * @deprecated Constructing HTable objects manually has been deprecated. Please use
237    * {@link Connection} to instantiate a {@link Table} instead.
238    */
239   @Deprecated
240   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
241       throws IOException {
242     this(conf, TableName.valueOf(tableName), pool);
243   }
244 
245   /**
246    * Creates an object to access a HBase table.
247    * @param conf Configuration object to use.
248    * @param tableName Name of the table.
249    * @param pool ExecutorService to be used.
250    * @throws IOException if a remote or network exception occurs
251    * @deprecated Constructing HTable objects manually has been deprecated. Please use
252    * {@link Connection} to instantiate a {@link Table} instead.
253    */
254   @Deprecated
255   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
256       throws IOException {
257     this.connection = ConnectionManager.getConnectionInternal(conf);
258     this.configuration = conf;
259     this.pool = pool;
260     if (pool == null) {
261       this.pool = getDefaultExecutor(conf);
262       this.cleanupPoolOnClose = true;
263     } else {
264       this.cleanupPoolOnClose = false;
265     }
266     this.tableName = tableName;
267     this.cleanupConnectionOnClose = true;
268     this.finishSetup();
269   }
270 
271   /**
272    * Creates an object to access a HBase table.
273    * @param tableName Name of the table.
274    * @param connection HConnection to be used.
275    * @param pool ExecutorService to be used.
276    * @throws IOException if a remote or network exception occurs.
277    * @deprecated Do not use, internal ctor.
278    */
279   @Deprecated
280   public HTable(final byte[] tableName, final Connection connection,
281       final ExecutorService pool) throws IOException {
282     this(TableName.valueOf(tableName), connection, pool);
283   }
284 
285   /** @deprecated Do not use, internal ctor. */
286   @Deprecated
287   public HTable(TableName tableName, final Connection connection,
288       final ExecutorService pool) throws IOException {
289     this(tableName, (ClusterConnection)connection, null, null, null, pool);
290   }
291 
292   /**
293    * Creates an object to access a HBase table.
294    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
295    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
296    * @param tableName Name of the table.
297    * @param connection HConnection to be used.
298    * @param pool ExecutorService to be used.
299    * @throws IOException if a remote or network exception occurs
300    */
301   @InterfaceAudience.Private
302   public HTable(TableName tableName, final ClusterConnection connection,
303       final ConnectionConfiguration tableConfig,
304       final RpcRetryingCallerFactory rpcCallerFactory,
305       final RpcControllerFactory rpcControllerFactory,
306       final ExecutorService pool) throws IOException {
307     if (connection == null || connection.isClosed()) {
308       throw new IllegalArgumentException("Connection is null or closed.");
309     }
310     this.tableName = tableName;
311     this.cleanupConnectionOnClose = false;
312     this.connection = connection;
313     this.configuration = connection.getConfiguration();
314     this.connConfiguration = tableConfig;
315     this.pool = pool;
316     if (pool == null) {
317       this.pool = getDefaultExecutor(this.configuration);
318       this.cleanupPoolOnClose = true;
319     } else {
320       this.cleanupPoolOnClose = false;
321     }
322 
323     this.rpcCallerFactory = rpcCallerFactory;
324     this.rpcControllerFactory = rpcControllerFactory;
325 
326     this.finishSetup();
327   }
328 
329   /**
330    * For internal testing. Uses Connection provided in {@code params}.
331    * @throws IOException
332    */
333   @VisibleForTesting
334   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
335     connection = conn;
336     tableName = params.getTableName();
337     connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
338     cleanupPoolOnClose = false;
339     cleanupConnectionOnClose = false;
340     // used from tests, don't trust the connection is real
341     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
342   }
343 
344   /**
345    * @return maxKeyValueSize from configuration.
346    */
347   public static int getMaxKeyValueSize(Configuration conf) {
348     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
349   }
350 
351   /**
352    * setup this HTable's parameter based on the passed configuration
353    */
354   private void finishSetup() throws IOException {
355     if (connConfiguration == null) {
356       connConfiguration = new ConnectionConfiguration(configuration);
357     }
358     this.operationTimeout = tableName.isSystemTable() ?
359         connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
360     this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
361         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
362     this.scannerCaching = connConfiguration.getScannerCaching();
363     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
364     if (this.rpcCallerFactory == null) {
365       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
366     }
367     if (this.rpcControllerFactory == null) {
368       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
369     }
370 
371     // puts need to track errors globally due to how the APIs currently work.
372     multiAp = this.connection.getAsyncProcess();
373 
374     this.closed = false;
375 
376     this.locator = new HRegionLocator(tableName, connection);
377   }
378 
379   /**
380    * {@inheritDoc}
381    */
382   @Override
383   public Configuration getConfiguration() {
384     return configuration;
385   }
386 
387   /**
388    * Tells whether or not a table is enabled or not. This method creates a
389    * new HBase configuration, so it might make your unit tests fail due to
390    * incorrect ZK client port.
391    * @param tableName Name of table to check.
392    * @return {@code true} if table is online.
393    * @throws IOException if a remote or network exception occurs
394    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
395    */
396   @Deprecated
397   public static boolean isTableEnabled(String tableName) throws IOException {
398     return isTableEnabled(TableName.valueOf(tableName));
399   }
400 
401   /**
402    * Tells whether or not a table is enabled or not. This method creates a
403    * new HBase configuration, so it might make your unit tests fail due to
404    * incorrect ZK client port.
405    * @param tableName Name of table to check.
406    * @return {@code true} if table is online.
407    * @throws IOException if a remote or network exception occurs
408    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
409    */
410   @Deprecated
411   public static boolean isTableEnabled(byte[] tableName) throws IOException {
412     return isTableEnabled(TableName.valueOf(tableName));
413   }
414 
415   /**
416    * Tells whether or not a table is enabled or not. This method creates a
417    * new HBase configuration, so it might make your unit tests fail due to
418    * incorrect ZK client port.
419    * @param tableName Name of table to check.
420    * @return {@code true} if table is online.
421    * @throws IOException if a remote or network exception occurs
422    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
423    */
424   @Deprecated
425   public static boolean isTableEnabled(TableName tableName) throws IOException {
426     return isTableEnabled(HBaseConfiguration.create(), tableName);
427   }
428 
429   /**
430    * Tells whether or not a table is enabled or not.
431    * @param conf The Configuration object to use.
432    * @param tableName Name of table to check.
433    * @return {@code true} if table is online.
434    * @throws IOException if a remote or network exception occurs
435    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
436    */
437   @Deprecated
438   public static boolean isTableEnabled(Configuration conf, String tableName)
439   throws IOException {
440     return isTableEnabled(conf, TableName.valueOf(tableName));
441   }
442 
443   /**
444    * Tells whether or not a table is enabled or not.
445    * @param conf The Configuration object to use.
446    * @param tableName Name of table to check.
447    * @return {@code true} if table is online.
448    * @throws IOException if a remote or network exception occurs
449    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
450    */
451   @Deprecated
452   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
453   throws IOException {
454     return isTableEnabled(conf, TableName.valueOf(tableName));
455   }
456 
457   /**
458    * Tells whether or not a table is enabled or not.
459    * @param conf The Configuration object to use.
460    * @param tableName Name of table to check.
461    * @return {@code true} if table is online.
462    * @throws IOException if a remote or network exception occurs
463    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
464    */
465   @Deprecated
466   public static boolean isTableEnabled(Configuration conf,
467       final TableName tableName) throws IOException {
468     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
469       @Override
470       public Boolean connect(HConnection connection) throws IOException {
471         return connection.isTableEnabled(tableName);
472       }
473     });
474   }
475 
476   /**
477    * Find region location hosting passed row using cached info
478    * @param row Row to find.
479    * @return The location of the given row.
480    * @throws IOException if a remote or network exception occurs
481    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
482    */
483   @Deprecated
484   public HRegionLocation getRegionLocation(final String row)
485   throws IOException {
486     return getRegionLocation(Bytes.toBytes(row), false);
487   }
488 
489   /**
490    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
491    */
492   @Override
493   @Deprecated
494   public HRegionLocation getRegionLocation(final byte [] row)
495   throws IOException {
496     return locator.getRegionLocation(row);
497   }
498 
499   /**
500    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
501    */
502   @Override
503   @Deprecated
504   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
505   throws IOException {
506     return locator.getRegionLocation(row, reload);
507   }
508 
509   /**
510    * {@inheritDoc}
511    */
512   @Override
513   public byte [] getTableName() {
514     return this.tableName.getName();
515   }
516 
517   @Override
518   public TableName getName() {
519     return tableName;
520   }
521 
522   /**
523    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
524    * manipulations.
525    * @return An HConnection instance.
526    * @deprecated This method will be changed from public to package protected.
527    */
528   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
529   @Deprecated
530   @VisibleForTesting
531   public HConnection getConnection() {
532     return this.connection;
533   }
534 
535   /**
536    * Gets the number of rows that a scanner will fetch at once.
537    * <p>
538    * The default value comes from {@code hbase.client.scanner.caching}.
539    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
540    */
541   @Deprecated
542   public int getScannerCaching() {
543     return scannerCaching;
544   }
545 
546   /**
547    * Kept in 0.96 for backward compatibility
548    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
549    */
550   @Deprecated
551   public List<Row> getWriteBuffer() {
552     return mutator == null ? null : mutator.getWriteBuffer();
553   }
554 
555   /**
556    * Sets the number of rows that a scanner will fetch at once.
557    * <p>
558    * This will override the value specified by
559    * {@code hbase.client.scanner.caching}.
560    * Increasing this value will reduce the amount of work needed each time
561    * {@code next()} is called on a scanner, at the expense of memory use
562    * (since more rows will need to be maintained in memory by the scanners).
563    * @param scannerCaching the number of rows a scanner will fetch at once.
564    * @deprecated Use {@link Scan#setCaching(int)}
565    */
566   @Deprecated
567   public void setScannerCaching(int scannerCaching) {
568     this.scannerCaching = scannerCaching;
569   }
570 
571   /**
572    * {@inheritDoc}
573    */
574   @Override
575   public HTableDescriptor getTableDescriptor() throws IOException {
576     HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection,
577         rpcCallerFactory, rpcControllerFactory, operationTimeout, rpcTimeout);
578     if (htd != null) {
579       return new UnmodifyableHTableDescriptor(htd);
580     }
581     return null;
582   }
583 
584   /**
585    * To be removed in 2.0.0.
586    * @deprecated Since 1.1.0. Use {@link RegionLocator#getStartEndKeys()} instead
587    */
588   @Override
589   @Deprecated
590   public byte [][] getStartKeys() throws IOException {
591     return locator.getStartKeys();
592   }
593 
594   /**
595    * To be removed in 2.0.0.
596    * @deprecated Since 1.1.0. Use {@link RegionLocator#getEndKeys()} instead;
597    */
598   @Override
599   @Deprecated
600   public byte[][] getEndKeys() throws IOException {
601     return locator.getEndKeys();
602   }
603 
604   /**
605    * To be removed in 2.0.0.
606    * @deprecated Since 1.1.0. Use {@link RegionLocator#getStartEndKeys()} instead;
607    */
608   @Override
609   @Deprecated
610   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
611     return locator.getStartEndKeys();
612   }
613 
614   /**
615    * Gets all the regions and their address for this table.
616    * <p>
617    * This is mainly useful for the MapReduce integration.
618    * @return A map of HRegionInfo with it's server address
619    * @throws IOException if a remote or network exception occurs
620    * @deprecated This is no longer a public API.  Use {@link #getAllRegionLocations()} instead.
621    */
622   @Deprecated
623   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
624     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an HRegionLocation.
625     return MetaScanner.allTableRegions(this.connection, getName());
626   }
627 
628   /**
629    * Gets all the regions and their address for this table.
630    * <p>
631    * This is mainly useful for the MapReduce integration.
632    * @return A map of HRegionInfo with it's server address
633    * @throws IOException if a remote or network exception occurs
634    *
635    * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
636    */
637   @Override
638   @Deprecated
639   public List<HRegionLocation> getAllRegionLocations() throws IOException {
640     return locator.getAllRegionLocations();
641   }
642 
643   /**
644    * Get the corresponding regions for an arbitrary range of keys.
645    * <p>
646    * @param startKey Starting row in range, inclusive
647    * @param endKey Ending row in range, exclusive
648    * @return A list of HRegionLocations corresponding to the regions that
649    * contain the specified range
650    * @throws IOException if a remote or network exception occurs
651    * @deprecated This is no longer a public API
652    */
653   @Deprecated
654   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
655     final byte [] endKey) throws IOException {
656     return getRegionsInRange(startKey, endKey, false);
657   }
658 
659   /**
660    * Get the corresponding regions for an arbitrary range of keys.
661    * <p>
662    * @param startKey Starting row in range, inclusive
663    * @param endKey Ending row in range, exclusive
664    * @param reload true to reload information or false to use cached information
665    * @return A list of HRegionLocations corresponding to the regions that
666    * contain the specified range
667    * @throws IOException if a remote or network exception occurs
668    * @deprecated This is no longer a public API
669    */
670   @Deprecated
671   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
672       final byte [] endKey, final boolean reload) throws IOException {
673     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
674   }
675 
676   /**
677    * Get the corresponding start keys and regions for an arbitrary range of
678    * keys.
679    * <p>
680    * @param startKey Starting row in range, inclusive
681    * @param endKey Ending row in range
682    * @param includeEndKey true if endRow is inclusive, false if exclusive
683    * @return A pair of list of start keys and list of HRegionLocations that
684    *         contain the specified range
685    * @throws IOException if a remote or network exception occurs
686    * @deprecated This is no longer a public API
687    */
688   @Deprecated
689   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
690       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
691       throws IOException {
692     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
693   }
694 
695   /**
696    * Get the corresponding start keys and regions for an arbitrary range of
697    * keys.
698    * <p>
699    * @param startKey Starting row in range, inclusive
700    * @param endKey Ending row in range
701    * @param includeEndKey true if endRow is inclusive, false if exclusive
702    * @param reload true to reload information or false to use cached information
703    * @return A pair of list of start keys and list of HRegionLocations that
704    *         contain the specified range
705    * @throws IOException if a remote or network exception occurs
706    * @deprecated This is no longer a public API
707    */
708   @Deprecated
709   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
710       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
711       final boolean reload) throws IOException {
712     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
713     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
714       throw new IllegalArgumentException(
715         "Invalid range: " + Bytes.toStringBinary(startKey) +
716         " > " + Bytes.toStringBinary(endKey));
717     }
718     List<byte[]> keysInRange = new ArrayList<byte[]>();
719     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
720     byte[] currentKey = startKey;
721     do {
722       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
723       keysInRange.add(currentKey);
724       regionsInRange.add(regionLocation);
725       currentKey = regionLocation.getRegionInfo().getEndKey();
726     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
727         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
728             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
729     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
730         regionsInRange);
731   }
732 
733   /**
734    * {@inheritDoc}
735    * @deprecated Use reversed scan instead.
736    */
737    @Override
738    @Deprecated
739    public Result getRowOrBefore(final byte[] row, final byte[] family)
740        throws IOException {
741      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
742          tableName, row) {
743        @Override
744       public Result call(int callTimeout) throws IOException {
745          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
746          controller.setPriority(tableName);
747          controller.setCallTimeout(callTimeout);
748          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
749              getLocation().getRegionInfo().getRegionName(), row, family);
750          try {
751            ClientProtos.GetResponse response = getStub().get(controller, request);
752            if (!response.hasResult()) return null;
753            return ProtobufUtil.toResult(response.getResult());
754          } catch (ServiceException se) {
755            throw ProtobufUtil.getRemoteException(se);
756          }
757        }
758      };
759      return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
760          this.operationTimeout);
761    }
762 
763   /**
764    * The underlying {@link HTable} must not be closed.
765    * {@link HTableInterface#getScanner(Scan)} has other usage details.
766    */
767   @Override
768   public ResultScanner getScanner(final Scan scan) throws IOException {
769     if (scan.getBatch() > 0 && scan.isSmall()) {
770       throw new IllegalArgumentException("Small scan should not be used with batching");
771     }
772 
773     if (scan.getCaching() <= 0) {
774       scan.setCaching(getScannerCaching());
775     }
776     if (scan.getMaxResultSize() <= 0) {
777       scan.setMaxResultSize(scannerMaxResultSize);
778     }
779 
780     if (scan.isReversed()) {
781       if (scan.isSmall()) {
782         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
783             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
784             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
785       } else {
786         return new ReversedClientScanner(getConfiguration(), scan, getName(),
787             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
788             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
789       }
790     }
791 
792     if (scan.isSmall()) {
793       return new ClientSmallScanner(getConfiguration(), scan, getName(),
794           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
795           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
796     } else {
797       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
798           this.rpcCallerFactory, this.rpcControllerFactory,
799           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
800     }
801   }
802 
803   /**
804    * The underlying {@link HTable} must not be closed.
805    * {@link HTableInterface#getScanner(byte[])} has other usage details.
806    */
807   @Override
808   public ResultScanner getScanner(byte [] family) throws IOException {
809     Scan scan = new Scan();
810     scan.addFamily(family);
811     return getScanner(scan);
812   }
813 
814   /**
815    * The underlying {@link HTable} must not be closed.
816    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
817    */
818   @Override
819   public ResultScanner getScanner(byte [] family, byte [] qualifier)
820   throws IOException {
821     Scan scan = new Scan();
822     scan.addColumn(family, qualifier);
823     return getScanner(scan);
824   }
825 
826   /**
827    * {@inheritDoc}
828    */
829   @Override
830   public Result get(final Get get) throws IOException {
831     return get(get, get.isCheckExistenceOnly());
832   }
833 
834   private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
835     // if we are changing settings to the get, clone it.
836     if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
837       get = ReflectionUtils.newInstance(get.getClass(), get);
838       get.setCheckExistenceOnly(checkExistenceOnly);
839       if (get.getConsistency() == null){
840         get.setConsistency(defaultConsistency);
841       }
842     }
843 
844     if (get.getConsistency() == Consistency.STRONG) {
845       // Good old call.
846       final Get getReq = get;
847       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
848           getName(), get.getRow()) {
849         @Override
850         public Result call(int callTimeout) throws IOException {
851           ClientProtos.GetRequest request =
852             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
853           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
854           controller.setPriority(tableName);
855           controller.setCallTimeout(callTimeout);
856           try {
857             ClientProtos.GetResponse response = getStub().get(controller, request);
858             if (response == null) return null;
859             return ProtobufUtil.toResult(response.getResult());
860           } catch (ServiceException se) {
861             throw ProtobufUtil.getRemoteException(se);
862           }
863         }
864       };
865       return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
866           this.operationTimeout);
867     }
868 
869     // Call that takes into account the replica
870     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
871       rpcControllerFactory, tableName, this.connection, get, pool,
872       connConfiguration.getRetriesNumber(),
873       operationTimeout,
874       connConfiguration.getPrimaryCallTimeoutMicroSecond());
875     return callable.call();
876   }
877 
878 
879   /**
880    * {@inheritDoc}
881    */
882   @Override
883   public Result[] get(List<Get> gets) throws IOException {
884     if (gets.size() == 1) {
885       return new Result[]{get(gets.get(0))};
886     }
887     try {
888       Object [] r1 = batch((List)gets);
889 
890       // translate.
891       Result [] results = new Result[r1.length];
892       int i=0;
893       for (Object o : r1) {
894         // batch ensures if there is a failure we get an exception instead
895         results[i++] = (Result) o;
896       }
897 
898       return results;
899     } catch (InterruptedException e) {
900       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
901     }
902   }
903 
904   /**
905    * {@inheritDoc}
906    */
907   @Override
908   public void batch(final List<? extends Row> actions, final Object[] results)
909       throws InterruptedException, IOException {
910     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
911     ars.waitUntilDone();
912     if (ars.hasError()) {
913       throw ars.getErrors();
914     }
915   }
916 
917   /**
918    * {@inheritDoc}
919    * @deprecated If any exception is thrown by one of the actions, there is no way to
920    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
921    */
922   @Deprecated
923   @Override
924   public Object[] batch(final List<? extends Row> actions)
925      throws InterruptedException, IOException {
926     Object[] results = new Object[actions.size()];
927     batch(actions, results);
928     return results;
929   }
930 
931   /**
932    * {@inheritDoc}
933    */
934   @Override
935   public <R> void batchCallback(
936       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
937       throws IOException, InterruptedException {
938     connection.processBatchCallback(actions, tableName, pool, results, callback);
939   }
940 
941   /**
942    * {@inheritDoc}
943    * @deprecated If any exception is thrown by one of the actions, there is no way to
944    * retrieve the partially executed results. Use
945    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
946    * instead.
947    */
948   @Deprecated
949   @Override
950   public <R> Object[] batchCallback(
951     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
952       InterruptedException {
953     Object[] results = new Object[actions.size()];
954     batchCallback(actions, results, callback);
955     return results;
956   }
957 
958   /**
959    * {@inheritDoc}
960    */
961   @Override
962   public void delete(final Delete delete)
963   throws IOException {
964     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
965         tableName, delete.getRow()) {
966       @Override
967       public Boolean call(int callTimeout) throws IOException {
968         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
969         controller.setPriority(tableName);
970         controller.setCallTimeout(callTimeout);
971 
972         try {
973           MutateRequest request = RequestConverter.buildMutateRequest(
974             getLocation().getRegionInfo().getRegionName(), delete);
975           MutateResponse response = getStub().mutate(controller, request);
976           return Boolean.valueOf(response.getProcessed());
977         } catch (ServiceException se) {
978           throw ProtobufUtil.getRemoteException(se);
979         }
980       }
981     };
982     rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
983         this.operationTimeout);
984   }
985 
986   /**
987    * {@inheritDoc}
988    */
989   @Override
990   public void delete(final List<Delete> deletes)
991   throws IOException {
992     Object[] results = new Object[deletes.size()];
993     try {
994       batch(deletes, results);
995     } catch (InterruptedException e) {
996       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
997     } finally {
998       // mutate list so that it is empty for complete success, or contains only failed records
999       // results are returned in the same order as the requests in list
1000       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
1001       for (int i = results.length - 1; i>=0; i--) {
1002         // if result is not null, it succeeded
1003         if (results[i] instanceof Result) {
1004           deletes.remove(i);
1005         }
1006       }
1007     }
1008   }
1009 
1010   /**
1011    * {@inheritDoc}
1012    * @throws IOException
1013    */
1014   @Override
1015   public void put(final Put put) throws IOException {
1016     getBufferedMutator().mutate(put);
1017     if (autoFlush) {
1018       flushCommits();
1019     }
1020   }
1021 
1022   /**
1023    * {@inheritDoc}
1024    * @throws IOException
1025    */
1026   @Override
1027   public void put(final List<Put> puts) throws IOException {
1028     getBufferedMutator().mutate(puts);
1029     if (autoFlush) {
1030       flushCommits();
1031     }
1032   }
1033 
1034   /**
1035    * {@inheritDoc}
1036    */
1037   @Override
1038   public void mutateRow(final RowMutations rm) throws IOException {
1039     RegionServerCallable<Void> callable =
1040         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1041       @Override
1042       public Void call(int callTimeout) throws IOException {
1043         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1044         controller.setPriority(tableName);
1045         controller.setCallTimeout(callTimeout);
1046         try {
1047           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1048             getLocation().getRegionInfo().getRegionName(), rm);
1049           regionMutationBuilder.setAtomic(true);
1050           MultiRequest request =
1051             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1052           ClientProtos.MultiResponse response = getStub().multi(controller, request);
1053           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1054           if (res.hasException()) {
1055             Throwable ex = ProtobufUtil.toException(res.getException());
1056             if(ex instanceof IOException) {
1057               throw (IOException)ex;
1058             }
1059             throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1060           }
1061         } catch (ServiceException se) {
1062           throw ProtobufUtil.getRemoteException(se);
1063         }
1064         return null;
1065       }
1066     };
1067     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1068   }
1069 
1070   /**
1071    * {@inheritDoc}
1072    */
1073   @Override
1074   public Result append(final Append append) throws IOException {
1075     if (append.numFamilies() == 0) {
1076       throw new IOException(
1077           "Invalid arguments to append, no columns specified");
1078     }
1079 
1080     NonceGenerator ng = this.connection.getNonceGenerator();
1081     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1082     RegionServerCallable<Result> callable =
1083       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1084         @Override
1085         public Result call(int callTimeout) throws IOException {
1086           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1087           controller.setPriority(getTableName());
1088           controller.setCallTimeout(callTimeout);
1089           try {
1090             MutateRequest request = RequestConverter.buildMutateRequest(
1091               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1092             MutateResponse response = getStub().mutate(controller, request);
1093             if (!response.hasResult()) return null;
1094             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1095           } catch (ServiceException se) {
1096             throw ProtobufUtil.getRemoteException(se);
1097           }
1098         }
1099       };
1100     return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
1101         this.operationTimeout);
1102   }
1103 
1104   /**
1105    * {@inheritDoc}
1106    */
1107   @Override
1108   public Result increment(final Increment increment) throws IOException {
1109     if (!increment.hasFamilies()) {
1110       throw new IOException(
1111           "Invalid arguments to increment, no columns specified");
1112     }
1113     NonceGenerator ng = this.connection.getNonceGenerator();
1114     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1115     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1116         getName(), increment.getRow()) {
1117       @Override
1118       public Result call(int callTimeout) throws IOException {
1119         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1120         controller.setPriority(getTableName());
1121         controller.setCallTimeout(callTimeout);
1122         try {
1123           MutateRequest request = RequestConverter.buildMutateRequest(
1124             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1125           MutateResponse response = getStub().mutate(controller, request);
1126           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1127         } catch (ServiceException se) {
1128           throw ProtobufUtil.getRemoteException(se);
1129         }
1130       }
1131     };
1132     return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
1133         this.operationTimeout);
1134   }
1135 
1136   /**
1137    * {@inheritDoc}
1138    */
1139   @Override
1140   public long incrementColumnValue(final byte [] row, final byte [] family,
1141       final byte [] qualifier, final long amount)
1142   throws IOException {
1143     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1144   }
1145 
1146   /**
1147    * @deprecated As of release 0.96
1148    *             (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
1149    *             This will be removed in HBase 2.0.0.
1150    *             Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}.
1151    */
1152   @Deprecated
1153   @Override
1154   public long incrementColumnValue(final byte [] row, final byte [] family,
1155       final byte [] qualifier, final long amount, final boolean writeToWAL)
1156   throws IOException {
1157     return incrementColumnValue(row, family, qualifier, amount,
1158       writeToWAL? Durability.SYNC_WAL: Durability.SKIP_WAL);
1159   }
1160 
1161   /**
1162    * {@inheritDoc}
1163    */
1164   @Override
1165   public long incrementColumnValue(final byte [] row, final byte [] family,
1166       final byte [] qualifier, final long amount, final Durability durability)
1167   throws IOException {
1168     NullPointerException npe = null;
1169     if (row == null) {
1170       npe = new NullPointerException("row is null");
1171     } else if (family == null) {
1172       npe = new NullPointerException("family is null");
1173     } else if (qualifier == null) {
1174       npe = new NullPointerException("qualifier is null");
1175     }
1176     if (npe != null) {
1177       throw new IOException(
1178           "Invalid arguments to incrementColumnValue", npe);
1179     }
1180 
1181     NonceGenerator ng = this.connection.getNonceGenerator();
1182     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1183     RegionServerCallable<Long> callable =
1184       new RegionServerCallable<Long>(connection, getName(), row) {
1185         @Override
1186         public Long call(int callTimeout) throws IOException {
1187           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1188           controller.setPriority(getTableName());
1189           controller.setCallTimeout(callTimeout);
1190           try {
1191             MutateRequest request = RequestConverter.buildIncrementRequest(
1192               getLocation().getRegionInfo().getRegionName(), row, family,
1193               qualifier, amount, durability, nonceGroup, nonce);
1194             MutateResponse response = getStub().mutate(controller, request);
1195             Result result =
1196               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1197             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1198           } catch (ServiceException se) {
1199             throw ProtobufUtil.getRemoteException(se);
1200           }
1201         }
1202       };
1203     return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
1204         this.operationTimeout);
1205   }
1206 
1207   /**
1208    * {@inheritDoc}
1209    */
1210   @Override
1211   public boolean checkAndPut(final byte [] row,
1212       final byte [] family, final byte [] qualifier, final byte [] value,
1213       final Put put)
1214   throws IOException {
1215     RegionServerCallable<Boolean> callable =
1216       new RegionServerCallable<Boolean>(connection, getName(), row) {
1217         @Override
1218         public Boolean call(int callTimeout) throws IOException {
1219           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1220           controller.setPriority(tableName);
1221           controller.setCallTimeout(callTimeout);
1222           try {
1223             MutateRequest request = RequestConverter.buildMutateRequest(
1224                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1225                 new BinaryComparator(value), CompareType.EQUAL, put);
1226             MutateResponse response = getStub().mutate(controller, request);
1227             return Boolean.valueOf(response.getProcessed());
1228           } catch (ServiceException se) {
1229             throw ProtobufUtil.getRemoteException(se);
1230           }
1231         }
1232       };
1233     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1234         this.operationTimeout);
1235   }
1236 
1237   /**
1238    * {@inheritDoc}
1239    */
1240   @Override
1241   public boolean checkAndPut(final byte [] row, final byte [] family,
1242       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1243       final Put put)
1244   throws IOException {
1245     RegionServerCallable<Boolean> callable =
1246       new RegionServerCallable<Boolean>(connection, getName(), row) {
1247         @Override
1248         public Boolean call(int callTimeout) throws IOException {
1249           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1250           controller.setPriority(tableName);
1251           controller.setCallTimeout(callTimeout);
1252           try {
1253             CompareType compareType = CompareType.valueOf(compareOp.name());
1254             MutateRequest request = RequestConverter.buildMutateRequest(
1255               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1256                 new BinaryComparator(value), compareType, put);
1257             MutateResponse response = getStub().mutate(controller, request);
1258             return Boolean.valueOf(response.getProcessed());
1259           } catch (ServiceException se) {
1260             throw ProtobufUtil.getRemoteException(se);
1261           }
1262         }
1263       };
1264     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1265         this.operationTimeout);
1266   }
1267 
1268   /**
1269    * {@inheritDoc}
1270    */
1271   @Override
1272   public boolean checkAndDelete(final byte [] row,
1273       final byte [] family, final byte [] qualifier, final byte [] value,
1274       final Delete delete)
1275   throws IOException {
1276     RegionServerCallable<Boolean> callable =
1277       new RegionServerCallable<Boolean>(connection, getName(), row) {
1278         @Override
1279         public Boolean call(int callTimeout) throws IOException {
1280           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1281           controller.setPriority(tableName);
1282           controller.setCallTimeout(callTimeout);
1283           try {
1284             MutateRequest request = RequestConverter.buildMutateRequest(
1285               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1286                 new BinaryComparator(value), CompareType.EQUAL, delete);
1287             MutateResponse response = getStub().mutate(controller, request);
1288             return Boolean.valueOf(response.getProcessed());
1289           } catch (ServiceException se) {
1290             throw ProtobufUtil.getRemoteException(se);
1291           }
1292         }
1293       };
1294     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1295         this.operationTimeout);
1296   }
1297 
1298   /**
1299    * {@inheritDoc}
1300    */
1301   @Override
1302   public boolean checkAndDelete(final byte [] row, final byte [] family,
1303       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1304       final Delete delete)
1305   throws IOException {
1306     RegionServerCallable<Boolean> callable =
1307       new RegionServerCallable<Boolean>(connection, getName(), row) {
1308         @Override
1309         public Boolean call(int callTimeout) throws IOException {
1310           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1311           controller.setPriority(tableName);
1312           controller.setCallTimeout(callTimeout);
1313           try {
1314             CompareType compareType = CompareType.valueOf(compareOp.name());
1315             MutateRequest request = RequestConverter.buildMutateRequest(
1316               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1317                 new BinaryComparator(value), compareType, delete);
1318             MutateResponse response = getStub().mutate(controller, request);
1319             return Boolean.valueOf(response.getProcessed());
1320           } catch (ServiceException se) {
1321             throw ProtobufUtil.getRemoteException(se);
1322           }
1323         }
1324       };
1325     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
1326         this.operationTimeout);
1327   }
1328 
1329   /**
1330    * {@inheritDoc}
1331    */
1332   @Override
1333   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1334       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1335   throws IOException {
1336     RegionServerCallable<Boolean> callable =
1337         new RegionServerCallable<Boolean>(connection, getName(), row) {
1338           @Override
1339           public Boolean call(int callTimeout) throws IOException {
1340             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1341             controller.setPriority(tableName);
1342             controller.setCallTimeout(callTimeout);
1343             try {
1344               CompareType compareType = CompareType.valueOf(compareOp.name());
1345               MultiRequest request = RequestConverter.buildMutateRequest(
1346                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1347                   new BinaryComparator(value), compareType, rm);
1348               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1349               ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1350               if (res.hasException()) {
1351                 Throwable ex = ProtobufUtil.toException(res.getException());
1352                 if(ex instanceof IOException) {
1353                   throw (IOException)ex;
1354                 }
1355                 throw new IOException("Failed to checkAndMutate row: "+
1356                     Bytes.toStringBinary(rm.getRow()), ex);
1357               }
1358               return Boolean.valueOf(response.getProcessed());
1359             } catch (ServiceException se) {
1360               throw ProtobufUtil.getRemoteException(se);
1361             }
1362           }
1363         };
1364     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1365   }
1366 
1367   /**
1368    * {@inheritDoc}
1369    */
1370   @Override
1371   public boolean exists(final Get get) throws IOException {
1372     Result r = get(get, true);
1373     assert r.getExists() != null;
1374     return r.getExists();
1375   }
1376 
1377   /**
1378    * {@inheritDoc}
1379    */
1380   @Override
1381   public boolean[] existsAll(final List<Get> gets) throws IOException {
1382     if (gets.isEmpty()) return new boolean[]{};
1383     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1384 
1385     ArrayList<Get> exists = new ArrayList<Get>(gets.size());
1386     for (Get g: gets){
1387       Get ge = new Get(g);
1388       ge.setCheckExistenceOnly(true);
1389       exists.add(ge);
1390     }
1391 
1392     Object[] r1;
1393     try {
1394       r1 = batch(exists);
1395     } catch (InterruptedException e) {
1396       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1397     }
1398 
1399     // translate.
1400     boolean[] results = new boolean[r1.length];
1401     int i = 0;
1402     for (Object o : r1) {
1403       // batch ensures if there is a failure we get an exception instead
1404       results[i++] = ((Result)o).getExists();
1405     }
1406 
1407     return results;
1408   }
1409 
1410   /**
1411    * {@inheritDoc}
1412    */
1413   @Override
1414   @Deprecated
1415   public Boolean[] exists(final List<Get> gets) throws IOException {
1416     boolean[] results = existsAll(gets);
1417     Boolean[] objectResults = new Boolean[results.length];
1418     for (int i = 0; i < results.length; ++i) {
1419       objectResults[i] = results[i];
1420     }
1421     return objectResults;
1422   }
1423 
1424   /**
1425    * {@inheritDoc}
1426    * @throws IOException
1427    */
1428   @Override
1429   public void flushCommits() throws IOException {
1430     if (mutator == null) {
1431       // nothing to flush if there's no mutator; don't bother creating one.
1432       return;
1433     }
1434     getBufferedMutator().flush();
1435   }
1436 
1437   /**
1438    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1439    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1440    *
1441    * @param list The collection of actions.
1442    * @param results An empty array, same size as list. If an exception is thrown,
1443    * you can test here for partial results, and to determine which actions
1444    * processed successfully.
1445    * @throws IOException if there are problems talking to META. Per-item
1446    * exceptions are stored in the results array.
1447    */
1448   public <R> void processBatchCallback(
1449     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1450     throws IOException, InterruptedException {
1451     this.batchCallback(list, results, callback);
1452   }
1453 
1454 
1455   /**
1456    * Parameterized batch processing, allowing varying return types for different
1457    * {@link Row} implementations.
1458    */
1459   public void processBatch(final List<? extends Row> list, final Object[] results)
1460     throws IOException, InterruptedException {
1461     this.batch(list, results);
1462   }
1463 
1464 
1465   @Override
1466   public void close() throws IOException {
1467     if (this.closed) {
1468       return;
1469     }
1470     flushCommits();
1471     if (cleanupPoolOnClose) {
1472       this.pool.shutdown();
1473       try {
1474         boolean terminated = false;
1475         do {
1476           // wait until the pool has terminated
1477           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1478         } while (!terminated);
1479       } catch (InterruptedException e) {
1480         this.pool.shutdownNow();
1481         LOG.warn("waitForTermination interrupted");
1482       }
1483     }
1484     if (cleanupConnectionOnClose) {
1485       if (this.connection != null) {
1486         this.connection.close();
1487       }
1488     }
1489     this.closed = true;
1490   }
1491 
1492   // validate for well-formedness
1493   public void validatePut(final Put put) throws IllegalArgumentException {
1494     validatePut(put, connConfiguration.getMaxKeyValueSize());
1495   }
1496 
1497   // validate for well-formedness
1498   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1499     if (put.isEmpty()) {
1500       throw new IllegalArgumentException("No columns to insert");
1501     }
1502     if (maxKeyValueSize > 0) {
1503       for (List<Cell> list : put.getFamilyCellMap().values()) {
1504         for (Cell cell : list) {
1505           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1506             throw new IllegalArgumentException("KeyValue size too large");
1507           }
1508         }
1509       }
1510     }
1511   }
1512 
1513   /**
1514    * {@inheritDoc}
1515    */
1516   @Override
1517   public boolean isAutoFlush() {
1518     return autoFlush;
1519   }
1520 
1521   /**
1522    * {@inheritDoc}
1523    */
1524   @Deprecated
1525   @Override
1526   public void setAutoFlush(boolean autoFlush) {
1527     this.autoFlush = autoFlush;
1528   }
1529 
1530   /**
1531    * {@inheritDoc}
1532    */
1533   @Override
1534   public void setAutoFlushTo(boolean autoFlush) {
1535     this.autoFlush = autoFlush;
1536   }
1537 
1538   /**
1539    * {@inheritDoc}
1540    */
1541   @Override
1542   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1543     this.autoFlush = autoFlush;
1544   }
1545 
1546   /**
1547    * Returns the maximum size in bytes of the write buffer for this HTable.
1548    * <p>
1549    * The default value comes from the configuration parameter
1550    * {@code hbase.client.write.buffer}.
1551    * @return The size of the write buffer in bytes.
1552    */
1553   @Override
1554   public long getWriteBufferSize() {
1555     if (mutator == null) {
1556       return connConfiguration.getWriteBufferSize();
1557     } else {
1558       return mutator.getWriteBufferSize();
1559     }
1560   }
1561 
1562   /**
1563    * Sets the size of the buffer in bytes.
1564    * <p>
1565    * If the new size is less than the current amount of data in the
1566    * write buffer, the buffer gets flushed.
1567    * @param writeBufferSize The new write buffer size, in bytes.
1568    * @throws IOException if a remote or network exception occurs.
1569    */
1570   @Override
1571   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1572     getBufferedMutator();
1573     mutator.setWriteBufferSize(writeBufferSize);
1574   }
1575 
1576   /**
1577    * The pool is used for mutli requests for this HTable
1578    * @return the pool used for mutli
1579    */
1580   ExecutorService getPool() {
1581     return this.pool;
1582   }
1583 
1584   /**
1585    * Enable or disable region cache prefetch for the table. It will be
1586    * applied for the given table's all HTable instances who share the same
1587    * connection. By default, the cache prefetch is enabled.
1588    * @param tableName name of table to configure.
1589    * @param enable Set to true to enable region cache prefetch. Or set to
1590    * false to disable it.
1591    * @throws IOException
1592    * @deprecated does nothing since 0.99
1593    */
1594   @Deprecated
1595   public static void setRegionCachePrefetch(final byte[] tableName,
1596       final boolean enable)  throws IOException {
1597   }
1598 
1599   /**
1600    * @deprecated does nothing since 0.99
1601    */
1602   @Deprecated
1603   public static void setRegionCachePrefetch(
1604       final TableName tableName,
1605       final boolean enable) throws IOException {
1606   }
1607 
1608   /**
1609    * Enable or disable region cache prefetch for the table. It will be
1610    * applied for the given table's all HTable instances who share the same
1611    * connection. By default, the cache prefetch is enabled.
1612    * @param conf The Configuration object to use.
1613    * @param tableName name of table to configure.
1614    * @param enable Set to true to enable region cache prefetch. Or set to
1615    * false to disable it.
1616    * @throws IOException
1617    * @deprecated does nothing since 0.99
1618    */
1619   @Deprecated
1620   public static void setRegionCachePrefetch(final Configuration conf,
1621       final byte[] tableName, final boolean enable) throws IOException {
1622   }
1623 
1624   /**
1625    * @deprecated does nothing since 0.99
1626    */
1627   @Deprecated
1628   public static void setRegionCachePrefetch(final Configuration conf,
1629       final TableName tableName,
1630       final boolean enable) throws IOException {
1631   }
1632 
1633   /**
1634    * Check whether region cache prefetch is enabled or not for the table.
1635    * @param conf The Configuration object to use.
1636    * @param tableName name of table to check
1637    * @return true if table's region cache prefecth is enabled. Otherwise
1638    * it is disabled.
1639    * @throws IOException
1640    * @deprecated always return false since 0.99
1641    */
1642   @Deprecated
1643   public static boolean getRegionCachePrefetch(final Configuration conf,
1644       final byte[] tableName) throws IOException {
1645     return false;
1646   }
1647 
1648   /**
1649    * @deprecated always return false since 0.99
1650    */
1651   @Deprecated
1652   public static boolean getRegionCachePrefetch(final Configuration conf,
1653       final TableName tableName) throws IOException {
1654     return false;
1655   }
1656 
1657   /**
1658    * Check whether region cache prefetch is enabled or not for the table.
1659    * @param tableName name of table to check
1660    * @return true if table's region cache prefecth is enabled. Otherwise
1661    * it is disabled.
1662    * @throws IOException
1663    * @deprecated always return false since 0.99
1664    */
1665   @Deprecated
1666   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1667     return false;
1668   }
1669 
1670   /**
1671    * @deprecated always return false since 0.99
1672    */
1673   @Deprecated
1674   public static boolean getRegionCachePrefetch(
1675       final TableName tableName) throws IOException {
1676     return false;
1677   }
1678 
1679   /**
1680    * Explicitly clears the region cache to fetch the latest value from META.
1681    * This is a power user function: avoid unless you know the ramifications.
1682    */
1683   public void clearRegionCache() {
1684     this.connection.clearRegionCache();
1685   }
1686 
1687   /**
1688    * {@inheritDoc}
1689    */
1690   @Override
1691   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1692     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1693   }
1694 
1695   /**
1696    * {@inheritDoc}
1697    */
1698   @Override
1699   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1700       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1701       throws ServiceException, Throwable {
1702     final Map<byte[],R> results =  Collections.synchronizedMap(
1703         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1704     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1705       @Override
1706       public void update(byte[] region, byte[] row, R value) {
1707         if (region != null) {
1708           results.put(region, value);
1709         }
1710       }
1711     });
1712     return results;
1713   }
1714 
1715   /**
1716    * {@inheritDoc}
1717    */
1718   @Override
1719   public <T extends Service, R> void coprocessorService(final Class<T> service,
1720       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1721       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1722 
1723     // get regions covered by the row range
1724     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1725 
1726     Map<byte[],Future<R>> futures =
1727         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1728     for (final byte[] r : keys) {
1729       final RegionCoprocessorRpcChannel channel =
1730           new RegionCoprocessorRpcChannel(connection, tableName, r);
1731       Future<R> future = pool.submit(
1732           new Callable<R>() {
1733             @Override
1734             public R call() throws Exception {
1735               T instance = ProtobufUtil.newServiceStub(service, channel);
1736               R result = callable.call(instance);
1737               byte[] region = channel.getLastRegion();
1738               if (callback != null) {
1739                 callback.update(region, r, result);
1740               }
1741               return result;
1742             }
1743           });
1744       futures.put(r, future);
1745     }
1746     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1747       try {
1748         e.getValue().get();
1749       } catch (ExecutionException ee) {
1750         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1751             + Bytes.toStringBinary(e.getKey()), ee);
1752         throw ee.getCause();
1753       } catch (InterruptedException ie) {
1754         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1755             + " for row " + Bytes.toStringBinary(e.getKey()))
1756             .initCause(ie);
1757       }
1758     }
1759   }
1760 
1761   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1762   throws IOException {
1763     if (start == null) {
1764       start = HConstants.EMPTY_START_ROW;
1765     }
1766     if (end == null) {
1767       end = HConstants.EMPTY_END_ROW;
1768     }
1769     return getKeysAndRegionsInRange(start, end, true).getFirst();
1770   }
1771 
1772   public void setOperationTimeout(int operationTimeout) {
1773     this.operationTimeout = operationTimeout;
1774   }
1775 
1776   public int getOperationTimeout() {
1777     return operationTimeout;
1778   }
1779 
1780   public void setRpcTimeout(int rpcTimeout) {
1781     this.rpcTimeout = rpcTimeout;
1782   }
1783 
1784   public int getRpcTimeout() {
1785     return rpcTimeout;
1786   }
1787 
1788   @Override
1789   public String toString() {
1790     return tableName + ";" + connection;
1791   }
1792 
1793   /**
1794    * {@inheritDoc}
1795    */
1796   @Override
1797   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1798       Descriptors.MethodDescriptor methodDescriptor, Message request,
1799       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1800     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1801         Bytes.BYTES_COMPARATOR));
1802     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1803         new Callback<R>() {
1804 
1805           @Override
1806           public void update(byte[] region, byte[] row, R result) {
1807             if (region != null) {
1808               results.put(region, result);
1809             }
1810           }
1811         });
1812     return results;
1813   }
1814 
1815   /**
1816    * {@inheritDoc}
1817    */
1818   @Override
1819   public <R extends Message> void batchCoprocessorService(
1820       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1821       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1822       throws ServiceException, Throwable {
1823 
1824     if (startKey == null) {
1825       startKey = HConstants.EMPTY_START_ROW;
1826     }
1827     if (endKey == null) {
1828       endKey = HConstants.EMPTY_END_ROW;
1829     }
1830     // get regions covered by the row range
1831     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1832         getKeysAndRegionsInRange(startKey, endKey, true);
1833     List<byte[]> keys = keysAndRegions.getFirst();
1834     List<HRegionLocation> regions = keysAndRegions.getSecond();
1835 
1836     // check if we have any calls to make
1837     if (keys.isEmpty()) {
1838       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1839           ", end=" + Bytes.toStringBinary(endKey));
1840       return;
1841     }
1842 
1843     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1844     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1845         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1846     for (int i = 0; i < keys.size(); i++) {
1847       final byte[] rowKey = keys.get(i);
1848       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1849       RegionCoprocessorServiceExec exec =
1850           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1851       execs.add(exec);
1852       execsByRow.put(rowKey, exec);
1853     }
1854 
1855     // tracking for any possible deserialization errors on success callback
1856     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1857     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1858     final List<Row> callbackErrorActions = new ArrayList<Row>();
1859     final List<String> callbackErrorServers = new ArrayList<String>();
1860     Object[] results = new Object[execs.size()];
1861 
1862     AsyncProcess asyncProcess =
1863         new AsyncProcess(connection, configuration, pool,
1864             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1865             true, RpcControllerFactory.instantiate(configuration));
1866 
1867     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1868         new Callback<ClientProtos.CoprocessorServiceResult>() {
1869           @Override
1870           public void update(byte[] region, byte[] row,
1871                               ClientProtos.CoprocessorServiceResult serviceResult) {
1872             if (LOG.isTraceEnabled()) {
1873               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1874                   ": region=" + Bytes.toStringBinary(region) +
1875                   ", row=" + Bytes.toStringBinary(row) +
1876                   ", value=" + serviceResult.getValue().getValue());
1877             }
1878             try {
1879               Message.Builder builder = responsePrototype.newBuilderForType();
1880               ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1881               callback.update(region, row, (R) builder.build());
1882             } catch (IOException e) {
1883               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1884                   e);
1885               callbackErrorExceptions.add(e);
1886               callbackErrorActions.add(execsByRow.get(row));
1887               callbackErrorServers.add("null");
1888             }
1889           }
1890         }, results);
1891 
1892     future.waitUntilDone();
1893 
1894     if (future.hasError()) {
1895       throw future.getErrors();
1896     } else if (!callbackErrorExceptions.isEmpty()) {
1897       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1898           callbackErrorServers);
1899     }
1900   }
1901 
1902   public RegionLocator getRegionLocator() {
1903     return this.locator;
1904   }
1905 
1906   @VisibleForTesting
1907   BufferedMutator getBufferedMutator() throws IOException {
1908     if (mutator == null) {
1909       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1910           new BufferedMutatorParams(tableName)
1911               .pool(pool)
1912               .writeBufferSize(connConfiguration.getWriteBufferSize())
1913               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1914       );
1915     }
1916     return mutator;
1917   }
1918 }