1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import com.google.protobuf.InvalidProtocolBufferException;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.classification.InterfaceStability;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.HBaseConfiguration;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HRegionLocation;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.KeyValue;
52  import org.apache.hadoop.hbase.KeyValueUtil;
53  import org.apache.hadoop.hbase.RegionLocations;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
57  import org.apache.hadoop.hbase.client.coprocessor.Batch;
58  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
59  import org.apache.hadoop.hbase.filter.BinaryComparator;
60  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
61  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
62  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
63  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
64  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
65  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66  import org.apache.hadoop.hbase.protobuf.RequestConverter;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
71  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
72  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.Pair;
75  import org.apache.hadoop.hbase.util.Threads;
76  
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.Message;
79  import com.google.common.annotations.VisibleForTesting;
80  import com.google.protobuf.Service;
81  import com.google.protobuf.ServiceException;
82  
83  /**
84   * <p>Used to communicate with a single HBase table.  An implementation of
85   * {@link HTableInterface}.  Instances of this class can be constructed directly but it is
86   * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
87   * See {@link HConnectionManager} class comment for an example.
88   *
89   * <p>This class is not thread safe for reads or writes.
90   *
91   * <p>In case of writes (Put, Delete), the underlying write buffer can
92   * be corrupted if multiple threads contend over a single HTable instance.
93   *
94   * <p>In case of reads, some fields used by a Scan are shared among all threads;
95   * the HTable implementation therefore cannot guarantee safety even for concurrent Gets.
96   *
97   * <p>Instances of HTable passed the same {@link Configuration} instance will
98   * share connections to servers out on the cluster and to the zookeeper ensemble
99   * as well as caches of region locations.  This is usually a *good* thing and it
100  * is recommended to reuse the same configuration object for all your tables.
101  * This happens because they will all share the same underlying
102  * {@link HConnection} instance. See {@link HConnectionManager} for more on
103  * how this mechanism works.
104  *
105  * <p>{@link HConnection} will read most of the
106  * configuration it needs from the passed {@link Configuration} on initial
107  * construction.  Thereafter, updates to settings such as
108  * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
109  * and <code>hbase.client.rpc.maxattempts</code> made in the passed
110  * {@link Configuration} after {@link HConnection} construction
111  * will go unnoticed.  To run with changed values, make a new
112  * {@link HTable} passing a new {@link Configuration} instance that has the
113  * new configuration.
114  *
115  * <p>Note that this class implements the {@link Closeable} interface. When an
116  * HTable instance is no longer required, it *should* be closed in order to ensure
117  * that the underlying resources are promptly released. Please note that the close
118  * method can throw a java.io.IOException that must be handled.
119  *
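 * <p>As a rough usage sketch (the table, family and qualifier names below are
 * placeholders, not part of this API), assuming the default autoflush setting:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * HTable table = new HTable(conf, "myTable");   // shares the HConnection keyed by conf
 * try {
 *   Put put = new Put(Bytes.toBytes("row1"));
 *   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
 *   table.put(put);                             // sent immediately since autoflush is on by default
 *   Result result = table.get(new Get(Bytes.toBytes("row1")));
 * } finally {
 *   table.close();                              // releases the underlying resources
 * }
 * </pre>
 *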
120  * @see HBaseAdmin for create, drop, list, enable and disable of tables.
121  * @see HConnection
122  * @see HConnectionManager
123  */
124 @InterfaceAudience.Public
125 @InterfaceStability.Stable
126 public class HTable implements HTableInterface, RegionLocator {
127   private static final Log LOG = LogFactory.getLog(HTable.class);
128   protected ClusterConnection connection;
129   private final TableName tableName;
130   private volatile Configuration configuration;
131   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
132   private long writeBufferSize;
133   private boolean clearBufferOnFail;
134   private boolean autoFlush;
135   protected long currentWriteBufferSize;
136   protected int scannerCaching;
137   private int maxKeyValueSize;
138   private ExecutorService pool;  // For Multi & Scan
139   private boolean closed;
140   private int operationTimeout;
141   private int retries;
142   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
143   private final boolean cleanupConnectionOnClose; // close the connection in close()
144   private Consistency defaultConsistency = Consistency.STRONG;
145   private int primaryCallTimeoutMicroSecond;
146   private int replicaCallTimeoutMicroSecondScan;
147 
148 
149   /** The Async process for puts with autoflush set to false or multiputs */
150   protected AsyncProcess ap;
151   /** The Async process for batch */
152   protected AsyncProcess multiAp;
153   private RpcRetryingCallerFactory rpcCallerFactory;
154   private RpcControllerFactory rpcControllerFactory;
155 
156   /**
157    * Creates an object to access a HBase table.
158    * Shares zookeeper connection and other resources with other HTable instances
159    * created with the same <code>conf</code> instance.  Uses already-populated
160    * region cache if one is available, populated by any other HTable instances
161    * sharing this <code>conf</code> instance.  Recommended.
162    * @param conf Configuration object to use.
163    * @param tableName Name of the table.
164    * @throws IOException if a remote or network exception occurs
165    */
166   public HTable(Configuration conf, final String tableName)
167   throws IOException {
168     this(conf, TableName.valueOf(tableName));
169   }
170 
171   /**
172    * Creates an object to access a HBase table.
173    * Shares zookeeper connection and other resources with other HTable instances
174    * created with the same <code>conf</code> instance.  Uses already-populated
175    * region cache if one is available, populated by any other HTable instances
176    * sharing this <code>conf</code> instance.  Recommended.
177    * @param conf Configuration object to use.
178    * @param tableName Name of the table.
179    * @throws IOException if a remote or network exception occurs
180    */
181   public HTable(Configuration conf, final byte[] tableName)
182   throws IOException {
183     this(conf, TableName.valueOf(tableName));
184   }
185 
186 
187 
188   /**
189    * Creates an object to access a HBase table.
190    * Shares zookeeper connection and other resources with other HTable instances
191    * created with the same <code>conf</code> instance.  Uses already-populated
192    * region cache if one is available, populated by any other HTable instances
193    * sharing this <code>conf</code> instance.  Recommended.
194    * @param conf Configuration object to use.
195    * @param tableName table name pojo
196    * @throws IOException if a remote or network exception occurs
197    */
198   public HTable(Configuration conf, final TableName tableName)
199   throws IOException {
200     this.tableName = tableName;
201     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
202     if (conf == null) {
203       this.connection = null;
204       return;
205     }
206     this.connection = ConnectionManager.getConnectionInternal(conf);
207     this.configuration = conf;
208 
209     this.pool = getDefaultExecutor(conf);
210     this.finishSetup();
211   }
212 
213   /**
214    * Creates an object to access a HBase table. Shares zookeeper connection and other resources with
215    * other HTable instances created with the same <code>connection</code> instance. Use this
216    * constructor when the HConnection instance is externally managed.
217    * @param tableName Name of the table.
218    * @param connection HConnection to be used.
219    * @throws IOException if a remote or network exception occurs
220    * @deprecated Do not use.
221    */
222   @Deprecated
223   public HTable(TableName tableName, HConnection connection) throws IOException {
224     this.tableName = tableName;
225     this.cleanupPoolOnClose = true;
226     this.cleanupConnectionOnClose = false;
227     this.connection = (ClusterConnection)connection;
228     this.configuration = connection.getConfiguration();
229 
230     this.pool = getDefaultExecutor(this.configuration);
231     this.finishSetup();
232   }
233 
234   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
235     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
236     if (maxThreads == 0) {
237       maxThreads = 1; // is there a better default?
238     }
239     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
240 
241     // Using the "direct handoff" approach, new threads will only be created
242     // if it is necessary and will grow unbounded. This could be bad but in HCM
243     // we only create as many Runnables as there are region servers. It means
244     // it also scales when new region servers are added.
245     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
246         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
247     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
248     return pool;
249   }
250 
251   /**
252    * Creates an object to access a HBase table.
253    * Shares zookeeper connection and other resources with other HTable instances
254    * created with the same <code>conf</code> instance.  Uses already-populated
255    * region cache if one is available, populated by any other HTable instances
256    * sharing this <code>conf</code> instance.
257    * Use this constructor when the ExecutorService is externally managed.
258    * @param conf Configuration object to use.
259    * @param tableName Name of the table.
260    * @param pool ExecutorService to be used.
261    * @throws IOException if a remote or network exception occurs
262    */
263   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
264       throws IOException {
265     this(conf, TableName.valueOf(tableName), pool);
266   }
267 
268   /**
269    * Creates an object to access a HBase table.
270    * Shares zookeeper connection and other resources with other HTable instances
271    * created with the same <code>conf</code> instance.  Uses already-populated
272    * region cache if one is available, populated by any other HTable instances
273    * sharing this <code>conf</code> instance.
274    * Use this constructor when the ExecutorService is externally managed.
275    * @param conf Configuration object to use.
276    * @param tableName Name of the table.
277    * @param pool ExecutorService to be used.
278    * @throws IOException if a remote or network exception occurs
279    */
280   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
281       throws IOException {
282     this.connection = ConnectionManager.getConnectionInternal(conf);
283     this.configuration = conf;
284     this.pool = pool;
285     if (pool == null) {
286       this.pool = getDefaultExecutor(conf);
287       this.cleanupPoolOnClose = true;
288     } else {
289       this.cleanupPoolOnClose = false;
290     }
291     this.tableName = tableName;
292     this.cleanupConnectionOnClose = true;
293     this.finishSetup();
294   }
295 
296   /**
297    * Creates an object to access a HBase table.
298    * Shares zookeeper connection and other resources with other HTable instances
299    * created with the same <code>connection</code> instance.
300    * Use this constructor when the ExecutorService and HConnection instance are
301    * externally managed.
302    * @param tableName Name of the table.
303    * @param connection HConnection to be used.
304    * @param pool ExecutorService to be used.
305    * @throws IOException if a remote or network exception occurs.
306    * @deprecated Do not use, internal ctor.
307    */
308   @Deprecated
309   public HTable(final byte[] tableName, final HConnection connection,
310       final ExecutorService pool) throws IOException {
311     this(TableName.valueOf(tableName), connection, pool);
312   }
313 
314   /** @deprecated Do not use, internal ctor. */
315   @Deprecated
316   public HTable(TableName tableName, final HConnection connection,
317       final ExecutorService pool) throws IOException {
318     this(tableName, (ClusterConnection)connection, pool);
319   }
320 
321   /**
322    * Creates an object to access a HBase table.
323    * Shares zookeeper connection and other resources with other HTable instances
324    * created with the same <code>connection</code> instance.
325    * Visible only for HTableWrapper, which is in a different package.
326    * Should not be used by external code.
327    * @param tableName Name of the table.
328    * @param connection HConnection to be used.
329    * @param pool ExecutorService to be used.
330    * @throws IOException if a remote or network exception occurs
331    */
332   @InterfaceAudience.Private
333   public HTable(TableName tableName, final ClusterConnection connection,
334       final ExecutorService pool) throws IOException {
335     if (connection == null || connection.isClosed()) {
336       throw new IllegalArgumentException("Connection is null or closed.");
337     }
338     this.tableName = tableName;
339     this.cleanupConnectionOnClose = false;
340     this.connection = connection;
341     this.configuration = connection.getConfiguration();
342     this.pool = pool;
343     if (pool == null) {
344       this.pool = getDefaultExecutor(this.configuration);
345       this.cleanupPoolOnClose = true;
346     } else {
347       this.cleanupPoolOnClose = false;
348     }
349 
350     this.finishSetup();
351   }
352 
353   /**
354    * For internal testing.
355    */
356   protected HTable() {
357     tableName = null;
358     cleanupPoolOnClose = false;
359     cleanupConnectionOnClose = false;
360   }
361 
362   /**
363    * Sets up this HTable's parameters based on the passed configuration
364    */
365   private void finishSetup() throws IOException {
366     this.operationTimeout = tableName.isSystemTable() ?
367       this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
368         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
369       this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
370         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
371     this.writeBufferSize = this.configuration.getLong(
372         "hbase.client.write.buffer", 2097152);
373     this.clearBufferOnFail = true;
374     this.autoFlush = true;
375     this.currentWriteBufferSize = 0;
376     this.scannerCaching = this.configuration.getInt(
377         HConstants.HBASE_CLIENT_SCANNER_CACHING,
378         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
379     this.primaryCallTimeoutMicroSecond =
380         this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms
381     this.replicaCallTimeoutMicroSecondScan =
382         this.configuration.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
383     this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
384             HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
385 
386     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
387     this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
388     // puts need to track errors globally due to how the APIs currently work.
389     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
390     multiAp = this.connection.getAsyncProcess();
391 
392     this.maxKeyValueSize = this.configuration.getInt(
393         "hbase.client.keyvalue.maxsize", -1);
394     this.closed = false;
395   }
396 
397 
398 
399   /**
400    * {@inheritDoc}
401    */
402   @Override
403   public Configuration getConfiguration() {
404     return configuration;
405   }
406 
407   /**
408    * Tells whether or not a table is enabled. This method creates a
409    * new HBase configuration, so it might make your unit tests fail due to
410    * incorrect ZK client port.
411    * @param tableName Name of table to check.
412    * @return {@code true} if table is online.
413    * @throws IOException if a remote or network exception occurs
414    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
415    */
416   @Deprecated
417   public static boolean isTableEnabled(String tableName) throws IOException {
418     return isTableEnabled(TableName.valueOf(tableName));
419   }
420 
421   /**
422    * Tells whether or not a table is enabled. This method creates a
423    * new HBase configuration, so it might make your unit tests fail due to
424    * incorrect ZK client port.
425    * @param tableName Name of table to check.
426    * @return {@code true} if table is online.
427    * @throws IOException if a remote or network exception occurs
428    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
429    */
430   @Deprecated
431   public static boolean isTableEnabled(byte[] tableName) throws IOException {
432     return isTableEnabled(TableName.valueOf(tableName));
433   }
434 
435   /**
436    * Tells whether or not a table is enabled. This method creates a
437    * new HBase configuration, so it might make your unit tests fail due to
438    * incorrect ZK client port.
439    * @param tableName Name of table to check.
440    * @return {@code true} if table is online.
441    * @throws IOException if a remote or network exception occurs
442    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
443    */
444   @Deprecated
445   public static boolean isTableEnabled(TableName tableName) throws IOException {
446     return isTableEnabled(HBaseConfiguration.create(), tableName);
447   }
448 
449   /**
450    * Tells whether or not a table is enabled.
451    * @param conf The Configuration object to use.
452    * @param tableName Name of table to check.
453    * @return {@code true} if table is online.
454    * @throws IOException if a remote or network exception occurs
455    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
456    */
457   @Deprecated
458   public static boolean isTableEnabled(Configuration conf, String tableName)
459   throws IOException {
460     return isTableEnabled(conf, TableName.valueOf(tableName));
461   }
462 
463   /**
464    * Tells whether or not a table is enabled.
465    * @param conf The Configuration object to use.
466    * @param tableName Name of table to check.
467    * @return {@code true} if table is online.
468    * @throws IOException if a remote or network exception occurs
469    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
470    */
471   @Deprecated
472   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
473   throws IOException {
474     return isTableEnabled(conf, TableName.valueOf(tableName));
475   }
476 
477   /**
478    * Tells whether or not a table is enabled.
479    * @param conf The Configuration object to use.
480    * @param tableName Name of table to check.
481    * @return {@code true} if table is online.
482    * @throws IOException if a remote or network exception occurs
483    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName)}
484    */
485   @Deprecated
486   public static boolean isTableEnabled(Configuration conf,
487       final TableName tableName) throws IOException {
488     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
489       @Override
490       public Boolean connect(HConnection connection) throws IOException {
491         return connection.isTableEnabled(tableName);
492       }
493     });
494   }
495 
496   /**
497    * Finds the region location hosting the passed row, using cached info
498    * @param row Row to find.
499    * @return The location of the given row.
500    * @throws IOException if a remote or network exception occurs
501    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
502    */
503   @Deprecated
504   public HRegionLocation getRegionLocation(final String row)
505   throws IOException {
506     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
507   }
508 
509   /**
510    * {@inheritDoc}
511    */
512   @Override
513   public HRegionLocation getRegionLocation(final byte [] row)
514   throws IOException {
515     return connection.getRegionLocation(tableName, row, false);
516   }
517 
518   /**
519    * {@inheritDoc}
520    */
521   @Override
522   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
523   throws IOException {
524     return connection.getRegionLocation(tableName, row, reload);
525   }
526 
527   /**
528    * {@inheritDoc}
529    */
530   @Override
531   public byte [] getTableName() {
532     return this.tableName.getName();
533   }
534 
535   @Override
536   public TableName getName() {
537     return tableName;
538   }
539 
540   /**
541    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
542    * manipulations.
543    * @return An HConnection instance.
544    * @deprecated This method will be changed from public to package protected.
545    */
546   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
547   @Deprecated
548   @VisibleForTesting
549   public HConnection getConnection() {
550     return this.connection;
551   }
552 
553   /**
554    * Gets the number of rows that a scanner will fetch at once.
555    * <p>
556    * The default value comes from {@code hbase.client.scanner.caching}.
557    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
558    */
559   @Deprecated
560   public int getScannerCaching() {
561     return scannerCaching;
562   }
563 
564   /**
565    * Kept in 0.96 for backward compatibility
566    * @deprecated  since 0.96. This is an internal buffer that should not be read or written to.
567    */
568   @Deprecated
569   public List<Row> getWriteBuffer() {
570     return writeAsyncBuffer;
571   }
572 
573   /**
574    * Sets the number of rows that a scanner will fetch at once.
575    * <p>
576    * This will override the value specified by
577    * {@code hbase.client.scanner.caching}.
578    * Increasing this value will reduce the amount of work needed each time
579    * {@code next()} is called on a scanner, at the expense of memory use
580    * (since more rows will need to be maintained in memory by the scanners).
581    * @param scannerCaching the number of rows a scanner will fetch at once.
582    * @deprecated Use {@link Scan#setCaching(int)}
583    */
584   @Deprecated
585   public void setScannerCaching(int scannerCaching) {
586     this.scannerCaching = scannerCaching;
587   }
588 
589   /**
590    * {@inheritDoc}
591    */
592   @Override
593   public HTableDescriptor getTableDescriptor() throws IOException {
594     return new UnmodifyableHTableDescriptor(
595       this.connection.getHTableDescriptor(this.tableName));
596   }
597 
598   /**
599    * {@inheritDoc}
600    */
601   @Override
602   public byte [][] getStartKeys() throws IOException {
603     return getStartEndKeys().getFirst();
604   }
605 
606   /**
607    * {@inheritDoc}
608    */
609   @Override
610   public byte[][] getEndKeys() throws IOException {
611     return getStartEndKeys().getSecond();
612   }
613 
614   /**
615    * {@inheritDoc}
616    */
617   @Override
618   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
619 
620     List<RegionLocations> regions = listRegionLocations();
621     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
622     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
623 
624     for (RegionLocations locations : regions) {
625       HRegionInfo region = locations.getRegionLocation().getRegionInfo();
626       startKeyList.add(region.getStartKey());
627       endKeyList.add(region.getEndKey());
628     }
629 
630     return new Pair<byte [][], byte [][]>(
631       startKeyList.toArray(new byte[startKeyList.size()][]),
632       endKeyList.toArray(new byte[endKeyList.size()][]));
633   }
634 
635   @VisibleForTesting
636   List<RegionLocations> listRegionLocations() throws IOException {
637     return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
638   }
639 
640   /**
641    * Gets all the regions and their address for this table.
642    * <p>
643    * This is mainly useful for the MapReduce integration.
644    * @return A map of HRegionInfo with its server address
645    * @throws IOException if a remote or network exception occurs
646    * @deprecated This is no longer a public API
647    */
648   @Deprecated
649   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
650     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an HRegionLocation.
651     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName(), false);
652   }
653 
654   /**
655    * Get the corresponding regions for an arbitrary range of keys.
656    * <p>
657    * @param startKey Starting row in range, inclusive
658    * @param endKey Ending row in range, exclusive
659    * @return A list of HRegionLocations corresponding to the regions that
660    * contain the specified range
661    * @throws IOException if a remote or network exception occurs
662    * @deprecated This is no longer a public API
663    */
664   @Deprecated
665   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
666     final byte [] endKey) throws IOException {
667     return getRegionsInRange(startKey, endKey, false);
668   }
669 
670   /**
671    * Get the corresponding regions for an arbitrary range of keys.
672    * <p>
673    * @param startKey Starting row in range, inclusive
674    * @param endKey Ending row in range, exclusive
675    * @param reload true to reload information or false to use cached information
676    * @return A list of HRegionLocations corresponding to the regions that
677    * contain the specified range
678    * @throws IOException if a remote or network exception occurs
679    * @deprecated This is no longer a public API
680    */
681   @Deprecated
682   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
683       final byte [] endKey, final boolean reload) throws IOException {
684     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
685   }
686 
687   /**
688    * Get the corresponding start keys and regions for an arbitrary range of
689    * keys.
690    * <p>
691    * @param startKey Starting row in range, inclusive
692    * @param endKey Ending row in range
693    * @param includeEndKey true if endRow is inclusive, false if exclusive
694    * @return A pair of list of start keys and list of HRegionLocations that
695    *         contain the specified range
696    * @throws IOException if a remote or network exception occurs
697    * @deprecated This is no longer a public API
698    */
699   @Deprecated
700   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
701       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
702       throws IOException {
703     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
704   }
705 
706   /**
707    * Get the corresponding start keys and regions for an arbitrary range of
708    * keys.
709    * <p>
710    * @param startKey Starting row in range, inclusive
711    * @param endKey Ending row in range
712    * @param includeEndKey true if endRow is inclusive, false if exclusive
713    * @param reload true to reload information or false to use cached information
714    * @return A pair of list of start keys and list of HRegionLocations that
715    *         contain the specified range
716    * @throws IOException if a remote or network exception occurs
717    * @deprecated This is no longer a public API
718    */
719   @Deprecated
720   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
721       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
722       final boolean reload) throws IOException {
723     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
724     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
725       throw new IllegalArgumentException(
726         "Invalid range: " + Bytes.toStringBinary(startKey) +
727         " > " + Bytes.toStringBinary(endKey));
728     }
729     List<byte[]> keysInRange = new ArrayList<byte[]>();
730     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
731     byte[] currentKey = startKey;
732     do {
733       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
734       keysInRange.add(currentKey);
735       regionsInRange.add(regionLocation);
736       currentKey = regionLocation.getRegionInfo().getEndKey();
737     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
738         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
739             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
740     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
741         regionsInRange);
742   }
743 
744   /**
745    * {@inheritDoc}
746    * @deprecated Use reversed scan instead.
747    */
748    @Override
749    @Deprecated
750    public Result getRowOrBefore(final byte[] row, final byte[] family)
751        throws IOException {
752      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
753          tableName, row) {
754       @Override
755       public Result call(int callTimeout) throws IOException {
756          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
757          controller.setPriority(tableName);
758          controller.setCallTimeout(callTimeout);
759          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
760              getLocation().getRegionInfo().getRegionName(), row, family);
761          try {
762            ClientProtos.GetResponse response = getStub().get(controller, request);
763            if (!response.hasResult()) return null;
764            return ProtobufUtil.toResult(response.getResult());
765          } catch (ServiceException se) {
766            throw ProtobufUtil.getRemoteException(se);
767          }
768        }
769      };
770      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
771    }
772 
773   /**
774    * The underlying {@link HTable} must not be closed.
775    * {@link HTableInterface#getScanner(Scan)} has other usage details.
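   * <p>A rough usage sketch (assuming {@code table} is an open HTable; the family
   * name is a placeholder); closing the scanner in a finally block releases its
   * server-side resources:
   * <pre>
   * Scan scan = new Scan();
   * scan.addFamily(Bytes.toBytes("cf"));
   * scan.setCaching(100);   // otherwise the hbase.client.scanner.caching value is applied
   * ResultScanner scanner = table.getScanner(scan);
   * try {
   *   for (Result r : scanner) {
   *     // process r
   *   }
   * } finally {
   *   scanner.close();
   * }
   * </pre>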
776    */
777   @Override
778   public ResultScanner getScanner(final Scan scan) throws IOException {
779     if (scan.getCaching() <= 0) {
780       scan.setCaching(getScannerCaching());
781     }
782 
783     if (scan.isReversed()) {
784       if (scan.isSmall()) {
785         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
786             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
787             pool, replicaCallTimeoutMicroSecondScan);
788       } else {
789         return new ReversedClientScanner(getConfiguration(), scan, getName(),
790             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
791             pool, replicaCallTimeoutMicroSecondScan);
792       }
793     }
794 
795     if (scan.isSmall()) {
796       return new ClientSmallScanner(getConfiguration(), scan, getName(),
797           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
798           pool, replicaCallTimeoutMicroSecondScan);
799     } else {
800       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
801           this.rpcCallerFactory, this.rpcControllerFactory,
802           pool, replicaCallTimeoutMicroSecondScan);
803     }
804   }
805 
806   /**
807    * The underlying {@link HTable} must not be closed.
808    * {@link HTableInterface#getScanner(byte[])} has other usage details.
809    */
810   @Override
811   public ResultScanner getScanner(byte [] family) throws IOException {
812     Scan scan = new Scan();
813     scan.addFamily(family);
814     return getScanner(scan);
815   }
816 
817   /**
818    * The underlying {@link HTable} must not be closed.
819    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
820    */
821   @Override
822   public ResultScanner getScanner(byte [] family, byte [] qualifier)
823   throws IOException {
824     Scan scan = new Scan();
825     scan.addColumn(family, qualifier);
826     return getScanner(scan);
827   }
828 
829   /**
830    * {@inheritDoc}
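   * <p>A rough sketch of a timeline-consistent read (names are placeholders and
   * {@code table} is assumed to be an open HTable); without an explicit consistency
   * the STRONG default is used and only the primary region is contacted:
   * <pre>
   * Get get = new Get(Bytes.toBytes("row1"));
   * get.setConsistency(Consistency.TIMELINE);   // allow a possibly stale read from a replica
   * Result result = table.get(get);
   * boolean stale = result.isStale();           // whether a secondary replica served the read
   * </pre>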
831    */
832   @Override
833   public Result get(final Get get) throws IOException {
834     if (get.getConsistency() == null){
835       get.setConsistency(defaultConsistency);
836     }
837 
838     if (get.getConsistency() == Consistency.STRONG) {
839       // Good old call.
840       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
841           getName(), get.getRow()) {
842         @Override
843         public Result call(int callTimeout) throws IOException {
844           ClientProtos.GetRequest request =
845               RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
846           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
847           controller.setPriority(tableName);
848           controller.setCallTimeout(callTimeout);
849           try {
850             ClientProtos.GetResponse response = getStub().get(controller, request);
851             if (response == null) return null;
852             return ProtobufUtil.toResult(response.getResult());
853           } catch (ServiceException se) {
854             throw ProtobufUtil.getRemoteException(se);
855           }
856         }
857       };
858       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
859     }
860 
861     // Call that takes into account the replica
862     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
863       rpcControllerFactory, tableName, this.connection, get, pool, retries,
864       operationTimeout, primaryCallTimeoutMicroSecond);
865     return callable.call();
866   }
867 
868 
869   /**
870    * {@inheritDoc}
871    */
872   @Override
873   public Result[] get(List<Get> gets) throws IOException {
874     if (gets.size() == 1) {
875       return new Result[]{get(gets.get(0))};
876     }
877     try {
878       Object [] r1 = batch((List)gets);
879 
880       // translate.
881       Result [] results = new Result[r1.length];
882       int i=0;
883       for (Object o : r1) {
884         // batch ensures if there is a failure we get an exception instead
885         results[i++] = (Result) o;
886       }
887 
888       return results;
889     } catch (InterruptedException e) {
890       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
891     }
892   }
893 
894   /**
895    * {@inheritDoc}
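   * <p>A rough sketch of a mixed batch (row, family and qualifier names are
   * placeholders, {@code table} is assumed open); if any action fails an exception
   * is thrown, but {@code results} still holds the per-action outcomes:
   * <pre>
   * List&lt;Row&gt; actions = new ArrayList&lt;Row&gt;();
   * actions.add(new Put(Bytes.toBytes("row1"))
   *     .add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
   * actions.add(new Get(Bytes.toBytes("row2")));
   * actions.add(new Delete(Bytes.toBytes("row3")));
   * Object[] results = new Object[actions.size()];
   * table.batch(actions, results);   // results[i] corresponds to actions.get(i)
   * </pre>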
896    */
897   @Override
898   public void batch(final List<? extends Row> actions, final Object[] results)
899       throws InterruptedException, IOException {
900     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
901     ars.waitUntilDone();
902     if (ars.hasError()) {
903       throw ars.getErrors();
904     }
905   }
906 
907   /**
908    * {@inheritDoc}
909    * @deprecated If any exception is thrown by one of the actions, there is no way to
910    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
911    */
912   @Deprecated
913   @Override
914   public Object[] batch(final List<? extends Row> actions)
915      throws InterruptedException, IOException {
916     Object[] results = new Object[actions.size()];
917     batch(actions, results);
918     return results;
919   }
920 
921   /**
922    * {@inheritDoc}
923    */
924   @Override
925   public <R> void batchCallback(
926       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
927       throws IOException, InterruptedException {
928     connection.processBatchCallback(actions, tableName, pool, results, callback);
929   }
930 
931   /**
932    * {@inheritDoc}
933    * @deprecated If any exception is thrown by one of the actions, there is no way to
934    * retrieve the partially executed results. Use
935    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
936    * instead.
937    */
938   @Deprecated
939   @Override
940   public <R> Object[] batchCallback(
941     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
942       InterruptedException {
943     Object[] results = new Object[actions.size()];
944     batchCallback(actions, results, callback);
945     return results;
946   }
947 
948   /**
949    * {@inheritDoc}
950    */
951   @Override
952   public void delete(final Delete delete)
953   throws IOException {
954     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
955         tableName, delete.getRow()) {
956       @Override
957       public Boolean call(int callTimeout) throws IOException {
958         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
959         controller.setPriority(tableName);
960         controller.setCallTimeout(callTimeout);
961 
962         try {
963           MutateRequest request = RequestConverter.buildMutateRequest(
964             getLocation().getRegionInfo().getRegionName(), delete);
965           MutateResponse response = getStub().mutate(controller, request);
966           return Boolean.valueOf(response.getProcessed());
967         } catch (ServiceException se) {
968           throw ProtobufUtil.getRemoteException(se);
969         }
970       }
971     };
972     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
973   }
974 
975   /**
976    * {@inheritDoc}
977    */
978   @Override
979   public void delete(final List<Delete> deletes)
980   throws IOException {
981     Object[] results = new Object[deletes.size()];
982     try {
983       batch(deletes, results);
984     } catch (InterruptedException e) {
985       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
986     } finally {
987       // mutate list so that it is empty for complete success, or contains only failed records
988       // results are returned in the same order as the requests in list
989       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
990       for (int i = results.length - 1; i>=0; i--) {
991         // if result is not null, it succeeded
992         if (results[i] instanceof Result) {
993           deletes.remove(i);
994         }
995       }
996     }
997   }
998 
999   /**
1000    * {@inheritDoc}
1001    */
1002   @Override
1003   public void put(final Put put)
1004       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1005     doPut(put);
1006     if (autoFlush) {
1007       flushCommits();
1008     }
1009   }
1010 
1011   /**
1012    * {@inheritDoc}
1013    */
1014   @Override
1015   public void put(final List<Put> puts)
1016       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1017     for (Put put : puts) {
1018       doPut(put);
1019     }
1020     if (autoFlush) {
1021       flushCommits();
1022     }
1023   }
1024 
1025 
1026   /**
1027    * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
1028    *  cluster.
1029    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
1030    * @throws InterruptedIOException if we were interrupted.
1031    */
1032   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1033     // This behavior is highly non-intuitive... it does not protect us against
1034     // 94-incompatible behavior, which is a timing issue because hasError, the below code
1035     // and setter of hasError are not synchronized. Perhaps it should be removed.
1036     if (ap.hasError()) {
1037       writeAsyncBuffer.add(put);
1038       backgroundFlushCommits(true);
1039     }
1040 
1041     validatePut(put);
1042 
1043     currentWriteBufferSize += put.heapSize();
1044     writeAsyncBuffer.add(put);
1045 
1046     while (currentWriteBufferSize > writeBufferSize) {
1047       backgroundFlushCommits(false);
1048     }
1049   }
1050 
1051 
1052   /**
1053    * Send the operations in the buffer to the servers. Does not wait for the server's answer.
1054    * If there is an error (max retries reached from a previous flush or a bad operation), it tries to
1055    * send all operations in the buffer and throws an exception.
1056    * @param synchronous - if true, sends all the writes and waits for all of them to finish before
1057    *                     returning.
1058    */
1059   private void backgroundFlushCommits(boolean synchronous) throws
1060       InterruptedIOException, RetriesExhaustedWithDetailsException {
1061 
1062     try {
1063       if (!synchronous) {
1064         ap.submit(tableName, writeAsyncBuffer, true, null, false);
1065         if (ap.hasError()) {
1066           LOG.debug(tableName + ": One or more of the operations have failed -" +
1067               " waiting for all operation in progress to finish (successfully or not)");
1068         }
1069       }
1070       if (synchronous || ap.hasError()) {
1071         while (!writeAsyncBuffer.isEmpty()) {
1072           ap.submit(tableName, writeAsyncBuffer, true, null, false);
1073         }
1074         List<Row> failedRows = clearBufferOnFail ? null : writeAsyncBuffer;
1075         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(failedRows);
1076         if (error != null) {
1077           throw error;
1078         }
1079       }
1080     } finally {
1081       currentWriteBufferSize = 0;
1082       for (Row mut : writeAsyncBuffer) {
1083         if (mut instanceof Mutation) {
1084           currentWriteBufferSize += ((Mutation) mut).heapSize();
1085         }
1086       }
1087     }
1088   }
1089 
1090   /**
1091    * {@inheritDoc}
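   * <p>A rough sketch (names are placeholders, {@code table} is assumed open); all
   * mutations in the {@link RowMutations} target the same row and are applied
   * atomically on the region server:
   * <pre>
   * byte[] row = Bytes.toBytes("row1");
   * RowMutations mutations = new RowMutations(row);
   * mutations.add(new Put(row).add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("v")));
   * mutations.add(new Delete(row).deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("b")));
   * table.mutateRow(mutations);
   * </pre>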
1092    */
1093   @Override
1094   public void mutateRow(final RowMutations rm) throws IOException {
1095     RegionServerCallable<Void> callable =
1096         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1097       @Override
1098       public Void call(int callTimeout) throws IOException {
1099         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1100         controller.setPriority(tableName);
1101         controller.setCallTimeout(callTimeout);
1102         try {
1103           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1104             getLocation().getRegionInfo().getRegionName(), rm);
1105           regionMutationBuilder.setAtomic(true);
1106           MultiRequest request =
1107             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1108           getStub().multi(controller, request);
1109         } catch (ServiceException se) {
1110           throw ProtobufUtil.getRemoteException(se);
1111         }
1112         return null;
1113       }
1114     };
1115     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1116   }
1117 
1118   /**
1119    * {@inheritDoc}
1120    */
1121   @Override
1122   public Result append(final Append append) throws IOException {
1123     if (append.numFamilies() == 0) {
1124       throw new IOException(
1125           "Invalid arguments to append, no columns specified");
1126     }
1127 
1128     NonceGenerator ng = this.connection.getNonceGenerator();
1129     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1130     RegionServerCallable<Result> callable =
1131       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1132         @Override
1133         public Result call(int callTimeout) throws IOException {
1134           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1135           controller.setPriority(getTableName());
1136           controller.setCallTimeout(callTimeout);
1137           try {
1138             MutateRequest request = RequestConverter.buildMutateRequest(
1139               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1140             MutateResponse response = getStub().mutate(controller, request);
1141             if (!response.hasResult()) return null;
1142             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1143           } catch (ServiceException se) {
1144             throw ProtobufUtil.getRemoteException(se);
1145           }
1146         }
1147       };
1148     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1149   }
1150 
1151   /**
1152    * {@inheritDoc}
1153    */
1154   @Override
1155   public Result increment(final Increment increment) throws IOException {
1156     if (!increment.hasFamilies()) {
1157       throw new IOException(
1158           "Invalid arguments to increment, no columns specified");
1159     }
1160     NonceGenerator ng = this.connection.getNonceGenerator();
1161     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1162     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1163         getName(), increment.getRow()) {
1164       @Override
1165       public Result call(int callTimeout) throws IOException {
1166         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1167         controller.setPriority(getTableName());
1168         controller.setCallTimeout(callTimeout);
1169         try {
1170           MutateRequest request = RequestConverter.buildMutateRequest(
1171             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1172           MutateResponse response = getStub().mutate(controller, request);
1173           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1174         } catch (ServiceException se) {
1175           throw ProtobufUtil.getRemoteException(se);
1176         }
1177       }
1178     };
1179     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1180   }
1181 
1182   /**
1183    * {@inheritDoc}
1184    */
1185   @Override
1186   public long incrementColumnValue(final byte [] row, final byte [] family,
1187       final byte [] qualifier, final long amount)
1188   throws IOException {
1189     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1190   }
1191 
1192   /**
1193    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1194    */
1195   @Deprecated
1196   @Override
1197   public long incrementColumnValue(final byte [] row, final byte [] family,
1198       final byte [] qualifier, final long amount, final boolean writeToWAL)
1199   throws IOException {
1200     return incrementColumnValue(row, family, qualifier, amount,
1201       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1202   }
1203 
1204   /**
1205    * {@inheritDoc}
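   * <p>A rough sketch (names are placeholders, {@code table} is assumed open); the
   * cell is treated as a 64-bit counter and the post-increment value is returned:
   * <pre>
   * long newValue = table.incrementColumnValue(Bytes.toBytes("row1"),
   *     Bytes.toBytes("cf"), Bytes.toBytes("hits"), 1L, Durability.SYNC_WAL);
   * </pre>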
1206    */
1207   @Override
1208   public long incrementColumnValue(final byte [] row, final byte [] family,
1209       final byte [] qualifier, final long amount, final Durability durability)
1210   throws IOException {
1211     NullPointerException npe = null;
1212     if (row == null) {
1213       npe = new NullPointerException("row is null");
1214     } else if (family == null) {
1215       npe = new NullPointerException("family is null");
1216     } else if (qualifier == null) {
1217       npe = new NullPointerException("qualifier is null");
1218     }
1219     if (npe != null) {
1220       throw new IOException(
1221           "Invalid arguments to incrementColumnValue", npe);
1222     }
1223 
1224     NonceGenerator ng = this.connection.getNonceGenerator();
1225     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1226     RegionServerCallable<Long> callable =
1227       new RegionServerCallable<Long>(connection, getName(), row) {
1228         @Override
1229         public Long call(int callTimeout) throws IOException {
1230           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1231           controller.setPriority(getTableName());
1232           controller.setCallTimeout(callTimeout);
1233           try {
1234             MutateRequest request = RequestConverter.buildIncrementRequest(
1235               getLocation().getRegionInfo().getRegionName(), row, family,
1236               qualifier, amount, durability, nonceGroup, nonce);
1237             MutateResponse response = getStub().mutate(controller, request);
1238             Result result =
1239               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1240             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1241           } catch (ServiceException se) {
1242             throw ProtobufUtil.getRemoteException(se);
1243           }
1244         }
1245       };
1246     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1247   }
1248 
1249   /**
1250    * {@inheritDoc}
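   * <p>A rough sketch (names are placeholders, {@code table} is assumed open); the
   * put is applied only if the current cell value equals the expected value:
   * <pre>
   * byte[] row = Bytes.toBytes("row1");
   * byte[] cf = Bytes.toBytes("cf");
   * byte[] q = Bytes.toBytes("q");
   * Put put = new Put(row).add(cf, q, Bytes.toBytes("new"));
   * boolean applied = table.checkAndPut(row, cf, q, Bytes.toBytes("expected"), put);
   * </pre>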
1251    */
1252   @Override
1253   public boolean checkAndPut(final byte [] row,
1254       final byte [] family, final byte [] qualifier, final byte [] value,
1255       final Put put)
1256   throws IOException {
1257     RegionServerCallable<Boolean> callable =
1258       new RegionServerCallable<Boolean>(connection, getName(), row) {
1259         @Override
1260         public Boolean call(int callTimeout) throws IOException {
1261           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1262           controller.setPriority(tableName);
1263           controller.setCallTimeout(callTimeout);
1264           try {
1265             MutateRequest request = RequestConverter.buildMutateRequest(
1266               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1267                 new BinaryComparator(value), CompareType.EQUAL, put);
1268             MutateResponse response = getStub().mutate(controller, request);
1269             return Boolean.valueOf(response.getProcessed());
1270           } catch (ServiceException se) {
1271             throw ProtobufUtil.getRemoteException(se);
1272           }
1273         }
1274       };
1275     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1276   }
1277 
1278   /**
1279    * {@inheritDoc}
1280    */
1281   @Override
1282   public boolean checkAndPut(final byte [] row, final byte [] family,
1283       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1284       final Put put)
1285   throws IOException {
1286     RegionServerCallable<Boolean> callable =
1287       new RegionServerCallable<Boolean>(connection, getName(), row) {
1288         @Override
1289         public Boolean call(int callTimeout) throws IOException {
1290           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1291           controller.setPriority(tableName);
1292           controller.setCallTimeout(callTimeout);
1293           try {
1294             CompareType compareType = CompareType.valueOf(compareOp.name());
1295             MutateRequest request = RequestConverter.buildMutateRequest(
1296               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1297                 new BinaryComparator(value), compareType, put);
1298             MutateResponse response = getStub().mutate(controller, request);
1299             return Boolean.valueOf(response.getProcessed());
1300           } catch (ServiceException se) {
1301             throw ProtobufUtil.getRemoteException(se);
1302           }
1303         }
1304       };
1305     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1306   }
1307 
1308   /**
1309    * {@inheritDoc}
1310    */
1311   @Override
1312   public boolean checkAndDelete(final byte [] row,
1313       final byte [] family, final byte [] qualifier, final byte [] value,
1314       final Delete delete)
1315   throws IOException {
1316     RegionServerCallable<Boolean> callable =
1317       new RegionServerCallable<Boolean>(connection, getName(), row) {
1318         @Override
1319         public Boolean call(int callTimeout) throws IOException {
1320           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1321           controller.setPriority(tableName);
1322           controller.setCallTimeout(callTimeout);
1323           try {
1324             MutateRequest request = RequestConverter.buildMutateRequest(
1325               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1326                 new BinaryComparator(value), CompareType.EQUAL, delete);
1327             MutateResponse response = getStub().mutate(controller, request);
1328             return Boolean.valueOf(response.getProcessed());
1329           } catch (ServiceException se) {
1330             throw ProtobufUtil.getRemoteException(se);
1331           }
1332         }
1333       };
1334     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1335   }
1336 
1337   /**
1338    * {@inheritDoc}
1339    */
1340   @Override
1341   public boolean checkAndDelete(final byte [] row, final byte [] family,
1342       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1343       final Delete delete)
1344   throws IOException {
1345     RegionServerCallable<Boolean> callable =
1346       new RegionServerCallable<Boolean>(connection, getName(), row) {
1347         @Override
1348         public Boolean call(int callTimeout) throws IOException {
1349           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1350           controller.setPriority(tableName);
1351           controller.setCallTimeout(callTimeout);
1352           try {
1353             CompareType compareType = CompareType.valueOf(compareOp.name());
1354             MutateRequest request = RequestConverter.buildMutateRequest(
1355               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1356                 new BinaryComparator(value), compareType, delete);
1357             MutateResponse response = getStub().mutate(controller, request);
1358             return Boolean.valueOf(response.getProcessed());
1359           } catch (ServiceException se) {
1360             throw ProtobufUtil.getRemoteException(se);
1361           }
1362         }
1363       };
1364     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1365   }
1366 
1367   /**
1368    * {@inheritDoc}
1369    */
1370   @Override
1371   public boolean exists(final Get get) throws IOException {
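    // Existence-only Gets return just a flag rather than cell data.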
1372     get.setCheckExistenceOnly(true);
1373     Result r = get(get);
1374     assert r.getExists() != null;
1375     return r.getExists();
1376   }
1377 
1378   /**
1379    * {@inheritDoc}
1380    */
1381   @Override
1382   public Boolean[] exists(final List<Get> gets) throws IOException {
1383     if (gets.isEmpty()) return new Boolean[]{};
1384     if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
1385 
1386     for (Get g: gets){
1387       g.setCheckExistenceOnly(true);
1388     }
1389 
1390     Object[] r1;
1391     try {
1392       r1 = batch(gets);
1393     } catch (InterruptedException e) {
1394       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1395     }
1396 
1397     // Translate each Result into its existence flag.
1398     Boolean[] results = new Boolean[r1.length];
1399     int i = 0;
1400     for (Object o : r1) {
1401       // batch() throws on failure, so every element here is a Result
1402       results[i++] = ((Result)o).getExists();
1403     }
1404 
1405     return results;
1406   }
1407 
1408   /**
1409    * {@inheritDoc}
1410    */
1411   @Override
1412   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1413     // An operation may still be in progress even when the buffer is empty, so always
1414     //  call backgroundFlushCommits at least once.
1415     backgroundFlushCommits(true);
1416   }
1417 
1418   /**
1419    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1420    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1421    *
1422    * @param list The collection of actions.
1423    * @param results An empty array of the same size as list. If an exception is thrown,
1424    * you can examine it for partial results and determine which actions were
1425    * processed successfully.
1426    * @throws IOException if there are problems talking to META. Per-item
1427    * exceptions are stored in the results array.
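   * <p>
   * A minimal usage sketch, assuming an open {@code HTable table}; table, family and
   * row names are illustrative only:
   * <pre>{@code
   *   List<Row> actions = new ArrayList<Row>();
   *   actions.add(new Put(Bytes.toBytes("row1"))
   *       .add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
   *   actions.add(new Get(Bytes.toBytes("row2")));
   *   Object[] results = new Object[actions.size()];
   *   table.processBatchCallback(actions, results, new Batch.Callback<Object>() {
   *     public void update(byte[] region, byte[] row, Object result) {
   *       // invoked as each action completes
   *     }
   *   });
   *   // results[i] holds the outcome of actions.get(i)
   * }</pre>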
1428    */
1429   public <R> void processBatchCallback(
1430     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1431     throws IOException, InterruptedException {
1432     this.batchCallback(list, results, callback);
1433   }
1434 
1435 
1436   /**
1437    * Parameterized batch processing, allowing varying return types for different
1438    * {@link Row} implementations.
1439    */
1440   public void processBatch(final List<? extends Row> list, final Object[] results)
1441     throws IOException, InterruptedException {
1442     this.batch(list, results);
1443   }
1444 
1445 
1446   @Override
1447   public void close() throws IOException {
1448     if (this.closed) {
1449       return;
1450     }
1451     flushCommits();
1452     if (cleanupPoolOnClose) {
1453       this.pool.shutdown();
1454       try {
1455         boolean terminated = false;
1456         do {
1457           // wait until the pool has terminated
1458           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1459         } while (!terminated);
1460       } catch (InterruptedException e) {
1461         LOG.warn("waitForTermination interrupted");
1462       }
1463     }
1464     if (cleanupConnectionOnClose) {
1465       if (this.connection != null) {
1466         this.connection.close();
1467       }
1468     }
1469     this.closed = true;
1470   }
1471 
1472   // Validate a Put for well-formedness: non-empty, and no cell larger than maxKeyValueSize.
1473   public void validatePut(final Put put) throws IllegalArgumentException{
1474     if (put.isEmpty()) {
1475       throw new IllegalArgumentException("No columns to insert");
1476     }
1477     if (maxKeyValueSize > 0) {
1478       for (List<Cell> list : put.getFamilyCellMap().values()) {
1479         for (Cell cell : list) {
1480           // KeyValue v1 expectation.  Cast for now.
1481           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1482           if (kv.getLength() > maxKeyValueSize) {
1483             throw new IllegalArgumentException("KeyValue size too large");
1484           }
1485         }
1486       }
1487     }
1488   }
1489 
1490   /**
1491    * {@inheritDoc}
1492    */
1493   @Override
1494   public boolean isAutoFlush() {
1495     return autoFlush;
1496   }
1497 
1498   /**
1499    * {@inheritDoc}
1500    */
1501   @Deprecated
1502   @Override
1503   public void setAutoFlush(boolean autoFlush) {
1504     setAutoFlush(autoFlush, autoFlush);
1505   }
1506 
1507   /**
1508    * {@inheritDoc}
1509    */
1510   @Override
1511   public void setAutoFlushTo(boolean autoFlush) {
1512     setAutoFlush(autoFlush, clearBufferOnFail);
1513   }
1514 
1515   /**
1516    * {@inheritDoc}
1517    */
1518   @Override
1519   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1520     this.autoFlush = autoFlush;
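    // When autoFlush is enabled, clearBufferOnFail is forced to true.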
1521     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1522   }
1523 
1524   /**
1525    * Returns the maximum size in bytes of the write buffer for this HTable.
1526    * <p>
1527    * The default value comes from the configuration parameter
1528    * {@code hbase.client.write.buffer}.
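   * <p>
   * A sketch of overriding the default in the client configuration; the 4 MB value and
   * table name are illustrative only:
   * <pre>{@code
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setLong("hbase.client.write.buffer", 4L * 1024 * 1024);
   *   HTable table = new HTable(conf, "myTable");
   * }</pre>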
1529    * @return The size of the write buffer in bytes.
1530    */
1531   @Override
1532   public long getWriteBufferSize() {
1533     return writeBufferSize;
1534   }
1535 
1536   /**
1537    * Sets the size of the write buffer in bytes.
1538    * <p>
1539    * If the new size is less than the current amount of data in the
1540    * write buffer, the buffer gets flushed.
1541    * @param writeBufferSize The new write buffer size, in bytes.
1542    * @throws IOException if a remote or network exception occurs.
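   * <p>
   * A minimal sketch of the flush-on-shrink behavior; row, family and value names are
   * illustrative only:
   * <pre>{@code
   *   table.setAutoFlush(false);
   *   table.put(new Put(Bytes.toBytes("row1"))
   *       .add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
   *   // Shrinking the buffer below the bytes currently buffered triggers flushCommits().
   *   table.setWriteBufferSize(1L);
   * }</pre>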
1543    */
1544   @Override
1545   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1546     this.writeBufferSize = writeBufferSize;
1547     if (currentWriteBufferSize > writeBufferSize) {
1548       flushCommits();
1549     }
1550   }
1551 
1552   /**
1553    * The pool used for multi requests on this HTable.
1554    * @return the pool used for multi requests
1555    */
1556   ExecutorService getPool() {
1557     return this.pool;
1558   }
1559 
1560   /**
1561    * Enable or disable region cache prefetch for the table. The setting is
1562    * applied to all HTable instances of the given table that share the same
1563    * connection. By default, region cache prefetch is enabled.
1564    * @param tableName name of the table to configure.
1565    * @param enable Set to true to enable region cache prefetch, or to false
1566    * to disable it.
1567    * @throws IOException
1568    * @deprecated does nothing since 0.99
1569    */
1570   @Deprecated
1571   public static void setRegionCachePrefetch(final byte[] tableName,
1572       final boolean enable)  throws IOException {
1573   }
1574 
1575   /**
1576    * @deprecated does nothing since 0.99
1577    */
1578   @Deprecated
1579   public static void setRegionCachePrefetch(
1580       final TableName tableName,
1581       final boolean enable) throws IOException {
1582   }
1583 
1584   /**
1585    * Enable or disable region cache prefetch for the table. The setting is
1586    * applied to all HTable instances of the given table that share the same
1587    * connection. By default, region cache prefetch is enabled.
1588    * @param conf The Configuration object to use.
1589    * @param tableName name of the table to configure.
1590    * @param enable Set to true to enable region cache prefetch, or to false
1591    * to disable it.
1592    * @throws IOException
1593    * @deprecated does nothing since 0.99
1594    */
1595   @Deprecated
1596   public static void setRegionCachePrefetch(final Configuration conf,
1597       final byte[] tableName, final boolean enable) throws IOException {
1598   }
1599 
1600   /**
1601    * @deprecated does nothing since 0.99
1602    */
1603   @Deprecated
1604   public static void setRegionCachePrefetch(final Configuration conf,
1605       final TableName tableName,
1606       final boolean enable) throws IOException {
1607   }
1608 
1609   /**
1610    * Check whether region cache prefetch is enabled or not for the table.
1611    * @param conf The Configuration object to use.
1612    * @param tableName name of table to check
1613    * @return true if region cache prefetch is enabled for the table;
1614    * false otherwise.
1615    * @throws IOException
1616    * @deprecated always returns false since 0.99
1617    */
1618   @Deprecated
1619   public static boolean getRegionCachePrefetch(final Configuration conf,
1620       final byte[] tableName) throws IOException {
1621     return false;
1622   }
1623 
1624   /**
1625    * @deprecated always returns false since 0.99
1626    */
1627   @Deprecated
1628   public static boolean getRegionCachePrefetch(final Configuration conf,
1629       final TableName tableName) throws IOException {
1630     return false;
1631   }
1632 
1633   /**
1634    * Check whether region cache prefetch is enabled or not for the table.
1635    * @param tableName name of table to check
1636    * @return true if region cache prefetch is enabled for the table;
1637    * false otherwise.
1638    * @throws IOException
1639    * @deprecated always returns false since 0.99
1640    */
1641   @Deprecated
1642   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1643     return false;
1644   }
1645 
1646   /**
1647    * @deprecated always returns false since 0.99
1648    */
1649   @Deprecated
1650   public static boolean getRegionCachePrefetch(
1651       final TableName tableName) throws IOException {
1652     return false;
1653   }
1654 
1655   /**
1656    * Explicitly clears the region cache to fetch the latest value from META.
1657    * This is a power user function: avoid unless you know the ramifications.
1658    */
1659   public void clearRegionCache() {
1660     this.connection.clearRegionCache();
1661   }
1662 
1663   /**
1664    * {@inheritDoc}
1665    */
1666   @Override
1667   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1668     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1669   }
1670 
1671   /**
1672    * {@inheritDoc}
1673    */
1674   @Override
1675   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1676       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1677       throws ServiceException, Throwable {
1678     final Map<byte[],R> results =  Collections.synchronizedMap(
1679         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1680     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1681       @Override
1682       public void update(byte[] region, byte[] row, R value) {
1683         if (region != null) {
1684           results.put(region, value);
1685         }
1686       }
1687     });
1688     return results;
1689   }
1690 
1691   /**
1692    * {@inheritDoc}
1693    */
1694   @Override
1695   public <T extends Service, R> void coprocessorService(final Class<T> service,
1696       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1697       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1698 
1699     // get regions covered by the row range
1700     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1701 
1702     Map<byte[],Future<R>> futures =
1703         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
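    // Submit one coprocessor invocation per region start key; the user callback (if any)
    // runs on the pool threads as each region's call completes.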
1704     for (final byte[] r : keys) {
1705       final RegionCoprocessorRpcChannel channel =
1706           new RegionCoprocessorRpcChannel(connection, tableName, r);
1707       Future<R> future = pool.submit(
1708           new Callable<R>() {
1709             @Override
1710             public R call() throws Exception {
1711               T instance = ProtobufUtil.newServiceStub(service, channel);
1712               R result = callable.call(instance);
1713               byte[] region = channel.getLastRegion();
1714               if (callback != null) {
1715                 callback.update(region, r, result);
1716               }
1717               return result;
1718             }
1719           });
1720       futures.put(r, future);
1721     }
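    // Wait for every region call to finish, unwrapping and rethrowing the first failure.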
1722     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1723       try {
1724         e.getValue().get();
1725       } catch (ExecutionException ee) {
1726         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1727             + Bytes.toStringBinary(e.getKey()), ee);
1728         throw ee.getCause();
1729       } catch (InterruptedException ie) {
1730         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1731             + " for row " + Bytes.toStringBinary(e.getKey()))
1732             .initCause(ie);
1733       }
1734     }
1735   }
1736 
1737   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1738   throws IOException {
1739     if (start == null) {
1740       start = HConstants.EMPTY_START_ROW;
1741     }
1742     if (end == null) {
1743       end = HConstants.EMPTY_END_ROW;
1744     }
1745     return getKeysAndRegionsInRange(start, end, true).getFirst();
1746   }
1747 
1748   public void setOperationTimeout(int operationTimeout) {
1749     this.operationTimeout = operationTimeout;
1750   }
1751 
1752   public int getOperationTimeout() {
1753     return operationTimeout;
1754   }
1755 
1756   @Override
1757   public String toString() {
1758     return tableName + ";" + connection;
1759   }
1760 
1761   /**
1762    * Run basic test.
1763    * @param args The table name followed by a row key; the row's content is printed.
1764    * @throws IOException
1765    */
1766   public static void main(String[] args) throws IOException {
1767     HTable t = new HTable(HBaseConfiguration.create(), args[0]);
1768     try {
1769       System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1770     } finally {
1771       t.close();
1772     }
1773   }
1774 
1775   /**
1776    * {@inheritDoc}
1777    */
1778   @Override
1779   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1780       Descriptors.MethodDescriptor methodDescriptor, Message request,
1781       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1782     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1783         Bytes.BYTES_COMPARATOR));
1784     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1785         new Callback<R>() {
1786 
1787           @Override
1788           public void update(byte[] region, byte[] row, R result) {
1789             if (region != null) {
1790               results.put(region, result);
1791             }
1792           }
1793         });
1794     return results;
1795   }
1796 
1797   /**
1798    * {@inheritDoc}
1799    */
1800   @Override
1801   public <R extends Message> void batchCoprocessorService(
1802       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1803       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1804       throws ServiceException, Throwable {
1805 
1806     // get regions covered by the row range
1807     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1808         getKeysAndRegionsInRange(startKey, endKey, true);
1809     List<byte[]> keys = keysAndRegions.getFirst();
1810     List<HRegionLocation> regions = keysAndRegions.getSecond();
1811 
1812     // check if we have any calls to make
1813     if (keys.isEmpty()) {
1814       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1815           ", end=" + Bytes.toStringBinary(endKey));
1816       return;
1817     }
1818 
1819     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1820     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1821         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1822     for (int i = 0; i < keys.size(); i++) {
1823       final byte[] rowKey = keys.get(i);
1824       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1825       RegionCoprocessorServiceExec exec =
1826           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1827       execs.add(exec);
1828       execsByRow.put(rowKey, exec);
1829     }
1830 
1831     // tracking for any possible deserialization errors on success callback
1832     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1833     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1834     final List<Row> callbackErrorActions = new ArrayList<Row>();
1835     final List<String> callbackErrorServers = new ArrayList<String>();
1836     Object[] results = new Object[execs.size()];
1837 
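    // AsyncProcess batches the endpoint invocations per region server into multi requests;
    // responses are decoded in the callback below.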
1838     AsyncProcess asyncProcess =
1839         new AsyncProcess(connection, configuration, pool,
1840             RpcRetryingCallerFactory.instantiate(configuration), true,
1841             RpcControllerFactory.instantiate(configuration));
1842     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1843         new Callback<ClientProtos.CoprocessorServiceResult>() {
1844           @Override
1845           public void update(byte[] region, byte[] row,
1846                               ClientProtos.CoprocessorServiceResult serviceResult) {
1847             if (LOG.isTraceEnabled()) {
1848               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1849                   ": region=" + Bytes.toStringBinary(region) +
1850                   ", row=" + Bytes.toStringBinary(row) +
1851                   ", value=" + serviceResult.getValue().getValue());
1852             }
1853             try {
1854               callback.update(region, row,
1855                   (R) responsePrototype.newBuilderForType().mergeFrom(
1856                       serviceResult.getValue().getValue()).build());
1857             } catch (InvalidProtocolBufferException e) {
1858               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1859                   e);
1860               callbackErrorExceptions.add(e);
1861               callbackErrorActions.add(execsByRow.get(row));
1862               callbackErrorServers.add("null");
1863             }
1864           }
1865         }, results);
1866 
1867     future.waitUntilDone();
1868 
1869     if (future.hasError()) {
1870       throw future.getErrors();
1871     } else if (!callbackErrorExceptions.isEmpty()) {
1872       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1873           callbackErrorServers);
1874     }
1875   }
1876 }