View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.TreeMap;
30  import java.util.concurrent.Callable;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.SynchronousQueue;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.HBaseConfiguration;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HRegionLocation;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.KeyValueUtil;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.TableNotFoundException;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.classification.InterfaceStability;
53  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
54  import org.apache.hadoop.hbase.client.coprocessor.Batch;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
56  import org.apache.hadoop.hbase.filter.BinaryComparator;
57  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
58  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
59  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
60  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
61  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
62  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
63  import org.apache.hadoop.hbase.protobuf.RequestConverter;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
69  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
70  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
71  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
72  import org.apache.hadoop.hbase.util.Bytes;
73  import org.apache.hadoop.hbase.util.Pair;
74  import org.apache.hadoop.hbase.util.Threads;
75  
76  import com.google.common.annotations.VisibleForTesting;
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.InvalidProtocolBufferException;
79  import com.google.protobuf.Message;
80  import com.google.protobuf.Service;
81  import com.google.protobuf.ServiceException;
82  
83  /**
84   * An implementation of {@link Table}. Used to communicate with a single HBase table.
85   * Lightweight. Get as needed and just close when done.
86   * Instances of this class SHOULD NOT be constructed directly.
87   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
88   * class comment for an example of how.
89   *
90   * <p>This class is NOT thread safe for reads nor writes.
91   * In the case of writes (Put, Delete), the underlying write buffer can
92   * be corrupted if multiple threads contend over a single HTable instance.
93   * In the case of reads, some fields used by a Scan are shared among all threads.
94   *
95   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
96   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
97   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
98   * Interface Classification</a>
99   * There are no guarantees for backwards source / binary compatibility and methods or class can
100  * change or go away without deprecation.
101  *
102  * @see Table
103  * @see Admin
104  * @see Connection
105  * @see ConnectionFactory
106  */
107 @InterfaceAudience.Private
108 @InterfaceStability.Stable
109 public class HTable implements HTableInterface {
110   private static final Log LOG = LogFactory.getLog(HTable.class);
111   protected ClusterConnection connection;
112   private final TableName tableName;
113   private volatile Configuration configuration;
114   private TableConfiguration tableConfiguration;
115   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
116   private long writeBufferSize;
117   private boolean autoFlush = true;
118   protected long currentWriteBufferSize = 0 ;
119   private boolean closed = false;
120   protected int scannerCaching;
121   private ExecutorService pool;  // For Multi & Scan
122   private int operationTimeout;
123   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
124   private final boolean cleanupConnectionOnClose; // close the connection in close()
125   private Consistency defaultConsistency = Consistency.STRONG;
126   private HRegionLocator locator;
127 
128   /** The Async process for puts with autoflush set to false or multiputs */
129   protected AsyncProcess ap;
130   /** The Async process for batch */
131   protected AsyncProcess multiAp;
132   private RpcRetryingCallerFactory rpcCallerFactory;
133   private RpcControllerFactory rpcControllerFactory;
134 
/**
 * Opens a handle on the HBase table with the given string name.
 * <p>
 * The name is converted with {@link TableName#valueOf(String)} and construction is handed to
 * {@link #HTable(Configuration, TableName)}.
 *
 * @param conf Configuration object to use.
 * @param tableName Name of the table.
 * @throws IOException if a remote or network exception occurs
 * @deprecated Constructing HTable objects manually has been deprecated. Please use
 * {@link Connection} to instantiate a {@link Table} instead.
 */
@Deprecated
public HTable(Configuration conf, final String tableName) throws IOException {
  this(conf, TableName.valueOf(tableName));
}
148 
/**
 * Opens a handle on the HBase table whose name is given as raw bytes.
 * <p>
 * The bytes are converted with {@link TableName#valueOf(byte[])} and construction is handed to
 * {@link #HTable(Configuration, TableName)}.
 *
 * @param conf Configuration object to use.
 * @param tableName Name of the table.
 * @throws IOException if a remote or network exception occurs
 * @deprecated Constructing HTable objects manually has been deprecated. Please use
 * {@link Connection} to instantiate a {@link Table} instead.
 */
@Deprecated
public HTable(Configuration conf, final byte[] tableName) throws IOException {
  this(conf, TableName.valueOf(tableName));
}
162 
163   /**
164    * Creates an object to access a HBase table.
165    * @param conf Configuration object to use.
166    * @param tableName table name pojo
167    * @throws IOException if a remote or network exception occurs
168    * @deprecated Constructing HTable objects manually has been deprecated. Please use
169    * {@link Connection} to instantiate a {@link Table} instead.
170    */
171   @Deprecated
172   public HTable(Configuration conf, final TableName tableName)
173   throws IOException {
174     this.tableName = tableName;
175     this.cleanupPoolOnClose = true;
176     this.cleanupConnectionOnClose = true;
177     if (conf == null) {
178       this.connection = null;
179       return;
180     }
181     this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
182     this.configuration = conf;
183 
184     this.pool = getDefaultExecutor(conf);
185     this.finishSetup();
186   }
187 
/**
 * Creates an object to access a HBase table over an existing connection.
 * <p>
 * The supplied connection is not flagged for cleanup on close; the default executor created
 * here is.
 *
 * @param tableName Name of the table.
 * @param connection HConnection to be used.
 * @throws IOException if a remote or network exception occurs
 * @deprecated Do not use.
 */
@Deprecated
public HTable(TableName tableName, Connection connection) throws IOException {
  // Ownership flags: the pool is ours, the connection is the caller's.
  this.cleanupConnectionOnClose = false;
  this.cleanupPoolOnClose = true;

  this.tableName = tableName;
  this.connection = (ClusterConnection) connection;
  this.configuration = connection.getConfiguration();
  this.pool = getDefaultExecutor(this.configuration);

  this.finishSetup();
}
206 
207   // Marked Private @since 1.0
208   @InterfaceAudience.Private
209   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
210     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
211     if (maxThreads == 0) {
212       maxThreads = 1; // is there a better default?
213     }
214     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
215 
216     // Using the "direct handoff" approach, new threads will only be created
217     // if it is necessary and will grow unbounded. This could be bad but in HCM
218     // we only create as many Runnables as there are region servers. It means
219     // it also scales when new region servers are added.
220     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
221         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
222     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
223     return pool;
224   }
225 
/**
 * Opens a handle on the HBase table whose name is given as raw bytes, running work on the
 * supplied executor.
 * <p>
 * The bytes are converted with {@link TableName#valueOf(byte[])} and construction is handed to
 * {@link #HTable(Configuration, TableName, ExecutorService)}.
 *
 * @param conf Configuration object to use.
 * @param tableName Name of the table.
 * @param pool ExecutorService to be used.
 * @throws IOException if a remote or network exception occurs
 * @deprecated Constructing HTable objects manually has been deprecated. Please use
 * {@link Connection} to instantiate a {@link Table} instead.
 */
