1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import com.google.protobuf.InvalidProtocolBufferException;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.classification.InterfaceStability;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.HBaseConfiguration;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HRegionLocation;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.KeyValue;
52  import org.apache.hadoop.hbase.KeyValueUtil;
53  import org.apache.hadoop.hbase.RegionLocations;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
57  import org.apache.hadoop.hbase.client.coprocessor.Batch;
58  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
59  import org.apache.hadoop.hbase.filter.BinaryComparator;
60  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
61  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
62  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
63  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
64  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
65  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66  import org.apache.hadoop.hbase.protobuf.RequestConverter;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
71  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
72  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.Pair;
75  import org.apache.hadoop.hbase.util.Threads;
76  
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.Message;
79  import com.google.common.annotations.VisibleForTesting;
80  import com.google.protobuf.Service;
81  import com.google.protobuf.ServiceException;
82  
83  /**
84   * <p>Used to communicate with a single HBase table.  An implementation of
85   * {@link HTableInterface}.  Instances of this class can be constructed directly but it is
86   * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
87   * See {@link HConnectionManager} class comment for an example.
88   *
89   * <p>This class is not thread safe for reads nor writes.
90   *
91   * <p>In case of writes (Put, Delete), the underlying write buffer can
92   * be corrupted if multiple threads contend over a single HTable instance.
93   *
94   * <p>In case of reads, some fields used by a Scan are shared among all threads.
95   * The HTable implementation therefore cannot guarantee thread safety, even for Gets.
96   *
97   * <p>Instances of HTable passed the same {@link Configuration} instance will
98   * share connections to servers out on the cluster and to the zookeeper ensemble
99   * as well as caches of region locations.  This is usually a *good* thing and it
100  * is recommended to reuse the same configuration object for all your tables.
101  * This happens because they will all share the same underlying
102  * {@link HConnection} instance. See {@link HConnectionManager} for more on
103  * how this mechanism works.
104  *
105  * <p>{@link HConnection} will read most of the
106  * configuration it needs from the passed {@link Configuration} on initial
107  * construction.  Thereafter, for settings such as
108  * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
109  * and <code>hbase.client.rpc.maxattempts</code>, updates made in the
110  * passed {@link Configuration} after {@link HConnection} construction
111  * will go unnoticed.  To run with changed values, make a new
112  * {@link HTable} passing a new {@link Configuration} instance that has the
113  * new configuration.
114  *
115  * <p>Note that this class implements the {@link Closeable} interface. When an
116  * HTable instance is no longer required, it *should* be closed in order to ensure
117  * that the underlying resources are promptly released. Please note that the close
118  * method can throw a java.io.IOException, which must be handled.
119  *
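     * <p>A minimal usage sketch (the table, family, qualifier and row names below are
     * purely illustrative, and exception handling is omitted):
     * <pre>
     * Configuration conf = HBaseConfiguration.create();
     * HTable table = new HTable(conf, "myTable");
     * try {
     *   Put put = new Put(Bytes.toBytes("row1"));
     *   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value1"));
     *   table.put(put);
     *   Result result = table.get(new Get(Bytes.toBytes("row1")));
     * } finally {
     *   table.close();
     * }
     * </pre>
     *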
120  * @see HBaseAdmin for create, drop, list, enable and disable of tables.
121  * @see HConnection
122  * @see HConnectionManager
123  */
124 @InterfaceAudience.Public
125 @InterfaceStability.Stable
126 public class HTable implements HTableInterface {
127   private static final Log LOG = LogFactory.getLog(HTable.class);
128   protected ClusterConnection connection;
129   private final TableName tableName;
130   private volatile Configuration configuration;
131   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
132   private long writeBufferSize;
133   private boolean clearBufferOnFail;
134   private boolean autoFlush;
135   protected long currentWriteBufferSize;
136   protected int scannerCaching;
137   private int maxKeyValueSize;
138   private ExecutorService pool;  // For Multi & Scan
139   private boolean closed;
140   private int operationTimeout;
141   private int retries;
142   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
143   private final boolean cleanupConnectionOnClose; // close the connection in close()
144   private Consistency defaultConsistency = Consistency.STRONG;
145   private int primaryCallTimeoutMicroSecond;
146   private int replicaCallTimeoutMicroSecondScan;
147 
148 
149   /** The Async process for puts with autoflush set to false or multiputs */
150   protected AsyncProcess ap;
151   /** The Async process for batch */
152   protected AsyncProcess multiAp;
153   private RpcRetryingCallerFactory rpcCallerFactory;
154   private RpcControllerFactory rpcControllerFactory;
155 
156   /**
157    * Creates an object to access a HBase table.
158    * Shares zookeeper connection and other resources with other HTable instances
159    * created with the same <code>conf</code> instance.  Uses already-populated
160    * region cache if one is available, populated by any other HTable instances
161    * sharing this <code>conf</code> instance.  Recommended.
162    * @param conf Configuration object to use.
163    * @param tableName Name of the table.
164    * @throws IOException if a remote or network exception occurs
165    */
166   public HTable(Configuration conf, final String tableName)
167   throws IOException {
168     this(conf, TableName.valueOf(tableName));
169   }
170 
171   /**
172    * Creates an object to access a HBase table.
173    * Shares zookeeper connection and other resources with other HTable instances
174    * created with the same <code>conf</code> instance.  Uses already-populated
175    * region cache if one is available, populated by any other HTable instances
176    * sharing this <code>conf</code> instance.  Recommended.
177    * @param conf Configuration object to use.
178    * @param tableName Name of the table.
179    * @throws IOException if a remote or network exception occurs
180    */
181   public HTable(Configuration conf, final byte[] tableName)
182   throws IOException {
183     this(conf, TableName.valueOf(tableName));
184   }
185 
186 
187 
188   /**
189    * Creates an object to access a HBase table.
190    * Shares zookeeper connection and other resources with other HTable instances
191    * created with the same <code>conf</code> instance.  Uses already-populated
192    * region cache if one is available, populated by any other HTable instances
193    * sharing this <code>conf</code> instance.  Recommended.
194    * @param conf Configuration object to use.
195    * @param tableName table name pojo
196    * @throws IOException if a remote or network exception occurs
197    */
198   public HTable(Configuration conf, final TableName tableName)
199   throws IOException {
200     this.tableName = tableName;
201     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
202     if (conf == null) {
203       this.connection = null;
204       return;
205     }
206     this.connection = ConnectionManager.getConnectionInternal(conf);
207     this.configuration = conf;
208 
209     this.pool = getDefaultExecutor(conf);
210     this.finishSetup();
211   }
212 
213   /**
214    * Creates an object to access a HBase table. Shares zookeeper connection and other resources with
215    * other HTable instances created with the same <code>connection</code> instance. Use this
216    * constructor when the HConnection instance is externally managed.
217    * @param tableName Name of the table.
218    * @param connection HConnection to be used.
219    * @throws IOException if a remote or network exception occurs
220    * @deprecated Do not use.
221    */
222   @Deprecated
223   public HTable(TableName tableName, HConnection connection) throws IOException {
224     this.tableName = tableName;
225     this.cleanupPoolOnClose = true;
226     this.cleanupConnectionOnClose = false;
227     this.connection = (ClusterConnection)connection;
228     this.configuration = connection.getConfiguration();
229 
230     this.pool = getDefaultExecutor(this.configuration);
231     this.finishSetup();
232   }
233 
234   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
235     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
236     if (maxThreads == 0) {
237       maxThreads = 1; // is there a better default?
238     }
239     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
240 
241     // Using the "direct handoff" approach, new threads will only be created
242     // if it is necessary and will grow unbounded. This could be bad but in HCM
243     // we only create as many Runnables as there are region servers. It means
244     // it also scales when new region servers are added.
245     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
246         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
247     pool.allowCoreThreadTimeOut(true);
248     return pool;
249   }
250 
251   /**
252    * Creates an object to access a HBase table.
253    * Shares zookeeper connection and other resources with other HTable instances
254    * created with the same <code>conf</code> instance.  Uses already-populated
255    * region cache if one is available, populated by any other HTable instances
256    * sharing this <code>conf</code> instance.
257    * Use this constructor when the ExecutorService is externally managed.
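       * <p>A minimal sketch of use with an externally managed pool (the pool size and table
       * name are illustrative, and <code>conf</code> is assumed to be an existing
       * {@link Configuration}):
       * <pre>
       * ExecutorService pool = Executors.newFixedThreadPool(10);
       * HTable table = new HTable(conf, Bytes.toBytes("myTable"), pool);
       * try {
       *   // use the table
       * } finally {
       *   table.close();    // does not shut the pool down; it is externally managed
       *   pool.shutdown();
       * }
       * </pre>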
258    * @param conf Configuration object to use.
259    * @param tableName Name of the table.
260    * @param pool ExecutorService to be used.
261    * @throws IOException if a remote or network exception occurs
262    */
263   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
264       throws IOException {
265     this(conf, TableName.valueOf(tableName), pool);
266   }
267 
268   /**
269    * Creates an object to access a HBase table.
270    * Shares zookeeper connection and other resources with other HTable instances
271    * created with the same <code>conf</code> instance.  Uses already-populated
272    * region cache if one is available, populated by any other HTable instances
273    * sharing this <code>conf</code> instance.
274    * Use this constructor when the ExecutorService is externally managed.
275    * @param conf Configuration object to use.
276    * @param tableName Name of the table.
277    * @param pool ExecutorService to be used.
278    * @throws IOException if a remote or network exception occurs
279    */
280   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
281       throws IOException {
282     this.connection = ConnectionManager.getConnectionInternal(conf);
283     this.configuration = conf;
284     this.pool = pool;
285     if (pool == null) {
286       this.pool = getDefaultExecutor(conf);
287       this.cleanupPoolOnClose = true;
288     } else {
289       this.cleanupPoolOnClose = false;
290     }
291     this.tableName = tableName;
292     this.cleanupConnectionOnClose = true;
293     this.finishSetup();
294   }
295 
296   /**
297    * Creates an object to access a HBase table.
298    * Shares zookeeper connection and other resources with other HTable instances
299    * created with the same <code>connection</code> instance.
300    * Use this constructor when the ExecutorService and HConnection instance are
301    * externally managed.
302    * @param tableName Name of the table.
303    * @param connection HConnection to be used.
304    * @param pool ExecutorService to be used.
305    * @throws IOException if a remote or network exception occurs.
306    * @deprecated Do not use, internal ctor.
307    */
308   @Deprecated
309   public HTable(final byte[] tableName, final HConnection connection,
310       final ExecutorService pool) throws IOException {
311     this(TableName.valueOf(tableName), connection, pool);
312   }
313 
314   /** @deprecated Do not use, internal ctor. */
315   @Deprecated
316   public HTable(TableName tableName, final HConnection connection,
317       final ExecutorService pool) throws IOException {
318     this(tableName, (ClusterConnection)connection, pool);
319   }
320 
321   /**
322    * Creates an object to access a HBase table.
323    * Shares zookeeper connection and other resources with other HTable instances
324    * created with the same <code>connection</code> instance.
325    * Visible only for HTableWrapper, which is in a different package.
326    * Should not be used by external code.
327    * @param tableName Name of the table.
328    * @param connection HConnection to be used.
329    * @param pool ExecutorService to be used.
330    * @throws IOException if a remote or network exception occurs
331    */
332   @InterfaceAudience.Private
333   public HTable(TableName tableName, final ClusterConnection connection,
334       final ExecutorService pool) throws IOException {
335     if (connection == null || connection.isClosed()) {
336       throw new IllegalArgumentException("Connection is null or closed.");
337     }
338     this.tableName = tableName;
339     this.cleanupConnectionOnClose = false;
340     this.connection = connection;
341     this.configuration = connection.getConfiguration();
342     this.pool = pool;
343     if (pool == null) {
344       this.pool = getDefaultExecutor(this.configuration);
345       this.cleanupPoolOnClose = true;
346     } else {
347       this.cleanupPoolOnClose = false;
348     }
349 
350     this.finishSetup();
351   }
352 
353   /**
354    * For internal testing.
355    */
356   protected HTable() {
357     tableName = null;
358     cleanupPoolOnClose = false;
359     cleanupConnectionOnClose = false;
360   }
361 
362   /**
363    * Sets up this HTable's parameters based on the passed configuration.
364    */
365   private void finishSetup() throws IOException {
366     this.operationTimeout = tableName.isSystemTable() ?
367       this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
368         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
369       this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
370         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
371     this.writeBufferSize = this.configuration.getLong(
372         "hbase.client.write.buffer", 2097152);
373     this.clearBufferOnFail = true;
374     this.autoFlush = true;
375     this.currentWriteBufferSize = 0;
376     this.scannerCaching = this.configuration.getInt(
377         HConstants.HBASE_CLIENT_SCANNER_CACHING,
378         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
379     this.primaryCallTimeoutMicroSecond =
380         this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms
381     this.replicaCallTimeoutMicroSecondScan =
382         this.configuration.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
383     this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
384             HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
385 
386     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
387     this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
388     // puts need to track errors globally due to how the APIs currently work.
389     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
390     multiAp = this.connection.getAsyncProcess();
391 
392     this.maxKeyValueSize = this.configuration.getInt(
393         "hbase.client.keyvalue.maxsize", -1);
394     this.closed = false;
395   }
396 
397 
398 
399   /**
400    * {@inheritDoc}
401    */
402   @Override
403   public Configuration getConfiguration() {
404     return configuration;
405   }
406 
407   /**
408    * Tells whether or not a table is enabled. This method creates a
409    * new HBase configuration, so it might make your unit tests fail due to
410    * incorrect ZK client port.
411    * @param tableName Name of table to check.
412    * @return {@code true} if table is online.
413    * @throws IOException if a remote or network exception occurs
414    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
415    */
416   @Deprecated
417   public static boolean isTableEnabled(String tableName) throws IOException {
418     return isTableEnabled(TableName.valueOf(tableName));
419   }
420 
421   /**
422    * Tells whether or not a table is enabled. This method creates a
423    * new HBase configuration, so it might make your unit tests fail due to
424    * incorrect ZK client port.
425    * @param tableName Name of table to check.
426    * @return {@code true} if table is online.
427    * @throws IOException if a remote or network exception occurs
428    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
429    */
430   @Deprecated
431   public static boolean isTableEnabled(byte[] tableName) throws IOException {
432     return isTableEnabled(TableName.valueOf(tableName));
433   }
434 
435   /**
436    * Tells whether or not a table is enabled. This method creates a
437    * new HBase configuration, so it might make your unit tests fail due to
438    * incorrect ZK client port.
439    * @param tableName Name of table to check.
440    * @return {@code true} if table is online.
441    * @throws IOException if a remote or network exception occurs
442    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
443    */
444   @Deprecated
445   public static boolean isTableEnabled(TableName tableName) throws IOException {
446     return isTableEnabled(HBaseConfiguration.create(), tableName);
447   }
448 
449   /**
450    * Tells whether or not a table is enabled.
451    * @param conf The Configuration object to use.
452    * @param tableName Name of table to check.
453    * @return {@code true} if table is online.
454    * @throws IOException if a remote or network exception occurs
455    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
456    */
457   @Deprecated
458   public static boolean isTableEnabled(Configuration conf, String tableName)
459   throws IOException {
460     return isTableEnabled(conf, TableName.valueOf(tableName));
461   }
462 
463   /**
464    * Tells whether or not a table is enabled.
465    * @param conf The Configuration object to use.
466    * @param tableName Name of table to check.
467    * @return {@code true} if table is online.
468    * @throws IOException if a remote or network exception occurs
469    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
470    */
471   @Deprecated
472   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
473   throws IOException {
474     return isTableEnabled(conf, TableName.valueOf(tableName));
475   }
476 
477   /**
478    * Tells whether or not a table is enabled.
479    * @param conf The Configuration object to use.
480    * @param tableName Name of table to check.
481    * @return {@code true} if table is online.
482    * @throws IOException if a remote or network exception occurs
483    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
484    */
485   @Deprecated
486   public static boolean isTableEnabled(Configuration conf,
487       final TableName tableName) throws IOException {
488     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
489       @Override
490       public Boolean connect(HConnection connection) throws IOException {
491         return connection.isTableEnabled(tableName);
492       }
493     });
494   }
495 
496   /**
497    * Find region location hosting passed row using cached info
498    * @param row Row to find.
499    * @return The location of the given row.
500    * @throws IOException if a remote or network exception occurs
501    */
502   public HRegionLocation getRegionLocation(final String row)
503   throws IOException {
504     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
505   }
506 
507   /**
508    * Finds the region on which the given row is being served. Does not reload the cache.
509    * @param row Row to find.
510    * @return Location of the row.
511    * @throws IOException if a remote or network exception occurs
512    */
513   public HRegionLocation getRegionLocation(final byte [] row)
514   throws IOException {
515     return connection.getRegionLocation(tableName, row, false);
516   }
517 
518   /**
519    * Finds the region on which the given row is being served.
520    * @param row Row to find.
521    * @param reload true to reload information or false to use cached information
522    * @return Location of the row.
523    * @throws IOException if a remote or network exception occurs
524    */
525   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
526   throws IOException {
527     return connection.getRegionLocation(tableName, row, reload);
528   }
529 
530   /**
531    * {@inheritDoc}
532    */
533   @Override
534   public byte [] getTableName() {
535     return this.tableName.getName();
536   }
537 
538   @Override
539   public TableName getName() {
540     return tableName;
541   }
542 
543   /**
544    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
545    * manipulations.
546    * @return An HConnection instance.
547    * @deprecated This method will be changed from public to package protected.
548    */
549   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
550   @Deprecated
551   @VisibleForTesting
552   public HConnection getConnection() {
553     return this.connection;
554   }
555 
556   /**
557    * Gets the number of rows that a scanner will fetch at once.
558    * <p>
559    * The default value comes from {@code hbase.client.scanner.caching}.
560    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
561    */
562   @Deprecated
563   public int getScannerCaching() {
564     return scannerCaching;
565   }
566 
567   /**
568    * Kept in 0.96 for backward compatibility
569    * @deprecated  since 0.96. This is an internal buffer that should not be read nor written.
570    */
571   @Deprecated
572   public List<Row> getWriteBuffer() {
573     return writeAsyncBuffer;
574   }
575 
576   /**
577    * Sets the number of rows that a scanner will fetch at once.
578    * <p>
579    * This will override the value specified by
580    * {@code hbase.client.scanner.caching}.
581    * Increasing this value will reduce the amount of work needed each time
582    * {@code next()} is called on a scanner, at the expense of memory use
583    * (since more rows will need to be maintained in memory by the scanners).
584    * @param scannerCaching the number of rows a scanner will fetch at once.
585    * @deprecated Use {@link Scan#setCaching(int)}
586    */
587   @Deprecated
588   public void setScannerCaching(int scannerCaching) {
589     this.scannerCaching = scannerCaching;
590   }
591 
592   /**
593    * {@inheritDoc}
594    */
595   @Override
596   public HTableDescriptor getTableDescriptor() throws IOException {
597     return new UnmodifyableHTableDescriptor(
598       this.connection.getHTableDescriptor(this.tableName));
599   }
600 
601   /**
602    * Gets the starting row key for every region in the currently open table.
603    * <p>
604    * This is mainly useful for the MapReduce integration.
605    * @return Array of region starting row keys
606    * @throws IOException if a remote or network exception occurs
607    */
608   public byte [][] getStartKeys() throws IOException {
609     return getStartEndKeys().getFirst();
610   }
611 
612   /**
613    * Gets the ending row key for every region in the currently open table.
614    * <p>
615    * This is mainly useful for the MapReduce integration.
616    * @return Array of region ending row keys
617    * @throws IOException if a remote or network exception occurs
618    */
619   public byte[][] getEndKeys() throws IOException {
620     return getStartEndKeys().getSecond();
621   }
622 
623   /**
624    * Gets the starting and ending row keys for every region in the currently
625    * open table.
626    * <p>
627    * This is mainly useful for the MapReduce integration.
628    * @return Pair of arrays of region starting and ending row keys
629    * @throws IOException if a remote or network exception occurs
630    */
631   // TODO: these are not in HTableInterface. Should we add them there or move these to HBaseAdmin?
632   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
633 
634     List<RegionLocations> regions = listRegionLocations();
635     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
636     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
637 
638     for (RegionLocations locations : regions) {
639       HRegionInfo region = locations.getRegionLocation().getRegionInfo();
640       startKeyList.add(region.getStartKey());
641       endKeyList.add(region.getEndKey());
642     }
643 
644     return new Pair<byte [][], byte [][]>(
645       startKeyList.toArray(new byte[startKeyList.size()][]),
646       endKeyList.toArray(new byte[endKeyList.size()][]));
647   }
648 
649   @VisibleForTesting
650   List<RegionLocations> listRegionLocations() throws IOException {
651     return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
652   }
653 
654   /**
655    * Gets all the regions and their address for this table.
656    * <p>
657    * This is mainly useful for the MapReduce integration.
658    * @return A map of HRegionInfo with its server address
659    * @throws IOException if a remote or network exception occurs
660    * @deprecated This is no longer a public API
661    */
662   @Deprecated
663   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
664     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocation, singular, returns an HRegionLocation.
665     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName(), false);
666   }
667 
668   /**
669    * Get the corresponding regions for an arbitrary range of keys.
670    * <p>
671    * @param startKey Starting row in range, inclusive
672    * @param endKey Ending row in range, exclusive
673    * @return A list of HRegionLocations corresponding to the regions that
674    * contain the specified range
675    * @throws IOException if a remote or network exception occurs
676    * @deprecated This is no longer a public API
677    */
678   @Deprecated
679   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
680     final byte [] endKey) throws IOException {
681     return getRegionsInRange(startKey, endKey, false);
682   }
683 
684   /**
685    * Get the corresponding regions for an arbitrary range of keys.
686    * <p>
687    * @param startKey Starting row in range, inclusive
688    * @param endKey Ending row in range, exclusive
689    * @param reload true to reload information or false to use cached information
690    * @return A list of HRegionLocations corresponding to the regions that
691    * contain the specified range
692    * @throws IOException if a remote or network exception occurs
693    * @deprecated This is no longer a public API
694    */
695   @Deprecated
696   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
697       final byte [] endKey, final boolean reload) throws IOException {
698     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
699   }
700 
701   /**
702    * Get the corresponding start keys and regions for an arbitrary range of
703    * keys.
704    * <p>
705    * @param startKey Starting row in range, inclusive
706    * @param endKey Ending row in range
707    * @param includeEndKey true if endRow is inclusive, false if exclusive
708    * @return A pair of list of start keys and list of HRegionLocations that
709    *         contain the specified range
710    * @throws IOException if a remote or network exception occurs
711    * @deprecated This is no longer a public API
712    */
713   @Deprecated
714   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
715       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
716       throws IOException {
717     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
718   }
719 
720   /**
721    * Get the corresponding start keys and regions for an arbitrary range of
722    * keys.
723    * <p>
724    * @param startKey Starting row in range, inclusive
725    * @param endKey Ending row in range
726    * @param includeEndKey true if endRow is inclusive, false if exclusive
727    * @param reload true to reload information or false to use cached information
728    * @return A pair of list of start keys and list of HRegionLocations that
729    *         contain the specified range
730    * @throws IOException if a remote or network exception occurs
731    * @deprecated This is no longer a public API
732    */
733   @Deprecated
734   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
735       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
736       final boolean reload) throws IOException {
737     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
738     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
739       throw new IllegalArgumentException(
740         "Invalid range: " + Bytes.toStringBinary(startKey) +
741         " > " + Bytes.toStringBinary(endKey));
742     }
743     List<byte[]> keysInRange = new ArrayList<byte[]>();
744     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
745     byte[] currentKey = startKey;
746     do {
747       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
748       keysInRange.add(currentKey);
749       regionsInRange.add(regionLocation);
750       currentKey = regionLocation.getRegionInfo().getEndKey();
751     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
752         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
753             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
754     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
755         regionsInRange);
756   }
757 
758   /**
759    * {@inheritDoc}
760    * @deprecated Use reversed scan instead.
761    */
762    @Override
763    @Deprecated
764    public Result getRowOrBefore(final byte[] row, final byte[] family)
765        throws IOException {
766      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
767          tableName, row) {
768        @Override
769       public Result call(int callTimeout) throws IOException {
770          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
771          controller.setPriority(tableName);
772          controller.setCallTimeout(callTimeout);
773          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
774              getLocation().getRegionInfo().getRegionName(), row, family);
775          try {
776            ClientProtos.GetResponse response = getStub().get(controller, request);
777            if (!response.hasResult()) return null;
778            return ProtobufUtil.toResult(response.getResult());
779          } catch (ServiceException se) {
780            throw ProtobufUtil.getRemoteException(se);
781          }
782        }
783      };
784      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
785    }
786 
787   /**
788    * The underlying {@link HTable} must not be closed.
789    * {@link HTableInterface#getScanner(Scan)} has other usage details.
790    */
791   @Override
792   public ResultScanner getScanner(final Scan scan) throws IOException {
793     if (scan.getCaching() <= 0) {
794       scan.setCaching(getScannerCaching());
795     }
796 
797     if (scan.isReversed()) {
798       if (scan.isSmall()) {
799         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
800             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
801             pool, replicaCallTimeoutMicroSecondScan);
802       } else {
803         return new ReversedClientScanner(getConfiguration(), scan, getName(),
804             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
805             pool, replicaCallTimeoutMicroSecondScan);
806       }
807     }
808 
809     if (scan.isSmall()) {
810       return new ClientSmallScanner(getConfiguration(), scan, getName(),
811           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
812           pool, replicaCallTimeoutMicroSecondScan);
813     } else {
814       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
815           this.rpcCallerFactory, this.rpcControllerFactory,
816           pool, replicaCallTimeoutMicroSecondScan);
817     }
818   }
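
  // Client-side usage sketch (illustrative only; "cf" and the table variable are assumed):
  // depending on Scan#isReversed() and Scan#isSmall(), the method above selects the
  // matching ClientScanner implementation.
  //
  //   Scan scan = new Scan();
  //   scan.addFamily(Bytes.toBytes("cf"));
  //   scan.setReversed(true);
  //   ResultScanner scanner = table.getScanner(scan);
  //   try {
  //     for (Result r : scanner) {
  //       // process r
  //     }
  //   } finally {
  //     scanner.close();
  //   }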
819 
820   /**
821    * The underlying {@link HTable} must not be closed.
822    * {@link HTableInterface#getScanner(byte[])} has other usage details.
823    */
824   @Override
825   public ResultScanner getScanner(byte [] family) throws IOException {
826     Scan scan = new Scan();
827     scan.addFamily(family);
828     return getScanner(scan);
829   }
830 
831   /**
832    * The underlying {@link HTable} must not be closed.
833    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
834    */
835   @Override
836   public ResultScanner getScanner(byte [] family, byte [] qualifier)
837   throws IOException {
838     Scan scan = new Scan();
839     scan.addColumn(family, qualifier);
840     return getScanner(scan);
841   }
842 
843   /**
844    * {@inheritDoc}
845    */
846   @Override
847   public Result get(final Get get) throws IOException {
848     if (get.getConsistency() == null){
849       get.setConsistency(defaultConsistency);
850     }
851 
852     if (get.getConsistency() == Consistency.STRONG) {
853       // Good old call.
854       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
855           getName(), get.getRow()) {
856         @Override
857         public Result call(int callTimeout) throws IOException {
858           ClientProtos.GetRequest request =
859               RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
860           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
861           controller.setPriority(tableName);
862           controller.setCallTimeout(callTimeout);
863           try {
864             ClientProtos.GetResponse response = getStub().get(controller, request);
865             if (response == null) return null;
866             return ProtobufUtil.toResult(response.getResult());
867           } catch (ServiceException se) {
868             throw ProtobufUtil.getRemoteException(se);
869           }
870         }
871       };
872       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
873     }
874 
875     // Call that takes into account the replica
876     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
877       rpcControllerFactory, tableName, this.connection, get, pool, retries,
878       operationTimeout, primaryCallTimeoutMicroSecond);
879     return callable.call();
880   }
881 
882 
883   /**
884    * {@inheritDoc}
885    */
886   @Override
887   public Result[] get(List<Get> gets) throws IOException {
888     if (gets.size() == 1) {
889       return new Result[]{get(gets.get(0))};
890     }
891     try {
892       Object [] r1 = batch((List)gets);
893 
894       // translate.
895       Result [] results = new Result[r1.length];
896       int i=0;
897       for (Object o : r1) {
898         // batch ensures if there is a failure we get an exception instead
899         results[i++] = (Result) o;
900       }
901 
902       return results;
903     } catch (InterruptedException e) {
904       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
905     }
906   }
907 
908   /**
909    * {@inheritDoc}
910    */
911   @Override
912   public void batch(final List<? extends Row> actions, final Object[] results)
913       throws InterruptedException, IOException {
914     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
915     ars.waitUntilDone();
916     if (ars.hasError()) {
917       throw ars.getErrors();
918     }
919   }
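
  // Illustrative sketch only ("table", "family", "qualifier" and "value" are assumed):
  // a mixed batch of actions; per-action results or exceptions land in the results array
  // at the matching positions.
  //
  //   List<Row> actions = new ArrayList<Row>();
  //   actions.add(new Get(Bytes.toBytes("row1")));
  //   actions.add(new Put(Bytes.toBytes("row2")).add(family, qualifier, value));
  //   Object[] results = new Object[actions.size()];
  //   table.batch(actions, results);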
920 
921   /**
922    * {@inheritDoc}
923    * @deprecated If any exception is thrown by one of the actions, there is no way to
924    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
925    */
926   @Deprecated
927   @Override
928   public Object[] batch(final List<? extends Row> actions)
929      throws InterruptedException, IOException {
930     Object[] results = new Object[actions.size()];
931     batch(actions, results);
932     return results;
933   }
934 
935   /**
936    * {@inheritDoc}
937    */
938   @Override
939   public <R> void batchCallback(
940       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
941       throws IOException, InterruptedException {
942     connection.processBatchCallback(actions, tableName, pool, results, callback);
943   }
944 
945   /**
946    * {@inheritDoc}
947    * @deprecated If any exception is thrown by one of the actions, there is no way to
948    * retrieve the partially executed results. Use
949    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
950    * instead.
951    */
952   @Deprecated
953   @Override
954   public <R> Object[] batchCallback(
955     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
956       InterruptedException {
957     Object[] results = new Object[actions.size()];
958     batchCallback(actions, results, callback);
959     return results;
960   }
961 
962   /**
963    * {@inheritDoc}
964    */
965   @Override
966   public void delete(final Delete delete)
967   throws IOException {
968     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
969         tableName, delete.getRow()) {
970       @Override
971       public Boolean call(int callTimeout) throws IOException {
972         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
973         controller.setPriority(tableName);
974         controller.setCallTimeout(callTimeout);
975 
976         try {
977           MutateRequest request = RequestConverter.buildMutateRequest(
978             getLocation().getRegionInfo().getRegionName(), delete);
979           MutateResponse response = getStub().mutate(controller, request);
980           return Boolean.valueOf(response.getProcessed());
981         } catch (ServiceException se) {
982           throw ProtobufUtil.getRemoteException(se);
983         }
984       }
985     };
986     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
987   }
988 
989   /**
990    * {@inheritDoc}
991    */
992   @Override
993   public void delete(final List<Delete> deletes)
994   throws IOException {
995     Object[] results = new Object[deletes.size()];
996     try {
997       batch(deletes, results);
998     } catch (InterruptedException e) {
999       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1000     } finally {
1001       // mutate list so that it is empty for complete success, or contains only failed records
1002       // results are returned in the same order as the requests in list
1003       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
1004       for (int i = results.length - 1; i>=0; i--) {
1005         // if result is not null, it succeeded
1006         if (results[i] instanceof Result) {
1007           deletes.remove(i);
1008         }
1009       }
1010     }
1011   }
1012 
1013   /**
1014    * {@inheritDoc}
1015    */
1016   @Override
1017   public void put(final Put put)
1018       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1019     doPut(put);
1020     if (autoFlush) {
1021       flushCommits();
1022     }
1023   }
1024 
1025   /**
1026    * {@inheritDoc}
1027    */
1028   @Override
1029   public void put(final List<Put> puts)
1030       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1031     for (Put put : puts) {
1032       doPut(put);
1033     }
1034     if (autoFlush) {
1035       flushCommits();
1036     }
1037   }
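
  // Client-side usage sketch (illustrative; "manyPuts" is a placeholder collection): with
  // autoFlush disabled, puts accumulate in the write buffer and are sent once its heap size
  // exceeds "hbase.client.write.buffer", or explicitly on flushCommits().
  //
  //   HTable table = new HTable(conf, "myTable");
  //   table.setAutoFlush(false);
  //   for (Put p : manyPuts) {
  //     table.put(p);        // buffered; flushed when the buffer fills
  //   }
  //   table.flushCommits();  // push anything still buffered
  //   table.close();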
1038 
1039 
1040   /**
1041   * Adds the put to the buffer. If the buffer is already too large, sends the buffer to the
1042    *  cluster.
1043    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
1044    * @throws InterruptedIOException if we were interrupted.
1045    */
1046   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1047     // This behavior is highly non-intuitive... it does not protect us against
1048     // 94-incompatible behavior, which is a timing issue because hasError, the below code
1049     // and setter of hasError are not synchronized. Perhaps it should be removed.
1050     if (ap.hasError()) {
1051       writeAsyncBuffer.add(put);
1052       backgroundFlushCommits(true);
1053     }
1054 
1055     validatePut(put);
1056 
1057     currentWriteBufferSize += put.heapSize();
1058     writeAsyncBuffer.add(put);
1059 
1060     while (currentWriteBufferSize > writeBufferSize) {
1061       backgroundFlushCommits(false);
1062     }
1063   }
1064 
1065 
1066   /**
1067   * Sends the operations in the buffer to the servers. Does not wait for the server's answer.
1068   * If there is an error (max retries reached from a previous flush or a bad operation), it tries
1069   * to send all operations in the buffer and throws an exception.
1070   * @param synchronous - if true, sends all the writes and waits for all of them to finish before
1071    *                     returning.
1072    */
1073   private void backgroundFlushCommits(boolean synchronous) throws
1074       InterruptedIOException, RetriesExhaustedWithDetailsException {
1075 
1076     try {
1077       if (!synchronous) {
1078         ap.submit(tableName, writeAsyncBuffer, true, null, false);
1079         if (ap.hasError()) {
1080           LOG.debug(tableName + ": One or more of the operations have failed -" +
1081               " waiting for all operations in progress to finish (successfully or not)");
1082         }
1083       }
1084       if (synchronous || ap.hasError()) {
1085         while (!writeAsyncBuffer.isEmpty()) {
1086           ap.submit(tableName, writeAsyncBuffer, true, null, false);
1087         }
1088         List<Row> failedRows = clearBufferOnFail ? null : writeAsyncBuffer;
1089         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(failedRows);
1090         if (error != null) {
1091           throw error;
1092         }
1093       }
1094     } finally {
1095       currentWriteBufferSize = 0;
1096       for (Row mut : writeAsyncBuffer) {
1097         if (mut instanceof Mutation) {
1098           currentWriteBufferSize += ((Mutation) mut).heapSize();
1099         }
1100       }
1101     }
1102   }
1103 
1104   /**
1105    * {@inheritDoc}
1106    */
1107   @Override
1108   public void mutateRow(final RowMutations rm) throws IOException {
1109     RegionServerCallable<Void> callable =
1110         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1111       @Override
1112       public Void call(int callTimeout) throws IOException {
1113         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1114         controller.setPriority(tableName);
1115         controller.setCallTimeout(callTimeout);
1116         try {
1117           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1118             getLocation().getRegionInfo().getRegionName(), rm);
1119           regionMutationBuilder.setAtomic(true);
1120           MultiRequest request =
1121             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1122           getStub().multi(controller, request);
1123         } catch (ServiceException se) {
1124           throw ProtobufUtil.getRemoteException(se);
1125         }
1126         return null;
1127       }
1128     };
1129     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1130   }
1131 
1132   /**
1133    * {@inheritDoc}
1134    */
1135   @Override
1136   public Result append(final Append append) throws IOException {
1137     if (append.numFamilies() == 0) {
1138       throw new IOException(
1139           "Invalid arguments to append, no columns specified");
1140     }
1141 
1142     NonceGenerator ng = this.connection.getNonceGenerator();
1143     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1144     RegionServerCallable<Result> callable =
1145       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1146         @Override
1147         public Result call(int callTimeout) throws IOException {
1148           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1149           controller.setPriority(getTableName());
1150           controller.setCallTimeout(callTimeout);
1151           try {
1152             MutateRequest request = RequestConverter.buildMutateRequest(
1153               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1154             MutateResponse response = getStub().mutate(controller, request);
1155             if (!response.hasResult()) return null;
1156             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1157           } catch (ServiceException se) {
1158             throw ProtobufUtil.getRemoteException(se);
1159           }
1160         }
1161       };
1162     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1163   }
1164 
1165   /**
1166    * {@inheritDoc}
1167    */
1168   @Override
1169   public Result increment(final Increment increment) throws IOException {
1170     if (!increment.hasFamilies()) {
1171       throw new IOException(
1172           "Invalid arguments to increment, no columns specified");
1173     }
1174     NonceGenerator ng = this.connection.getNonceGenerator();
1175     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1176     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1177         getName(), increment.getRow()) {
1178       @Override
1179       public Result call(int callTimeout) throws IOException {
1180         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1181         controller.setPriority(getTableName());
1182         controller.setCallTimeout(callTimeout);
1183         try {
1184           MutateRequest request = RequestConverter.buildMutateRequest(
1185             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1186           MutateResponse response = getStub().mutate(controller, request);
1187           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1188         } catch (ServiceException se) {
1189           throw ProtobufUtil.getRemoteException(se);
1190         }
1191       }
1192     };
1193     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1194   }
1195 
1196   /**
1197    * {@inheritDoc}
1198    */
1199   @Override
1200   public long incrementColumnValue(final byte [] row, final byte [] family,
1201       final byte [] qualifier, final long amount)
1202   throws IOException {
1203     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1204   }
1205 
1206   /**
1207    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1208    */
1209   @Deprecated
1210   @Override
1211   public long incrementColumnValue(final byte [] row, final byte [] family,
1212       final byte [] qualifier, final long amount, final boolean writeToWAL)
1213   throws IOException {
1214     return incrementColumnValue(row, family, qualifier, amount,
1215       writeToWAL? Durability.SYNC_WAL: Durability.SKIP_WAL);
1216   }
1217 
1218   /**
1219    * {@inheritDoc}
1220    */
1221   @Override
1222   public long incrementColumnValue(final byte [] row, final byte [] family,
1223       final byte [] qualifier, final long amount, final Durability durability)
1224   throws IOException {
1225     NullPointerException npe = null;
1226     if (row == null) {
1227       npe = new NullPointerException("row is null");
1228     } else if (family == null) {
1229       npe = new NullPointerException("family is null");
1230     } else if (qualifier == null) {
1231       npe = new NullPointerException("qualifier is null");
1232     }
1233     if (npe != null) {
1234       throw new IOException(
1235           "Invalid arguments to incrementColumnValue", npe);
1236     }
1237 
1238     NonceGenerator ng = this.connection.getNonceGenerator();
1239     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1240     RegionServerCallable<Long> callable =
1241       new RegionServerCallable<Long>(connection, getName(), row) {
1242         @Override
1243         public Long call(int callTimeout) throws IOException {
1244           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1245           controller.setPriority(getTableName());
1246           controller.setCallTimeout(callTimeout);
1247           try {
1248             MutateRequest request = RequestConverter.buildIncrementRequest(
1249               getLocation().getRegionInfo().getRegionName(), row, family,
1250               qualifier, amount, durability, nonceGroup, nonce);
1251             MutateResponse response = getStub().mutate(controller, request);
1252             Result result =
1253               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1254             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1255           } catch (ServiceException se) {
1256             throw ProtobufUtil.getRemoteException(se);
1257           }
1258         }
1259       };
1260     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1261   }
1262 
1263   /**
1264    * {@inheritDoc}
1265    */
1266   @Override
1267   public boolean checkAndPut(final byte [] row,
1268       final byte [] family, final byte [] qualifier, final byte [] value,
1269       final Put put)
1270   throws IOException {
1271     RegionServerCallable<Boolean> callable =
1272       new RegionServerCallable<Boolean>(connection, getName(), row) {
1273         @Override
1274         public Boolean call(int callTimeout) throws IOException {
1275           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1276           controller.setPriority(tableName);
1277           controller.setCallTimeout(callTimeout);
1278           try {
1279             MutateRequest request = RequestConverter.buildMutateRequest(
1280               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1281                 new BinaryComparator(value), CompareType.EQUAL, put);
1282             MutateResponse response = getStub().mutate(controller, request);
1283             return Boolean.valueOf(response.getProcessed());
1284           } catch (ServiceException se) {
1285             throw ProtobufUtil.getRemoteException(se);
1286           }
1287         }
1288       };
1289     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1290   }
1291 
1292   /**
1293    * {@inheritDoc}
1294    */
1295   @Override
1296   public boolean checkAndPut(final byte [] row, final byte [] family,
1297       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1298       final Put put)
1299   throws IOException {
1300     RegionServerCallable<Boolean> callable =
1301       new RegionServerCallable<Boolean>(connection, getName(), row) {
1302         @Override
1303         public Boolean call(int callTimeout) throws IOException {
1304           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1305           controller.setPriority(tableName);
1306           controller.setCallTimeout(callTimeout);
1307           try {
1308             CompareType compareType = CompareType.valueOf(compareOp.name());
1309             MutateRequest request = RequestConverter.buildMutateRequest(
1310               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1311                 new BinaryComparator(value), compareType, put);
1312             MutateResponse response = getStub().mutate(controller, request);
1313             return Boolean.valueOf(response.getProcessed());
1314           } catch (ServiceException se) {
1315             throw ProtobufUtil.getRemoteException(se);
1316           }
1317         }
1318       };
1319     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1320   }
1321 
1322   /**
1323    * {@inheritDoc}
1324    */
1325   @Override
1326   public boolean checkAndDelete(final byte [] row,
1327       final byte [] family, final byte [] qualifier, final byte [] value,
1328       final Delete delete)
1329   throws IOException {
1330     RegionServerCallable<Boolean> callable =
1331       new RegionServerCallable<Boolean>(connection, getName(), row) {
1332         @Override
1333         public Boolean call(int callTimeout) throws IOException {
1334           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1335           controller.setPriority(tableName);
1336           controller.setCallTimeout(callTimeout);
1337           try {
1338             MutateRequest request = RequestConverter.buildMutateRequest(
1339               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1340                 new BinaryComparator(value), CompareType.EQUAL, delete);
1341             MutateResponse response = getStub().mutate(controller, request);
1342             return Boolean.valueOf(response.getProcessed());
1343           } catch (ServiceException se) {
1344             throw ProtobufUtil.getRemoteException(se);
1345           }
1346         }
1347       };
1348     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1349   }
1350 
1351   /**
1352    * {@inheritDoc}
1353    */
1354   @Override
1355   public boolean checkAndDelete(final byte [] row, final byte [] family,
1356       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1357       final Delete delete)
1358   throws IOException {
1359     RegionServerCallable<Boolean> callable =
1360       new RegionServerCallable<Boolean>(connection, getName(), row) {
1361         @Override
1362         public Boolean call(int callTimeout) throws IOException {
1363           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1364           controller.setPriority(tableName);
1365           controller.setCallTimeout(callTimeout);
1366           try {
1367             CompareType compareType = CompareType.valueOf(compareOp.name());
1368             MutateRequest request = RequestConverter.buildMutateRequest(
1369               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1370                 new BinaryComparator(value), compareType, delete);
1371             MutateResponse response = getStub().mutate(controller, request);
1372             return Boolean.valueOf(response.getProcessed());
1373           } catch (ServiceException se) {
1374             throw ProtobufUtil.getRemoteException(se);
1375           }
1376         }
1377       };
1378     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1379   }
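
  // Editorial usage sketch (not part of the original source): the CompareOp
  // variant above allows comparisons other than equality. Names are hypothetical.
  //
  //   Delete delete = new Delete(row);
  //   // Delete the row atomically, but only if the stored value of cf:q is
  //   // strictly greater than 100 (the comparison happens on the region server).
  //   boolean deleted = table.checkAndDelete(row, cf, q,
  //       CompareOp.GREATER, Bytes.toBytes(100L), delete);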
1380 
1381   /**
1382    * {@inheritDoc}
1383    */
1384   @Override
1385   public boolean exists(final Get get) throws IOException {
1386     get.setCheckExistenceOnly(true);
1387     Result r = get(get);
1388     assert r.getExists() != null;
1389     return r.getExists();
1390   }
1391 
1392   /**
1393    * {@inheritDoc}
1394    */
1395   @Override
1396   public Boolean[] exists(final List<Get> gets) throws IOException {
1397     if (gets.isEmpty()) return new Boolean[]{};
1398     if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
1399 
1400     for (Get g: gets){
1401       g.setCheckExistenceOnly(true);
1402     }
1403 
1404     Object[] r1;
1405     try {
1406       r1 = batch(gets);
1407     } catch (InterruptedException e) {
1408       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1409     }
1410 
1411     // translate.
1412     Boolean[] results = new Boolean[r1.length];
1413     int i = 0;
1414     for (Object o : r1) {
1415       // batch() throws on failure, so every element here is a Result
1416       results[i++] = ((Result)o).getExists();
1417     }
1418 
1419     return results;
1420   }
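
  // Editorial usage sketch (not part of the original source): existence checks
  // avoid shipping cell values back to the client. Row keys are hypothetical.
  //
  //   List<Get> gets = new ArrayList<Get>();
  //   gets.add(new Get(Bytes.toBytes("row-1")));
  //   gets.add(new Get(Bytes.toBytes("row-2")));
  //   Boolean[] present = table.exists(gets);   // one entry per Get, same order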
1421 
1422   /**
1423    * {@inheritDoc}
1424    */
1425   @Override
1426   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1427     // An operation may be in progress even if the write buffer is empty, so
1428     // backgroundFlushCommits is always called at least once.
1429     backgroundFlushCommits(true);
1430   }
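
  // Editorial usage sketch (not part of the original source): the typical
  // buffered-write pattern that ends with flushCommits(). Table and column
  // names are hypothetical.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   HTable table = new HTable(conf, TableName.valueOf("my_table"));
  //   table.setAutoFlushTo(false);                 // buffer puts client-side
  //   for (int i = 0; i < 1000; i++) {
  //     Put p = new Put(Bytes.toBytes("row-" + i));
  //     p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(i));
  //     table.put(p);                              // queued until the buffer fills
  //   }
  //   table.flushCommits();                        // push anything still buffered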
1431 
1432   /**
1433    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1434    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1435    *
1436    * @param list The collection of actions.
1437    * @param results An empty array of the same size as list. If an exception is
1438    * thrown, this array can be inspected for partial results and to determine
1439    * which actions completed successfully.
1440    * @throws IOException if there are problems talking to META. Per-item
1441    * exceptions are stored in the results array.
1442    */
1443   public <R> void processBatchCallback(
1444     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1445     throws IOException, InterruptedException {
1446     this.batchCallback(list, results, callback);
1447   }
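
  // Editorial usage sketch (not part of the original source): a mixed batch with a
  // per-action callback, as described in the javadoc above. Names are hypothetical.
  //
  //   List<Row> actions = new ArrayList<Row>();
  //   actions.add(new Get(Bytes.toBytes("row-1")));
  //   actions.add(new Delete(Bytes.toBytes("row-2")));
  //   Object[] results = new Object[actions.size()];
  //   table.processBatchCallback(actions, results, new Batch.Callback<Object>() {
  //     @Override
  //     public void update(byte[] region, byte[] row, Object result) {
  //       // invoked once per action as its region-server call completes
  //     }
  //   });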
1448 
1449 
1450   /**
1451    * Parameterized batch processing, allowing varying return types for different
1452    * {@link Row} implementations.
1453    */
1454   public void processBatch(final List<? extends Row> list, final Object[] results)
1455     throws IOException, InterruptedException {
1456     this.batch(list, results);
1457   }
1458 
1459 
1460   @Override
1461   public void close() throws IOException {
1462     if (this.closed) {
1463       return;
1464     }
1465     flushCommits();
1466     if (cleanupPoolOnClose) {
1467       this.pool.shutdown();
1468       try {
1469         boolean terminated = false;
1470         do {
1471           // wait until the pool has terminated
1472           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1473         } while (!terminated);
1474       } catch (InterruptedException e) {
1475         LOG.warn("waitForTermination interrupted");
1476       }
1477     }
1478     if (cleanupConnectionOnClose) {
1479       if (this.connection != null) {
1480         this.connection.close();
1481       }
1482     }
1483     this.closed = true;
1484   }
1485 
1486   // validate for well-formedness
1487   public void validatePut(final Put put) throws IllegalArgumentException{
1488     if (put.isEmpty()) {
1489       throw new IllegalArgumentException("No columns to insert");
1490     }
1491     if (maxKeyValueSize > 0) {
1492       for (List<Cell> list : put.getFamilyCellMap().values()) {
1493         for (Cell cell : list) {
1494           // KeyValue v1 expectation.  Cast for now.
1495           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1496           if (kv.getLength() > maxKeyValueSize) {
1497             throw new IllegalArgumentException("KeyValue size too large");
1498           }
1499         }
1500       }
1501     }
1502   }
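
  // Editorial note (not part of the original source): maxKeyValueSize comes from
  // client configuration; "hbase.client.keyvalue.maxsize" is assumed here to be
  // the governing key, with a value <= 0 disabling the check above.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.client.keyvalue.maxsize", 10 * 1024 * 1024);  // 10 MB cap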
1503 
1504   /**
1505    * {@inheritDoc}
1506    */
1507   @Override
1508   public boolean isAutoFlush() {
1509     return autoFlush;
1510   }
1511 
1512   /**
1513    * {@inheritDoc}
1514    */
1515   @Deprecated
1516   @Override
1517   public void setAutoFlush(boolean autoFlush) {
1518     setAutoFlush(autoFlush, autoFlush);
1519   }
1520 
1521   /**
1522    * {@inheritDoc}
1523    */
1524   @Override
1525   public void setAutoFlushTo(boolean autoFlush) {
1526     setAutoFlush(autoFlush, clearBufferOnFail);
1527   }
1528 
1529   /**
1530    * {@inheritDoc}
1531    */
1532   @Override
1533   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1534     this.autoFlush = autoFlush;
1535     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1536   }
1537 
1538   /**
1539    * Returns the maximum size in bytes of the write buffer for this HTable.
1540    * <p>
1541    * The default value comes from the configuration parameter
1542    * {@code hbase.client.write.buffer}.
1543    * @return The size of the write buffer in bytes.
1544    */
1545   @Override
1546   public long getWriteBufferSize() {
1547     return writeBufferSize;
1548   }
1549 
1550   /**
1551    * Sets the size of the write buffer in bytes.
1552    * <p>
1553    * If the new size is less than the current amount of data in the
1554    * write buffer, the buffer gets flushed.
1555    * @param writeBufferSize The new write buffer size, in bytes.
1556    * @throws IOException if a remote or network exception occurs.
1557    */
1558   @Override
1559   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1560     this.writeBufferSize = writeBufferSize;
1561     if(currentWriteBufferSize > writeBufferSize) {
1562       flushCommits();
1563     }
1564   }
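
  // Editorial usage sketch (not part of the original source): the buffer size can
  // be set globally via configuration or per instance through the setter above.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024);  // default for new tables
  //   table.setWriteBufferSize(4 * 1024 * 1024);  // shrinking below the buffered
  //                                               // amount triggers flushCommits()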
1565 
1566   /**
1567    * The pool used by this HTable for multi requests.
1568    * @return the pool used for multi requests
1569    */
1570   ExecutorService getPool() {
1571     return this.pool;
1572   }
1573 
1574   /**
1575    * Enable or disable region cache prefetch for the table. The setting applies
1576    * to all HTable instances of the given table that share the same connection.
1577    * By default, cache prefetch is enabled.
1578    * @param tableName name of table to configure.
1579    * @param enable Set to true to enable region cache prefetch, or false to
1580    * disable it.
1581    * @throws IOException
1582    * @deprecated does nothing since 0.99
1583    */
1584   @Deprecated
1585   public static void setRegionCachePrefetch(final byte[] tableName,
1586       final boolean enable)  throws IOException {
1587   }
1588 
1589   /**
1590    * @deprecated does nothing since 0.99
1591    */
1592   @Deprecated
1593   public static void setRegionCachePrefetch(
1594       final TableName tableName,
1595       final boolean enable) throws IOException {
1596   }
1597 
1598   /**
1599    * Enable or disable region cache prefetch for the table. The setting applies
1600    * to all HTable instances of the given table that share the same connection.
1601    * By default, cache prefetch is enabled.
1602    * @param conf The Configuration object to use.
1603    * @param tableName name of table to configure.
1604    * @param enable Set to true to enable region cache prefetch, or false to
1605    * disable it.
1606    * @throws IOException
1607    * @deprecated does nothing since 0.99
1608    */
1609   @Deprecated
1610   public static void setRegionCachePrefetch(final Configuration conf,
1611       final byte[] tableName, final boolean enable) throws IOException {
1612   }
1613 
1614   /**
1615    * @deprecated does nothing since 0.99
1616    */
1617   @Deprecated
1618   public static void setRegionCachePrefetch(final Configuration conf,
1619       final TableName tableName,
1620       final boolean enable) throws IOException {
1621   }
1622 
1623   /**
1624    * Check whether region cache prefetch is enabled or not for the table.
1625    * @param conf The Configuration object to use.
1626    * @param tableName name of table to check
1627    * @return true if the table's region cache prefetch is enabled;
1628    * false otherwise.
1629    * @throws IOException
1630    * @deprecated always returns false since 0.99
1631    */
1632   @Deprecated
1633   public static boolean getRegionCachePrefetch(final Configuration conf,
1634       final byte[] tableName) throws IOException {
1635     return false;
1636   }
1637 
1638   /**
1639    * @deprecated always returns false since 0.99
1640    */
1641   @Deprecated
1642   public static boolean getRegionCachePrefetch(final Configuration conf,
1643       final TableName tableName) throws IOException {
1644     return false;
1645   }
1646 
1647   /**
1648    * Check whether region cache prefetch is enabled or not for the table.
1649    * @param tableName name of table to check
1650    * @return true if the table's region cache prefetch is enabled;
1651    * false otherwise.
1652    * @throws IOException
1653    * @deprecated always returns false since 0.99
1654    */
1655   @Deprecated
1656   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1657     return false;
1658   }
1659 
1660   /**
1661    * @deprecated always returns false since 0.99
1662    */
1663   @Deprecated
1664   public static boolean getRegionCachePrefetch(
1665       final TableName tableName) throws IOException {
1666     return false;
1667   }
1668 
1669   /**
1670    * Explicitly clears the region cache to fetch the latest value from META.
1671    * This is a power user function: avoid unless you know the ramifications.
1672    */
1673   public void clearRegionCache() {
1674     this.connection.clearRegionCache();
1675   }
1676 
1677   /**
1678    * {@inheritDoc}
1679    */
1680   @Override
1681   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1682     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1683   }
1684 
1685   /**
1686    * {@inheritDoc}
1687    */
1688   @Override
1689   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1690       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1691       throws ServiceException, Throwable {
1692     final Map<byte[],R> results =  Collections.synchronizedMap(
1693         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1694     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1695       @Override
1696       public void update(byte[] region, byte[] row, R value) {
1697         if (region != null) {
1698           results.put(region, value);
1699         }
1700       }
1701     });
1702     return results;
1703   }
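
  // Editorial usage sketch (not part of the original source): invoking a
  // coprocessor endpoint over a key range. "SumService", "SumRequest" and
  // "SumResponse" stand in for classes generated from a user-supplied .proto,
  // and the blocking-callback pattern shown is an assumption about how such an
  // endpoint is typically called.
  //
  //   final SumRequest request = SumRequest.getDefaultInstance();
  //   Map<byte[], Long> sums = table.coprocessorService(SumService.class,
  //       null, null,                          // null start/end row = all regions
  //       new Batch.Call<SumService, Long>() {
  //         @Override
  //         public Long call(SumService instance) throws IOException {
  //           BlockingRpcCallback<SumResponse> done =
  //               new BlockingRpcCallback<SumResponse>();
  //           instance.getSum(null, request, done);
  //           return done.get().getSum();
  //         }
  //       });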
1704 
1705   /**
1706    * {@inheritDoc}
1707    */
1708   @Override
1709   public <T extends Service, R> void coprocessorService(final Class<T> service,
1710       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1711       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1712 
1713     // get regions covered by the row range
1714     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1715 
1716     Map<byte[],Future<R>> futures =
1717         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1718     for (final byte[] r : keys) {
1719       final RegionCoprocessorRpcChannel channel =
1720           new RegionCoprocessorRpcChannel(connection, tableName, r);
1721       Future<R> future = pool.submit(
1722           new Callable<R>() {
1723             @Override
1724             public R call() throws Exception {
1725               T instance = ProtobufUtil.newServiceStub(service, channel);
1726               R result = callable.call(instance);
1727               byte[] region = channel.getLastRegion();
1728               if (callback != null) {
1729                 callback.update(region, r, result);
1730               }
1731               return result;
1732             }
1733           });
1734       futures.put(r, future);
1735     }
1736     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1737       try {
1738         e.getValue().get();
1739       } catch (ExecutionException ee) {
1740         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1741             + Bytes.toStringBinary(e.getKey()), ee);
1742         throw ee.getCause();
1743       } catch (InterruptedException ie) {
1744         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1745             + " for row " + Bytes.toStringBinary(e.getKey()))
1746             .initCause(ie);
1747       }
1748     }
1749   }
1750 
1751   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1752   throws IOException {
1753     if (start == null) {
1754       start = HConstants.EMPTY_START_ROW;
1755     }
1756     if (end == null) {
1757       end = HConstants.EMPTY_END_ROW;
1758     }
1759     return getKeysAndRegionsInRange(start, end, true).getFirst();
1760   }
1761 
1762   public void setOperationTimeout(int operationTimeout) {
1763     this.operationTimeout = operationTimeout;
1764   }
1765 
1766   public int getOperationTimeout() {
1767     return operationTimeout;
1768   }
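
  // Editorial usage sketch (not part of the original source): the operation
  // timeout caps the total time, including retries, spent on a single call.
  //
  //   table.setOperationTimeout(30000);   // milliseconds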
1769 
1770   @Override
1771   public String toString() {
1772     return tableName + ";" + connection;
1773   }
1774 
1775   /**
1776    * Run basic test.
1777    * @param args Pass a table name and a row key; the row content is fetched and printed.
1778    * @throws IOException
1779    */
1780   public static void main(String[] args) throws IOException {
1781     HTable t = new HTable(HBaseConfiguration.create(), args[0]);
1782     try {
1783       System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1784     } finally {
1785       t.close();
1786     }
1787   }
1788 
1789   /**
1790    * {@inheritDoc}
1791    */
1792   @Override
1793   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1794       Descriptors.MethodDescriptor methodDescriptor, Message request,
1795       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1796     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1797         Bytes.BYTES_COMPARATOR));
1798     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1799         new Callback<R>() {
1800 
1801           @Override
1802           public void update(byte[] region, byte[] row, R result) {
1803             if (region != null) {
1804               results.put(region, result);
1805             }
1806           }
1807         });
1808     return results;
1809   }
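
  // Editorial usage sketch (not part of the original source): the batch variant
  // takes a protobuf method descriptor instead of a Batch.Call. The same
  // hypothetical "SumService" classes as above are assumed here.
  //
  //   Map<byte[], SumResponse> responses = table.batchCoprocessorService(
  //       SumService.getDescriptor().findMethodByName("getSum"),
  //       SumRequest.getDefaultInstance(),
  //       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
  //       SumResponse.getDefaultInstance());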
1810 
1811   /**
1812    * {@inheritDoc}
1813    */
1814   @Override
1815   public <R extends Message> void batchCoprocessorService(
1816       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1817       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1818       throws ServiceException, Throwable {
1819 
1820     // get regions covered by the row range
1821     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1822         getKeysAndRegionsInRange(startKey, endKey, true);
1823     List<byte[]> keys = keysAndRegions.getFirst();
1824     List<HRegionLocation> regions = keysAndRegions.getSecond();
1825 
1826     // check if we have any calls to make
1827     if (keys.isEmpty()) {
1828       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1829           ", end=" + Bytes.toStringBinary(endKey));
1830       return;
1831     }
1832 
1833     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1834     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1835         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1836     for (int i = 0; i < keys.size(); i++) {
1837       final byte[] rowKey = keys.get(i);
1838       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1839       RegionCoprocessorServiceExec exec =
1840           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1841       execs.add(exec);
1842       execsByRow.put(rowKey, exec);
1843     }
1844 
1845     // tracking for any possible deserialization errors on success callback
1846     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1847     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1848     final List<Row> callbackErrorActions = new ArrayList<Row>();
1849     final List<String> callbackErrorServers = new ArrayList<String>();
1850     Object[] results = new Object[execs.size()];
1851 
1852     AsyncProcess asyncProcess =
1853         new AsyncProcess(connection, configuration, pool,
1854             RpcRetryingCallerFactory.instantiate(configuration), true,
1855             RpcControllerFactory.instantiate(configuration));
1856     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1857         new Callback<ClientProtos.CoprocessorServiceResult>() {
1858           @Override
1859           public void update(byte[] region, byte[] row,
1860                               ClientProtos.CoprocessorServiceResult serviceResult) {
1861             if (LOG.isTraceEnabled()) {
1862               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1863                   ": region=" + Bytes.toStringBinary(region) +
1864                   ", row=" + Bytes.toStringBinary(row) +
1865                   ", value=" + serviceResult.getValue().getValue());
1866             }
1867             try {
1868               callback.update(region, row,
1869                   (R) responsePrototype.newBuilderForType().mergeFrom(
1870                       serviceResult.getValue().getValue()).build());
1871             } catch (InvalidProtocolBufferException e) {
1872               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1873                   e);
1874               callbackErrorExceptions.add(e);
1875               callbackErrorActions.add(execsByRow.get(row));
1876               callbackErrorServers.add("null");
1877             }
1878           }
1879         }, results);
1880 
1881     future.waitUntilDone();
1882 
1883     if (future.hasError()) {
1884       throw future.getErrors();
1885     } else if (!callbackErrorExceptions.isEmpty()) {
1886       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1887           callbackErrorServers);
1888     }
1889   }
1890 }