@Deprecated
public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
    throws IOException {
  this(conf, TableName.valueOf(tableName), pool);
}
240 
241   /**
242    * Creates an object to access a HBase table.
243    * @param conf Configuration object to use.
244    * @param tableName Name of the table.
245    * @param pool ExecutorService to be used.
246    * @throws IOException if a remote or network exception occurs
247    * @deprecated Constructing HTable objects manually has been deprecated. Please use
248    * {@link Connection} to instantiate a {@link Table} instead.
249    */
250   @Deprecated
251   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
252       throws IOException {
253     this.connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
254     this.configuration = conf;
255     this.pool = pool;
256     if (pool == null) {
257       this.pool = getDefaultExecutor(conf);
258       this.cleanupPoolOnClose = true;
259     } else {
260       this.cleanupPoolOnClose = false;
261     }
262     this.tableName = tableName;
263     this.cleanupConnectionOnClose = true;
264     this.finishSetup();
265   }
266 
/**
 * Creates an object to access a HBase table over an existing connection, with the table name
 * given as raw bytes.
 * <p>
 * The bytes are converted with {@link TableName#valueOf(byte[])} and construction is handed to
 * {@link #HTable(TableName, Connection, ExecutorService)}.
 *
 * @param tableName Name of the table.
 * @param connection HConnection to be used.
 * @param pool ExecutorService to be used.
 * @throws IOException if a remote or network exception occurs.
 * @deprecated Do not use, internal ctor.
 */
@Deprecated
public HTable(final byte[] tableName, final Connection connection, final ExecutorService pool)
    throws IOException {
  this(TableName.valueOf(tableName), connection, pool);
}
280 
/**
 * Creates an object to access a HBase table over an existing connection.
 * <p>
 * The connection is cast to {@link ClusterConnection}; the table configuration and both RPC
 * factories are passed on as {@code null}.
 *
 * @param tableName Name of the table.
 * @param connection connection to be used.
 * @param pool ExecutorService to be used.
 * @throws IOException if a remote or network exception occurs
 * @deprecated Do not use, internal ctor.
 */
@Deprecated
public HTable(TableName tableName, final Connection connection, final ExecutorService pool)
    throws IOException {
  this(tableName, (ClusterConnection) connection, null, null, null, pool);
}
287 
288   /**
289    * Creates an object to access a HBase table.
290    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
291    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
292    * @param tableName Name of the table.
293    * @param connection HConnection to be used.
294    * @param pool ExecutorService to be used.
295    * @throws IOException if a remote or network exception occurs
296    */
297   @InterfaceAudience.Private
298   public HTable(TableName tableName, final ClusterConnection connection,
299       final TableConfiguration tableConfig,
300       final RpcRetryingCallerFactory rpcCallerFactory,
301       final RpcControllerFactory rpcControllerFactory,
302       final ExecutorService pool) throws IOException {
303     if (connection == null || connection.isClosed()) {
304       throw new IllegalArgumentException("Connection is null or closed.");
305     }
306     this.tableName = tableName;
307     this.cleanupConnectionOnClose = false;
308     this.connection = connection;
309     this.configuration = connection.getConfiguration();
310     this.tableConfiguration = tableConfig;
311     this.pool = pool;
312     if (pool == null) {
313       this.pool = getDefaultExecutor(this.configuration);
314       this.cleanupPoolOnClose = true;
315     } else {
316       this.cleanupPoolOnClose = false;
317     }
318 
319     this.rpcCallerFactory = rpcCallerFactory;
320     this.rpcControllerFactory = rpcControllerFactory;
321 
322     this.finishSetup();
323   }
324 
/**
 * For internal testing.
 * <p>
 * Leaves the table name {@code null}, installs a default {@link TableConfiguration} and
 * marks neither the pool nor the connection for cleanup on close.
 *
 * @throws IOException declared for subclasses; nothing in this body throws it
 */
@VisibleForTesting
protected HTable() throws IOException {
  this.cleanupConnectionOnClose = false;
  this.cleanupPoolOnClose = false;
  this.tableName = null;
  this.tableConfiguration = new TableConfiguration();
}
336 
337   /**
338    * @return maxKeyValueSize from configuration.
339    */
340   public static int getMaxKeyValueSize(Configuration conf) {
341     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
342   }
343 
344   /**
345    * setup this HTable's parameter based on the passed configuration
346    */
347   private void finishSetup() throws IOException {
348     if (tableConfiguration == null) {
349       tableConfiguration = new TableConfiguration(configuration);
350     }
351 
352     this.operationTimeout = tableName.isSystemTable() ?
353         tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
354     this.writeBufferSize = tableConfiguration.getWriteBufferSize();
355     this.scannerCaching = tableConfiguration.getScannerCaching();
356 
357     if (this.rpcCallerFactory == null) {
358       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
359     }
360     if (this.rpcControllerFactory == null) {
361       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
362     }
363 
364     // puts need to track errors globally due to how the APIs currently work.
365     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true,
366         rpcControllerFactory);
367     multiAp = this.connection.getAsyncProcess();
368     this.locator = new HRegionLocator(getName(), connection);
369   }
370 
371   /**
372    * {@inheritDoc}
373    */
374   @Override
375   public Configuration getConfiguration() {
376     return configuration;
377   }
378 
379   /**
380    * Tells whether or not a table is enabled or not. This method creates a
381    * new HBase configuration, so it might make your unit tests fail due to
382    * incorrect ZK client port.
383    * @param tableName Name of table to check.
384    * @return {@code true} if table is online.
385    * @throws IOException if a remote or network exception occurs
386    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
387    */
388   @Deprecated
389   public static boolean isTableEnabled(String tableName) throws IOException {
390     return isTableEnabled(TableName.valueOf(tableName));
391   }
392 
393   /**
394    * Tells whether or not a table is enabled or not. This method creates a
395    * new HBase configuration, so it might make your unit tests fail due to
396    * incorrect ZK client port.
397    * @param tableName Name of table to check.
398    * @return {@code true} if table is online.
399    * @throws IOException if a remote or network exception occurs
400    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
401    */
402   @Deprecated
403   public static boolean isTableEnabled(byte[] tableName) throws IOException {
404     return isTableEnabled(TableName.valueOf(tableName));
405   }
406 
407   /**
408    * Tells whether or not a table is enabled or not. This method creates a
409    * new HBase configuration, so it might make your unit tests fail due to
410    * incorrect ZK client port.
411    * @param tableName Name of table to check.
412    * @return {@code true} if table is online.
413    * @throws IOException if a remote or network exception occurs
414    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
415    */
416   @Deprecated
417   public static boolean isTableEnabled(TableName tableName) throws IOException {
418     return isTableEnabled(HBaseConfiguration.create(), tableName);
419   }
420 
421   /**
422    * Tells whether or not a table is enabled or not.
423    * @param conf The Configuration object to use.
424    * @param tableName Name of table to check.
425    * @return {@code true} if table is online.
426    * @throws IOException if a remote or network exception occurs
427    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
428    */
429   @Deprecated
430   public static boolean isTableEnabled(Configuration conf, String tableName)
431   throws IOException {
432     return isTableEnabled(conf, TableName.valueOf(tableName));
433   }
434 
435   /**
436    * Tells whether or not a table is enabled or not.
437    * @param conf The Configuration object to use.
438    * @param tableName Name of table to check.
439    * @return {@code true} if table is online.
440    * @throws IOException if a remote or network exception occurs
441    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
442    */
443   @Deprecated
444   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
445   throws IOException {
446     return isTableEnabled(conf, TableName.valueOf(tableName));
447   }
448 
449   /**
450    * Tells whether or not a table is enabled or not.
451    * @param conf The Configuration object to use.
452    * @param tableName Name of table to check.
453    * @return {@code true} if table is online.
454    * @throws IOException if a remote or network exception occurs
455    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
456    */
457   @Deprecated
458   public static boolean isTableEnabled(Configuration conf,
459       final TableName tableName) throws IOException {
460     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
461       @Override
462       public Boolean connect(HConnection connection) throws IOException {
463         return connection.isTableEnabled(tableName);
464       }
465     });
466   }
467 
468   /**
469    * Find region location hosting passed row using cached info
470    * @param row Row to find.
471    * @return The location of the given row.
472    * @throws IOException if a remote or network exception occurs
473    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
474    */
475   @Deprecated
476   public HRegionLocation getRegionLocation(final String row)
477   throws IOException {
478     return getRegionLocation(Bytes.toBytes(row), false);
479   }
480 
481   /**
482    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
483    */
484   @Deprecated
485   public HRegionLocation getRegionLocation(final byte [] row)
486   throws IOException {
487     return locator.getRegionLocation(row);
488   }
489 
490   /**
491    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
492    */
493   @Deprecated
494   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
495   throws IOException {
496     return locator.getRegionLocation(row, reload);
497   }
498 
499   /**
500    * {@inheritDoc}
501    */
502   @Override
503   public byte [] getTableName() {
504     return this.tableName.getName();
505   }
506 
507   @Override
508   public TableName getName() {
509     return tableName;
510   }
511 
512   /**
513    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
514    * manipulations.
515    * @return An HConnection instance.
516    * @deprecated This method will be changed from public to package protected.
517    */
518   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
519   @Deprecated
520   @VisibleForTesting
521   public HConnection getConnection() {
522     return this.connection;
523   }
524 
525   /**
526    * Gets the number of rows that a scanner will fetch at once.
527    * <p>
528    * The default value comes from {@code hbase.client.scanner.caching}.
529    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
530    */
531   @Deprecated
532   public int getScannerCaching() {
533     return scannerCaching;
534   }
535 
536   /**
537    * Kept in 0.96 for backward compatibility
538    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
539    */
540   @Deprecated
541   public List<Row> getWriteBuffer() {
542     return writeAsyncBuffer;
543   }
544 
545   /**
546    * Sets the number of rows that a scanner will fetch at once.
547    * <p>
548    * This will override the value specified by
549    * {@code hbase.client.scanner.caching}.
550    * Increasing this value will reduce the amount of work needed each time
551    * {@code next()} is called on a scanner, at the expense of memory use
552    * (since more rows will need to be maintained in memory by the scanners).
553    * @param scannerCaching the number of rows a scanner will fetch at once.
554    * @deprecated Use {@link Scan#setCaching(int)}
555    */
556   @Deprecated
557   public void setScannerCaching(int scannerCaching) {
558     this.scannerCaching = scannerCaching;
559   }
560 
561   /**
562    * {@inheritDoc}
563    */
564   @Override
565   public HTableDescriptor getTableDescriptor() throws IOException {
566     // TODO: This is the same as HBaseAdmin.getTableDescriptor(). Only keep one.
567     if (tableName == null) return null;
568     if (tableName.equals(TableName.META_TABLE_NAME)) {
569       return HTableDescriptor.META_TABLEDESC;
570     }
571     HTableDescriptor htd = executeMasterCallable(
572       new MasterCallable<HTableDescriptor>(getConnection()) {
573       @Override
574       public HTableDescriptor call(int callTimeout) throws ServiceException {
575         GetTableDescriptorsResponse htds;
576         GetTableDescriptorsRequest req =
577             RequestConverter.buildGetTableDescriptorsRequest(tableName);
578         htds = master.getTableDescriptors(null, req);
579 
580         if (!htds.getTableSchemaList().isEmpty()) {
581           return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
582         }
583         return null;
584       }
585     });
586     if (htd != null) {
587       return new UnmodifyableHTableDescriptor(htd);
588     }
589     throw new TableNotFoundException(tableName.getNameAsString());
590   }
591 
592   private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException {
593     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
594     try {
595       return caller.callWithRetries(callable, operationTimeout);
596     } finally {
597       callable.close();
598     }
599   }
600 
601   /**
602    * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
603    */
604   @Deprecated
605   public byte [][] getStartKeys() throws IOException {
606     return locator.getStartKeys();
607   }
608 
609   /**
610    * @deprecated Use {@link RegionLocator#getEndKeys()} instead;
611    */
612   @Deprecated
613   public byte[][] getEndKeys() throws IOException {
614     return locator.getEndKeys();
615   }
616 
617   /**
618    * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
619    */
620   @Deprecated
621   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
622     return locator.getStartEndKeys();
623   }
624 
625   /**
626    * Gets all the regions and their address for this table.
627    * <p>
628    * This is mainly useful for the MapReduce integration.
629    * @return A map of HRegionInfo with it's server address
630    * @throws IOException if a remote or network exception occurs
631    * @deprecated This is no longer a public API.  Use {@link #getAllRegionLocations()} instead.
632    */
633   @Deprecated
634   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
635     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular,
636     // returns an HRegionLocation.
637     return MetaScanner.allTableRegions(this.connection, getName());
638   }
639 
640   /**
641    * Gets all the regions and their address for this table.
642    * <p>
643    * This is mainly useful for the MapReduce integration.
644    * @return A map of HRegionInfo with it's server address
645    * @throws IOException if a remote or network exception occurs
646    * 
647    * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
648    */
649   @Deprecated
650   public List<HRegionLocation> getAllRegionLocations() throws IOException {
651     return locator.getAllRegionLocations();
652   }
653 
654   /**
655    * Get the corresponding regions for an arbitrary range of keys.
656    * <p>
657    * @param startKey Starting row in range, inclusive
658    * @param endKey Ending row in range, exclusive
659    * @return A list of HRegionLocations corresponding to the regions that
660    * contain the specified range
661    * @throws IOException if a remote or network exception occurs
662    * @deprecated This is no longer a public API
663    */
664   @Deprecated
665   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
666     final byte [] endKey) throws IOException {
667     return getRegionsInRange(startKey, endKey, false);
668   }
669 
670   /**
671    * Get the corresponding regions for an arbitrary range of keys.
672    * <p>
673    * @param startKey Starting row in range, inclusive
674    * @param endKey Ending row in range, exclusive
675    * @param reload true to reload information or false to use cached information
676    * @return A list of HRegionLocations corresponding to the regions that
677    * contain the specified range
678    * @throws IOException if a remote or network exception occurs
679    * @deprecated This is no longer a public API
680    */
681   @Deprecated
682   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
683       final byte [] endKey, final boolean reload) throws IOException {
684     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
685   }
686 
687   /**
688    * Get the corresponding start keys and regions for an arbitrary range of
689    * keys.
690    * <p>
691    * @param startKey Starting row in range, inclusive
692    * @param endKey Ending row in range
693    * @param includeEndKey true if endRow is inclusive, false if exclusive
694    * @return A pair of list of start keys and list of HRegionLocations that
695    *         contain the specified range
696    * @throws IOException if a remote or network exception occurs
697    * @deprecated This is no longer a public API
698    */
699   @Deprecated
700   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
701       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
702       throws IOException {
703     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
704   }
705 
706   /**
707    * Get the corresponding start keys and regions for an arbitrary range of
708    * keys.
709    * <p>
710    * @param startKey Starting row in range, inclusive
711    * @param endKey Ending row in range
712    * @param includeEndKey true if endRow is inclusive, false if exclusive
713    * @param reload true to reload information or false to use cached information
714    * @return A pair of list of start keys and list of HRegionLocations that
715    *         contain the specified range
716    * @throws IOException if a remote or network exception occurs
717    * @deprecated This is no longer a public API
718    */
719   @Deprecated
720   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
721       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
722       final boolean reload) throws IOException {
723     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
724     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
725       throw new IllegalArgumentException(
726         "Invalid range: " + Bytes.toStringBinary(startKey) +
727         " > " + Bytes.toStringBinary(endKey));
728     }
729     List<byte[]> keysInRange = new ArrayList<byte[]>();
730     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
731     byte[] currentKey = startKey;
732     do {
733       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
734       keysInRange.add(currentKey);
735       regionsInRange.add(regionLocation);
736       currentKey = regionLocation.getRegionInfo().getEndKey();
737     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
738         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
739             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
740     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
741         regionsInRange);
742   }
743 
744   /**
745    * {@inheritDoc}
746    * @deprecated Use reversed scan instead.
747    */
748    @Override
749    @Deprecated
750    public Result getRowOrBefore(final byte[] row, final byte[] family)
751        throws IOException {
752      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
753          tableName, row) {
754        @Override
755       public Result call(int callTimeout) throws IOException {
756          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
757          controller.setPriority(tableName);
758          controller.setCallTimeout(callTimeout);
759          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
760              getLocation().getRegionInfo().getRegionName(), row, family);
761          try {
762            ClientProtos.GetResponse response = getStub().get(controller, request);
763            if (!response.hasResult()) return null;
764            return ProtobufUtil.toResult(response.getResult());
765          } catch (ServiceException se) {
766            throw ProtobufUtil.getRemoteException(se);
767          }
768        }
769      };
770      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
771    }
772 
773   /**
774    * The underlying {@link HTable} must not be closed.
775    * {@link HTableInterface#getScanner(Scan)} has other usage details.
776    */
777   @Override
778   public ResultScanner getScanner(final Scan scan) throws IOException {
779     if (scan.getCaching() <= 0) {
780       scan.setCaching(getScannerCaching());
781     }
782 
783     if (scan.isReversed()) {
784       if (scan.isSmall()) {
785         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
786             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
787             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
788       } else {
789         return new ReversedClientScanner(getConfiguration(), scan, getName(),
790             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
791             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
792       }
793     }
794 
795     if (scan.isSmall()) {
796       return new ClientSmallScanner(getConfiguration(), scan, getName(),
797           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
798           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
799     } else {
800       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
801           this.rpcCallerFactory, this.rpcControllerFactory,
802           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
803     }
804   }
805 
806   /**
807    * The underlying {@link HTable} must not be closed.
808    * {@link HTableInterface#getScanner(byte[])} has other usage details.
809    */
810   @Override
811   public ResultScanner getScanner(byte [] family) throws IOException {
812     Scan scan = new Scan();
813     scan.addFamily(family);
814     return getScanner(scan);
815   }
816 
817   /**
818    * The underlying {@link HTable} must not be closed.
819    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
820    */
821   @Override
822   public ResultScanner getScanner(byte [] family, byte [] qualifier)
823   throws IOException {
824     Scan scan = new Scan();
825     scan.addColumn(family, qualifier);
826     return getScanner(scan);
827   }
828 
829   /**
830    * {@inheritDoc}
831    */
832   @Override
833   public Result get(final Get get) throws IOException {
834     if (get.getConsistency() == null){
835       get.setConsistency(defaultConsistency);
836     }
837 
838     if (get.getConsistency() == Consistency.STRONG) {
839       // Good old call.
840       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
841           getName(), get.getRow()) {
842         @Override
843         public Result call(int callTimeout) throws IOException {
844           ClientProtos.GetRequest request =
845               RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
846           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
847           controller.setPriority(tableName);
848           controller.setCallTimeout(callTimeout);
849           try {
850             ClientProtos.GetResponse response = getStub().get(controller, request);
851             if (response == null) return null;
852             return ProtobufUtil.toResult(response.getResult());
853           } catch (ServiceException se) {
854             throw ProtobufUtil.getRemoteException(se);
855           }
856         }
857       };
858       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
859     }
860 
861     // Call that takes into account the replica
862     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
863       rpcControllerFactory, tableName, this.connection, get, pool,
864       tableConfiguration.getRetriesNumber(),
865       operationTimeout,
866       tableConfiguration.getPrimaryCallTimeoutMicroSecond());
867     return callable.call();
868   }
869 
870 
871   /**
872    * {@inheritDoc}
873    */
874   @Override
875   public Result[] get(List<Get> gets) throws IOException {
876     if (gets.size() == 1) {
877       return new Result[]{get(gets.get(0))};
878     }
879     try {
880       Object [] r1 = batch((List)gets);
881 
882       // translate.
883       Result [] results = new Result[r1.length];
884       int i=0;
885       for (Object o : r1) {
886         // batch ensures if there is a failure we get an exception instead
887         results[i++] = (Result) o;
888       }
889 
890       return results;
891     } catch (InterruptedException e) {
892       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
893     }
894   }
895 
896   /**
897    * {@inheritDoc}
898    */
899   @Override
900   public void batch(final List<? extends Row> actions, final Object[] results)
901       throws InterruptedException, IOException {
902     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
903     ars.waitUntilDone();
904     if (ars.hasError()) {
905       throw ars.getErrors();
906     }
907   }
908 
909   /**
910    * {@inheritDoc}
911    * @deprecated If any exception is thrown by one of the actions, there is no way to
912    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
913    */
914   @Deprecated
915   @Override
916   public Object[] batch(final List<? extends Row> actions)
917      throws InterruptedException, IOException {
918     Object[] results = new Object[actions.size()];
919     batch(actions, results);
920     return results;
921   }
922 
923   /**
924    * {@inheritDoc}
925    */
926   @Override
927   public <R> void batchCallback(
928       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
929       throws IOException, InterruptedException {
930     connection.processBatchCallback(actions, tableName, pool, results, callback);
931   }
932 
933   /**
934    * {@inheritDoc}
935    * @deprecated If any exception is thrown by one of the actions, there is no way to
936    * retrieve the partially executed results. Use
937    * {@link #batchCallback(List, Object[], Batch.Callback)}
938    * instead.
939    */
940   @Deprecated
941   @Override
942   public <R> Object[] batchCallback(
943     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
944       InterruptedException {
945     Object[] results = new Object[actions.size()];
946     batchCallback(actions, results, callback);
947     return results;
948   }
949 
950   /**
951    * {@inheritDoc}
952    */
953   @Override
954   public void delete(final Delete delete)
955   throws IOException {
956     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
957         tableName, delete.getRow()) {
958       @Override
959       public Boolean call(int callTimeout) throws IOException {
960         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
961         controller.setPriority(tableName);
962         controller.setCallTimeout(callTimeout);
963 
964         try {
965           MutateRequest request = RequestConverter.buildMutateRequest(
966             getLocation().getRegionInfo().getRegionName(), delete);
967           MutateResponse response = getStub().mutate(controller, request);
968           return Boolean.valueOf(response.getProcessed());
969         } catch (ServiceException se) {
970           throw ProtobufUtil.getRemoteException(se);
971         }
972       }
973     };
974     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
975   }
976 
977   /**
978    * {@inheritDoc}
979    */
980   @Override
981   public void delete(final List<Delete> deletes)
982   throws IOException {
983     Object[] results = new Object[deletes.size()];
984     try {
985       batch(deletes, results);
986     } catch (InterruptedException e) {
987       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
988     } finally {
989       // mutate list so that it is empty for complete success, or contains only failed records
990       // results are returned in the same order as the requests in list walk the list backwards,
991       // so we can remove from list without impacting the indexes of earlier members
992       for (int i = results.length - 1; i>=0; i--) {
993         // if result is not null, it succeeded
994         if (results[i] instanceof Result) {
995           deletes.remove(i);
996         }
997       }
998     }
999   }
1000 
1001   /**
1002    * {@inheritDoc}
1003    */
1004   @Override
1005   public void put(final Put put)
1006       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1007     doPut(put);
1008     if (autoFlush) {
1009       flushCommits();
1010     }
1011   }
1012 
1013   /**
1014    * {@inheritDoc}
1015    */
1016   @Override
1017   public void put(final List<Put> puts)
1018       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1019     for (Put put : puts) {
1020       doPut(put);
1021     }
1022     if (autoFlush) {
1023       flushCommits();
1024     }
1025   }
1026 
1027 
1028   /**
1029    * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
1030    *  cluster.
1031    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
1032    * @throws InterruptedIOException if we were interrupted.
1033    */
1034   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1035     // This behavior is highly non-intuitive... it does not protect us against
1036     // 94-incompatible behavior, which is a timing issue because hasError, the below code
1037     // and setter of hasError are not synchronized. Perhaps it should be removed.
1038     if (ap.hasError()) {
1039       writeAsyncBuffer.add(put);
1040       backgroundFlushCommits(true);
1041     }
1042 
1043     validatePut(put);
1044 
1045     currentWriteBufferSize += put.heapSize();
1046     writeAsyncBuffer.add(put);
1047 
1048     while (currentWriteBufferSize > writeBufferSize) {
1049       backgroundFlushCommits(false);
1050     }
1051   }
1052 
1053 
1054   /**
1055    * Send the operations in the buffer to the servers. Does not wait for the server's answer.
1056    * If the is an error (max retried reach from a previous flush or bad operation), it tries to
1057    * send all operations in the buffer and sends an exception.
1058    * @param synchronous - if true, sends all the writes and wait for all of them to finish before
1059    *                     returning.
1060    */
1061   private void backgroundFlushCommits(boolean synchronous) throws
1062       InterruptedIOException, RetriesExhaustedWithDetailsException {
1063 
1064     try {
1065       if (!synchronous) {
1066         ap.submit(tableName, writeAsyncBuffer, true, null, false);
1067         if (ap.hasError()) {
1068           LOG.debug(tableName + ": One or more of the operations have failed -" +
1069               " waiting for all operation in progress to finish (successfully or not)");
1070         }
1071       }
1072       if (synchronous || ap.hasError()) {
1073         while (!writeAsyncBuffer.isEmpty()) {
1074           ap.submit(tableName, writeAsyncBuffer, true, null, false);
1075         }
1076         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
1077         if (error != null) {
1078           throw error;
1079         }
1080       }
1081     } finally {
1082       currentWriteBufferSize = 0;
1083       for (Row mut : writeAsyncBuffer) {
1084         if (mut instanceof Mutation) {
1085           currentWriteBufferSize += ((Mutation) mut).heapSize();
1086         }
1087       }
1088     }
1089   }
1090 
1091   /**
1092    * {@inheritDoc}
1093    */
1094   @Override
1095   public void mutateRow(final RowMutations rm) throws IOException {
1096     RegionServerCallable<Void> callable =
1097         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1098       @Override
1099       public Void call(int callTimeout) throws IOException {
1100         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1101         controller.setPriority(tableName);
1102         controller.setCallTimeout(callTimeout);
1103         try {
1104           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1105             getLocation().getRegionInfo().getRegionName(), rm);
1106           regionMutationBuilder.setAtomic(true);
1107           MultiRequest request =
1108             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1109           getStub().multi(controller, request);
1110         } catch (ServiceException se) {
1111           throw ProtobufUtil.getRemoteException(se);
1112         }
1113         return null;
1114       }
1115     };
1116     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1117   }
1118 
1119   /**
1120    * {@inheritDoc}
1121    */
1122   @Override
1123   public Result append(final Append append) throws IOException {
1124     if (append.numFamilies() == 0) {
1125       throw new IOException(
1126           "Invalid arguments to append, no columns specified");
1127     }
1128 
1129     NonceGenerator ng = this.connection.getNonceGenerator();
1130     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1131     RegionServerCallable<Result> callable =
1132       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1133         @Override
1134         public Result call(int callTimeout) throws IOException {
1135           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1136           controller.setPriority(getTableName());
1137           controller.setCallTimeout(callTimeout);
1138           try {
1139             MutateRequest request = RequestConverter.buildMutateRequest(
1140               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1141             MutateResponse response = getStub().mutate(controller, request);
1142             if (!response.hasResult()) return null;
1143             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1144           } catch (ServiceException se) {
1145             throw ProtobufUtil.getRemoteException(se);
1146           }
1147         }
1148       };
1149     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1150   }
1151 
1152   /**
1153    * {@inheritDoc}
1154    */
1155   @Override
1156   public Result increment(final Increment increment) throws IOException {
1157     if (!increment.hasFamilies()) {
1158       throw new IOException(
1159           "Invalid arguments to increment, no columns specified");
1160     }
1161     NonceGenerator ng = this.connection.getNonceGenerator();
1162     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1163     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1164         getName(), increment.getRow()) {
1165       @Override
1166       public Result call(int callTimeout) throws IOException {
1167         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1168         controller.setPriority(getTableName());
1169         controller.setCallTimeout(callTimeout);
1170         try {
1171           MutateRequest request = RequestConverter.buildMutateRequest(
1172             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1173           MutateResponse response = getStub().mutate(controller, request);
1174           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1175         } catch (ServiceException se) {
1176           throw ProtobufUtil.getRemoteException(se);
1177         }
1178       }
1179     };
1180     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1181   }
1182 
1183   /**
1184    * {@inheritDoc}
1185    */
1186   @Override
1187   public long incrementColumnValue(final byte [] row, final byte [] family,
1188       final byte [] qualifier, final long amount)
1189   throws IOException {
1190     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1191   }
1192 
1193   /**
1194    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1195    */
1196   @Deprecated
1197   @Override
1198   public long incrementColumnValue(final byte [] row, final byte [] family,
1199       final byte [] qualifier, final long amount, final boolean writeToWAL)
1200   throws IOException {
1201     return incrementColumnValue(row, family, qualifier, amount,
1202       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1203   }
1204 
1205   /**
1206    * {@inheritDoc}
1207    */
1208   @Override
1209   public long incrementColumnValue(final byte [] row, final byte [] family,
1210       final byte [] qualifier, final long amount, final Durability durability)
1211   throws IOException {
1212     NullPointerException npe = null;
1213     if (row == null) {
1214       npe = new NullPointerException("row is null");
1215     } else if (family == null) {
1216       npe = new NullPointerException("family is null");
1217     } else if (qualifier == null) {
1218       npe = new NullPointerException("qualifier is null");
1219     }
1220     if (npe != null) {
1221       throw new IOException(
1222           "Invalid arguments to incrementColumnValue", npe);
1223     }
1224 
1225     NonceGenerator ng = this.connection.getNonceGenerator();
1226     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1227     RegionServerCallable<Long> callable =
1228       new RegionServerCallable<Long>(connection, getName(), row) {
1229         @Override
1230         public Long call(int callTimeout) throws IOException {
1231           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1232           controller.setPriority(getTableName());
1233           controller.setCallTimeout(callTimeout);
1234           try {
1235             MutateRequest request = RequestConverter.buildIncrementRequest(
1236               getLocation().getRegionInfo().getRegionName(), row, family,
1237               qualifier, amount, durability, nonceGroup, nonce);
1238             MutateResponse response = getStub().mutate(controller, request);
1239             Result result =
1240               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1241             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1242           } catch (ServiceException se) {
1243             throw ProtobufUtil.getRemoteException(se);
1244           }
1245         }
1246       };
1247     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1248   }
1249 
1250   /**
1251    * {@inheritDoc}
1252    */
1253   @Override
1254   public boolean checkAndPut(final byte [] row,
1255       final byte [] family, final byte [] qualifier, final byte [] value,
1256       final Put put)
1257   throws IOException {
1258     RegionServerCallable<Boolean> callable =
1259       new RegionServerCallable<Boolean>(connection, getName(), row) {
1260         @Override
1261         public Boolean call(int callTimeout) throws IOException {
1262           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1263           controller.setPriority(tableName);
1264           controller.setCallTimeout(callTimeout);
1265           try {
1266             MutateRequest request = RequestConverter.buildMutateRequest(
1267               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1268                 new BinaryComparator(value), CompareType.EQUAL, put);
1269             MutateResponse response = getStub().mutate(controller, request);
1270             return Boolean.valueOf(response.getProcessed());
1271           } catch (ServiceException se) {
1272             throw ProtobufUtil.getRemoteException(se);
1273           }
1274         }
1275       };
1276     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1277   }
1278 
1279   /**
1280    * {@inheritDoc}
1281    */
1282   @Override
1283   public boolean checkAndPut(final byte [] row, final byte [] family,
1284       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1285       final Put put)
1286   throws IOException {
1287     RegionServerCallable<Boolean> callable =
1288       new RegionServerCallable<Boolean>(connection, getName(), row) {
1289         @Override
1290         public Boolean call(int callTimeout) throws IOException {
1291           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1292           controller.setPriority(tableName);
1293           controller.setCallTimeout(callTimeout);
1294           try {
1295             CompareType compareType = CompareType.valueOf(compareOp.name());
1296             MutateRequest request = RequestConverter.buildMutateRequest(
1297               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1298                 new BinaryComparator(value), compareType, put);
1299             MutateResponse response = getStub().mutate(controller, request);
1300             return Boolean.valueOf(response.getProcessed());
1301           } catch (ServiceException se) {
1302             throw ProtobufUtil.getRemoteException(se);
1303           }
1304         }
1305       };
1306     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1307   }
1308 
1309   /**
1310    * {@inheritDoc}
1311    */
1312   @Override
1313   public boolean checkAndDelete(final byte [] row,
1314       final byte [] family, final byte [] qualifier, final byte [] value,
1315       final Delete delete)
1316   throws IOException {
1317     RegionServerCallable<Boolean> callable =
1318       new RegionServerCallable<Boolean>(connection, getName(), row) {
1319         @Override
1320         public Boolean call(int callTimeout) throws IOException {
1321           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1322           controller.setPriority(tableName);
1323           controller.setCallTimeout(callTimeout);
1324           try {
1325             MutateRequest request = RequestConverter.buildMutateRequest(
1326               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1327                 new BinaryComparator(value), CompareType.EQUAL, delete);
1328             MutateResponse response = getStub().mutate(controller, request);
1329             return Boolean.valueOf(response.getProcessed());
1330           } catch (ServiceException se) {
1331             throw ProtobufUtil.getRemoteException(se);
1332           }
1333         }
1334       };
1335     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1336   }
1337 
1338   /**
1339    * {@inheritDoc}
1340    */
1341   @Override
1342   public boolean checkAndDelete(final byte [] row, final byte [] family,
1343       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1344       final Delete delete)
1345   throws IOException {
1346     RegionServerCallable<Boolean> callable =
1347       new RegionServerCallable<Boolean>(connection, getName(), row) {
1348         @Override
1349         public Boolean call(int callTimeout) throws IOException {
1350           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1351           controller.setPriority(tableName);
1352           controller.setCallTimeout(callTimeout);
1353           try {
1354             CompareType compareType = CompareType.valueOf(compareOp.name());
1355             MutateRequest request = RequestConverter.buildMutateRequest(
1356               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1357                 new BinaryComparator(value), compareType, delete);
1358             MutateResponse response = getStub().mutate(controller, request);
1359             return Boolean.valueOf(response.getProcessed());
1360           } catch (ServiceException se) {
1361             throw ProtobufUtil.getRemoteException(se);
1362           }
1363         }
1364       };
1365     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1366   }
1367 
1368   /**
1369    * {@inheritDoc}
1370    */
1371   @Override
1372   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1373       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1374   throws IOException {
1375     RegionServerCallable<Boolean> callable =
1376         new RegionServerCallable<Boolean>(connection, getName(), row) {
1377           @Override
1378           public Boolean call(int callTimeout) throws IOException {
1379             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1380             controller.setPriority(tableName);
1381             controller.setCallTimeout(callTimeout);
1382             try {
1383               CompareType compareType = CompareType.valueOf(compareOp.name());
1384               MultiRequest request = RequestConverter.buildMutateRequest(
1385                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1386                   new BinaryComparator(value), compareType, rm);
1387               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1388               return Boolean.valueOf(response.getProcessed());
1389             } catch (ServiceException se) {
1390               throw ProtobufUtil.getRemoteException(se);
1391             }
1392           }
1393         };
1394     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1395   }
1396 
1397   /**
1398    * {@inheritDoc}
1399    */
1400   @Override
1401   public boolean exists(final Get get) throws IOException {
1402     get.setCheckExistenceOnly(true);
1403     Result r = get(get);
1404     assert r.getExists() != null;
1405     return r.getExists();
1406   }
1407 
1408   /**
1409    * {@inheritDoc}
1410    */
1411   @Override
1412   public boolean[] existsAll(final List<Get> gets) throws IOException {
1413     if (gets.isEmpty()) return new boolean[]{};
1414     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1415 
1416     for (Get g: gets){
1417       g.setCheckExistenceOnly(true);
1418     }
1419 
1420     Object[] r1;
1421     try {
1422       r1 = batch(gets);
1423     } catch (InterruptedException e) {
1424       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1425     }
1426 
1427     // translate.
1428     boolean[] results = new boolean[r1.length];
1429     int i = 0;
1430     for (Object o : r1) {
1431       // batch ensures if there is a failure we get an exception instead
1432       results[i++] = ((Result)o).getExists();
1433     }
1434 
1435     return results;
1436   }
1437 
1438   /**
1439    * {@inheritDoc}
1440    * @deprecated Use {@link #existsAll(java.util.List)}  instead.
1441    */
1442   @Override
1443   @Deprecated
1444   public Boolean[] exists(final List<Get> gets) throws IOException {
1445     boolean[] results = existsAll(gets);
1446     Boolean[] objectResults = new Boolean[results.length];
1447     for (int i = 0; i < results.length; ++i) {
1448       objectResults[i] = results[i];
1449     }
1450     return objectResults;
1451   }
1452 
1453   /**
1454    * {@inheritDoc}
1455    */
1456   @Override
1457   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1458     // As we can have an operation in progress even if the buffer is empty, we call
1459     //  backgroundFlushCommits at least one time.
1460     backgroundFlushCommits(true);
1461   }
1462 
1463   /**
1464    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1465    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1466    *
1467    * @param list The collection of actions.
1468    * @param results An empty array, same size as list. If an exception is thrown,
1469    * you can test here for partial results, and to determine which actions
1470    * processed successfully.
1471    * @throws IOException if there are problems talking to META. Per-item
1472    * exceptions are stored in the results array.
1473    */
1474   public <R> void processBatchCallback(
1475     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1476     throws IOException, InterruptedException {
1477     this.batchCallback(list, results, callback);
1478   }
1479 
1480 
1481   /**
1482    * Parameterized batch processing, allowing varying return types for different
1483    * {@link Row} implementations.
1484    */
1485   public void processBatch(final List<? extends Row> list, final Object[] results)
1486     throws IOException, InterruptedException {
1487     this.batch(list, results);
1488   }
1489 
1490 
1491   @Override
1492   public void close() throws IOException {
1493     if (this.closed) {
1494       return;
1495     }
1496     flushCommits();
1497     if (cleanupPoolOnClose) {
1498       this.pool.shutdown();
1499       try {
1500         boolean terminated = false;
1501         do {
1502           // wait until the pool has terminated
1503           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1504         } while (!terminated);
1505       } catch (InterruptedException e) {
1506         LOG.warn("waitForTermination interrupted");
1507       }
1508     }
1509     if (cleanupConnectionOnClose) {
1510       if (this.connection != null) {
1511         this.connection.close();
1512       }
1513     }
1514     this.closed = true;
1515   }
1516 
1517   // validate for well-formedness
1518   public void validatePut(final Put put) throws IllegalArgumentException {
1519     validatePut(put, tableConfiguration.getMaxKeyValueSize());
1520   }
1521 
1522   // validate for well-formedness
1523   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1524     if (put.isEmpty()) {
1525       throw new IllegalArgumentException("No columns to insert");
1526     }
1527     if (maxKeyValueSize > 0) {
1528       for (List<Cell> list : put.getFamilyCellMap().values()) {
1529         for (Cell cell : list) {
1530           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1531             throw new IllegalArgumentException("KeyValue size too large");
1532           }
1533         }
1534       }
1535     }
1536   }
1537 
1538   /**
1539    * {@inheritDoc}
1540    */
1541   @Override
1542   public boolean isAutoFlush() {
1543     return autoFlush;
1544   }
1545 
1546   /**
1547    * {@inheritDoc}
1548    * @deprecated in 0.96. When called with setAutoFlush(false), this function also
1549    *  set clearBufferOnFail to true, which is unexpected but kept for historical reasons.
1550    *  Replace it with setAutoFlush(false, false) if this is exactly what you want, or by
1551    *  {@link #setAutoFlushTo(boolean)} for all other cases.
1552    */
1553   @Deprecated
1554   @Override
1555   public void setAutoFlush(boolean autoFlush) {
1556     this.autoFlush = autoFlush;
1557   }
1558 
1559   /**
1560    * {@inheritDoc}
1561    */
1562   @Override
1563   public void setAutoFlushTo(boolean autoFlush) {
1564     this.autoFlush = autoFlush;
1565   }
1566 
1567   /**
1568    * {@inheritDoc}
1569    */
1570   @Override
1571   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1572     this.autoFlush = autoFlush;
1573   }
1574 
1575   /**
1576    * Returns the maximum size in bytes of the write buffer for this HTable.
1577    * <p>
1578    * The default value comes from the configuration parameter
1579    * {@code hbase.client.write.buffer}.
1580    * @return The size of the write buffer in bytes.
1581    */
1582   @Override
1583   public long getWriteBufferSize() {
1584     return writeBufferSize;
1585   }
1586 
1587   /**
1588    * Sets the size of the buffer in bytes.
1589    * <p>
1590    * If the new size is less than the current amount of data in the
1591    * write buffer, the buffer gets flushed.
1592    * @param writeBufferSize The new write buffer size, in bytes.
1593    * @throws IOException if a remote or network exception occurs.
1594    */
1595   @Override
1596   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1597     this.writeBufferSize = writeBufferSize;
1598     if(currentWriteBufferSize > writeBufferSize) {
1599       flushCommits();
1600     }
1601   }
1602 
1603   /**
1604    * The pool is used for mutli requests for this HTable
1605    * @return the pool used for mutli
1606    */
1607   ExecutorService getPool() {
1608     return this.pool;
1609   }
1610 
1611   /**
1612    * Enable or disable region cache prefetch for the table. It will be
1613    * applied for the given table's all HTable instances who share the same
1614    * connection. By default, the cache prefetch is enabled.
1615    * @param tableName name of table to configure.
1616    * @param enable Set to true to enable region cache prefetch. Or set to
1617    * false to disable it.
1618    * @throws IOException
1619    * @deprecated does nothing since 0.99
1620    */
1621   @Deprecated
1622   public static void setRegionCachePrefetch(final byte[] tableName,
1623       final boolean enable)  throws IOException {
1624   }
1625 
1626   /**
1627    * @deprecated does nothing since 0.99
1628    */
1629   @Deprecated
1630   public static void setRegionCachePrefetch(
1631       final TableName tableName,
1632       final boolean enable) throws IOException {
1633   }
1634 
1635   /**
1636    * Enable or disable region cache prefetch for the table. It will be
1637    * applied for the given table's all HTable instances who share the same
1638    * connection. By default, the cache prefetch is enabled.
1639    * @param conf The Configuration object to use.
1640    * @param tableName name of table to configure.
1641    * @param enable Set to true to enable region cache prefetch. Or set to
1642    * false to disable it.
1643    * @throws IOException
1644    * @deprecated does nothing since 0.99
1645    */
1646   @Deprecated
1647   public static void setRegionCachePrefetch(final Configuration conf,
1648       final byte[] tableName, final boolean enable) throws IOException {
1649   }
1650 
1651   /**
1652    * @deprecated does nothing since 0.99
1653    */
1654   @Deprecated
1655   public static void setRegionCachePrefetch(final Configuration conf,
1656       final TableName tableName,
1657       final boolean enable) throws IOException {
1658   }
1659 
1660   /**
1661    * Check whether region cache prefetch is enabled or not for the table.
1662    * @param conf The Configuration object to use.
1663    * @param tableName name of table to check
1664    * @return true if table's region cache prefecth is enabled. Otherwise
1665    * it is disabled.
1666    * @throws IOException
1667    * @deprecated always return false since 0.99
1668    */
1669   @Deprecated
1670   public static boolean getRegionCachePrefetch(final Configuration conf,
1671       final byte[] tableName) throws IOException {
1672     return false;
1673   }
1674 
1675   /**
1676    * @deprecated always return false since 0.99
1677    */
1678   @Deprecated
1679   public static boolean getRegionCachePrefetch(final Configuration conf,
1680       final TableName tableName) throws IOException {
1681     return false;
1682   }
1683 
1684   /**
1685    * Check whether region cache prefetch is enabled or not for the table.
1686    * @param tableName name of table to check
1687    * @return true if table's region cache prefecth is enabled. Otherwise
1688    * it is disabled.
1689    * @throws IOException
1690    * @deprecated always return false since 0.99
1691    */
1692   @Deprecated
1693   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1694     return false;
1695   }
1696 
1697   /**
1698    * @deprecated always return false since 0.99
1699    */
1700   @Deprecated
1701   public static boolean getRegionCachePrefetch(
1702       final TableName tableName) throws IOException {
1703     return false;
1704   }
1705 
1706   /**
1707    * Explicitly clears the region cache to fetch the latest value from META.
1708    * This is a power user function: avoid unless you know the ramifications.
1709    */
1710   public void clearRegionCache() {
1711     this.connection.clearRegionCache();
1712   }
1713 
1714   /**
1715    * {@inheritDoc}
1716    */
1717   @Override
1718   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1719     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1720   }
1721 
1722   /**
1723    * {@inheritDoc}
1724    */
1725   @Override
1726   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1727       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1728       throws ServiceException, Throwable {
1729     final Map<byte[],R> results =  Collections.synchronizedMap(
1730         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1731     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1732       @Override
1733       public void update(byte[] region, byte[] row, R value) {
1734         if (region != null) {
1735           results.put(region, value);
1736         }
1737       }
1738     });
1739     return results;
1740   }
1741 
1742   /**
1743    * {@inheritDoc}
1744    */
1745   @Override
1746   public <T extends Service, R> void coprocessorService(final Class<T> service,
1747       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1748       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1749 
1750     // get regions covered by the row range
1751     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1752 
1753     Map<byte[],Future<R>> futures =
1754         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1755     for (final byte[] r : keys) {
1756       final RegionCoprocessorRpcChannel channel =
1757           new RegionCoprocessorRpcChannel(connection, tableName, r);
1758       Future<R> future = pool.submit(
1759           new Callable<R>() {
1760             @Override
1761             public R call() throws Exception {
1762               T instance = ProtobufUtil.newServiceStub(service, channel);
1763               R result = callable.call(instance);
1764               byte[] region = channel.getLastRegion();
1765               if (callback != null) {
1766                 callback.update(region, r, result);
1767               }
1768               return result;
1769             }
1770           });
1771       futures.put(r, future);
1772     }
1773     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1774       try {
1775         e.getValue().get();
1776       } catch (ExecutionException ee) {
1777         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1778             + Bytes.toStringBinary(e.getKey()), ee);
1779         throw ee.getCause();
1780       } catch (InterruptedException ie) {
1781         throw new InterruptedIOException("Interrupted calling coprocessor service "
1782             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1783       }
1784     }
1785   }
1786 
1787   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1788   throws IOException {
1789     if (start == null) {
1790       start = HConstants.EMPTY_START_ROW;
1791     }
1792     if (end == null) {
1793       end = HConstants.EMPTY_END_ROW;
1794     }
1795     return getKeysAndRegionsInRange(start, end, true).getFirst();
1796   }
1797 
1798   public void setOperationTimeout(int operationTimeout) {
1799     this.operationTimeout = operationTimeout;
1800   }
1801 
1802   public int getOperationTimeout() {
1803     return operationTimeout;
1804   }
1805 
1806   @Override
1807   public String toString() {
1808     return tableName + ";" + connection;
1809   }
1810 
1811   /**
1812    * {@inheritDoc}
1813    */
1814   @Override
1815   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1816       Descriptors.MethodDescriptor methodDescriptor, Message request,
1817       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1818     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1819         Bytes.BYTES_COMPARATOR));
1820     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1821         new Callback<R>() {
1822 
1823           @Override
1824           public void update(byte[] region, byte[] row, R result) {
1825             if (region != null) {
1826               results.put(region, result);
1827             }
1828           }
1829         });
1830     return results;
1831   }
1832 
1833   /**
1834    * {@inheritDoc}
1835    */
1836   @Override
1837   public <R extends Message> void batchCoprocessorService(
1838       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1839       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1840       throws ServiceException, Throwable {
1841 
1842     // get regions covered by the row range
1843     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1844         getKeysAndRegionsInRange(startKey, endKey, true);
1845     List<byte[]> keys = keysAndRegions.getFirst();
1846     List<HRegionLocation> regions = keysAndRegions.getSecond();
1847 
1848     // check if we have any calls to make
1849     if (keys.isEmpty()) {
1850       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1851           ", end=" + Bytes.toStringBinary(endKey));
1852       return;
1853     }
1854 
1855     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1856     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1857         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1858     for (int i = 0; i < keys.size(); i++) {
1859       final byte[] rowKey = keys.get(i);
1860       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1861       RegionCoprocessorServiceExec exec =
1862           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1863       execs.add(exec);
1864       execsByRow.put(rowKey, exec);
1865     }
1866 
1867     // tracking for any possible deserialization errors on success callback
1868     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1869     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1870     final List<Row> callbackErrorActions = new ArrayList<Row>();
1871     final List<String> callbackErrorServers = new ArrayList<String>();
1872     Object[] results = new Object[execs.size()];
1873 
1874     AsyncProcess asyncProcess =
1875         new AsyncProcess(connection, configuration, pool,
1876             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1877             true, RpcControllerFactory.instantiate(configuration));
1878 
1879     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1880         new Callback<ClientProtos.CoprocessorServiceResult>() {
1881           @Override
1882           public void update(byte[] region, byte[] row,
1883                               ClientProtos.CoprocessorServiceResult serviceResult) {
1884             if (LOG.isTraceEnabled()) {
1885               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1886                   ": region=" + Bytes.toStringBinary(region) +
1887                   ", row=" + Bytes.toStringBinary(row) +
1888                   ", value=" + serviceResult.getValue().getValue());
1889             }
1890             try {
1891               callback.update(region, row,
1892                   (R) responsePrototype.newBuilderForType().mergeFrom(
1893                       serviceResult.getValue().getValue()).build());
1894             } catch (InvalidProtocolBufferException e) {
1895               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1896                   e);
1897               callbackErrorExceptions.add(e);
1898               callbackErrorActions.add(execsByRow.get(row));
1899               callbackErrorServers.add("null");
1900             }
1901           }
1902         }, results);
1903 
1904     future.waitUntilDone();
1905 
1906     if (future.hasError()) {
1907       throw future.getErrors();
1908     } else if (!callbackErrorExceptions.isEmpty()) {
1909       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1910           callbackErrorServers);
1911     }
1912   }
1913 
1914   public RegionLocator getRegionLocator() {
1915     return this.locator;
1916   }
1917 }