1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import com.google.protobuf.InvalidProtocolBufferException;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.classification.InterfaceStability;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.HBaseConfiguration;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HRegionLocation;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.KeyValue;
52  import org.apache.hadoop.hbase.KeyValueUtil;
53  import org.apache.hadoop.hbase.RegionLocations;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
57  import org.apache.hadoop.hbase.client.coprocessor.Batch;
58  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
59  import org.apache.hadoop.hbase.filter.BinaryComparator;
60  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
61  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
62  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
63  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
64  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
65  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66  import org.apache.hadoop.hbase.protobuf.RequestConverter;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
71  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
72  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.Pair;
75  import org.apache.hadoop.hbase.util.Threads;
76  
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.Message;
79  import com.google.common.annotations.VisibleForTesting;
80  import com.google.protobuf.Service;
81  import com.google.protobuf.ServiceException;
82  
83  /**
84   * <p>Used to communicate with a single HBase table.  An implementation of
85   * {@link HTableInterface}.  Instances of this class can be constructed directly but it is
86   * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
87   * See {@link HConnectionManager} class comment for an example.
88   *
89   * <p>This class is not thread safe for reads nor writes.
90   *
91   * <p>In case of writes (Put, Delete), the underlying write buffer can
92   * be corrupted if multiple threads contend over a single HTable instance.
93   *
94   * <p>In case of reads, some fields used by a Scan are shared among all threads.
95   * The HTable implementation therefore does not guarantee thread safety for concurrent Gets or Scans either.
96   *
97   * <p>Instances of HTable passed the same {@link Configuration} instance will
98   * share connections to servers out on the cluster and to the zookeeper ensemble
99   * as well as caches of region locations.  This is usually a *good* thing and it
100  * is recommended to reuse the same configuration object for all your tables.
101  * This happens because they will all share the same underlying
102  * {@link HConnection} instance. See {@link HConnectionManager} for more on
103  * how this mechanism works.
104  *
105  * <p>{@link HConnection} will read most of the
106  * configuration it needs from the passed {@link Configuration} on initial
107  * construction.  Thereafter, for settings such as
108  * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
109  * and <code>hbase.client.rpc.maxattempts</code> updating their values in the
110  * and <code>hbase.client.rpc.maxattempts</code>, updating their values in the
111  * will go unnoticed.  To run with changed values, make a new
112  * {@link HTable} passing a new {@link Configuration} instance that has the
113  * new configuration.
114  *
115  * <p>Note that this class implements the {@link Closeable} interface. When an
116  * HTable instance is no longer required, it *should* be closed in order to ensure
117  * that the underlying resources are promptly released. Please note that the close
118  * method can throw java.io.IOException that must be handled.
119  *
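     * <p>For illustration only, a minimal usage sketch; the table name, row key,
     * column family, qualifier, and values below are placeholders:
     * <pre>
     * Configuration conf = HBaseConfiguration.create();
     * HTable table = new HTable(conf, TableName.valueOf("myTable")); // "myTable" is a placeholder
     * try {
     *   Put put = new Put(Bytes.toBytes("row1"));
     *   put.add(Bytes.toBytes("cf"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
     *   table.put(put);
     *   Result result = table.get(new Get(Bytes.toBytes("row1")));
     * } finally {
     *   table.close(); // release the underlying resources
     * }
     * </pre>
     *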
120  * @see HBaseAdmin for create, drop, list, enable and disable of tables.
121  * @see HConnection
122  * @see HConnectionManager
123  */
124 @InterfaceAudience.Public
125 @InterfaceStability.Stable
126 public class HTable implements HTableInterface, RegionLocator {
127   private static final Log LOG = LogFactory.getLog(HTable.class);
128   protected ClusterConnection connection;
129   private final TableName tableName;
130   private volatile Configuration configuration;
131   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
132   private long writeBufferSize;
133   private boolean clearBufferOnFail;
134   private boolean autoFlush;
135   protected long currentWriteBufferSize;
136   protected int scannerCaching;
137   private int maxKeyValueSize;
138   private ExecutorService pool;  // For Multi & Scan
139   private boolean closed;
140   private int operationTimeout;
141   private int retries;
142   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
143   private final boolean cleanupConnectionOnClose; // close the connection in close()
144   private Consistency defaultConsistency = Consistency.STRONG;
145   private int primaryCallTimeoutMicroSecond;
146   private int replicaCallTimeoutMicroSecondScan;
147 
148 
149   /** The Async process for puts with autoflush set to false or multiputs */
150   protected AsyncProcess ap;
151   /** The Async process for batch */
152   protected AsyncProcess multiAp;
153   private RpcRetryingCallerFactory rpcCallerFactory;
154   private RpcControllerFactory rpcControllerFactory;
155 
156   /**
157   * Creates an object to access an HBase table.
158    * Shares zookeeper connection and other resources with other HTable instances
159    * created with the same <code>conf</code> instance.  Uses already-populated
160    * region cache if one is available, populated by any other HTable instances
161    * sharing this <code>conf</code> instance.  Recommended.
162    * @param conf Configuration object to use.
163    * @param tableName Name of the table.
164    * @throws IOException if a remote or network exception occurs
165    * @deprecated Constructing HTable objects manually has been deprecated. Please use
166    * {@link Connection} to instantiate a {@link Table} instead.
167    */
168   @Deprecated
169   public HTable(Configuration conf, final String tableName)
170   throws IOException {
171     this(conf, TableName.valueOf(tableName));
172   }
173 
174   /**
175   * Creates an object to access an HBase table.
176    * Shares zookeeper connection and other resources with other HTable instances
177    * created with the same <code>conf</code> instance.  Uses already-populated
178    * region cache if one is available, populated by any other HTable instances
179    * sharing this <code>conf</code> instance.  Recommended.
180    * @param conf Configuration object to use.
181    * @param tableName Name of the table.
182    * @throws IOException if a remote or network exception occurs
183    * @deprecated Constructing HTable objects manually has been deprecated. Please use
184    * {@link Connection} to instantiate a {@link Table} instead.
185    */
186   @Deprecated
187   public HTable(Configuration conf, final byte[] tableName)
188   throws IOException {
189     this(conf, TableName.valueOf(tableName));
190   }
191 
192 
193 
194   /**
195   * Creates an object to access an HBase table.
196    * Shares zookeeper connection and other resources with other HTable instances
197    * created with the same <code>conf</code> instance.  Uses already-populated
198    * region cache if one is available, populated by any other HTable instances
199    * sharing this <code>conf</code> instance.  Recommended.
200    * @param conf Configuration object to use.
201    * @param tableName table name pojo
202    * @throws IOException if a remote or network exception occurs
203    * @deprecated Constructing HTable objects manually has been deprecated. Please use
204    * {@link Connection} to instantiate a {@link Table} instead.
205    */
206   @Deprecated
207   public HTable(Configuration conf, final TableName tableName)
208   throws IOException {
209     this.tableName = tableName;
210     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
211     if (conf == null) {
212       this.connection = null;
213       return;
214     }
215     this.connection = ConnectionManager.getConnectionInternal(conf);
216     this.configuration = conf;
217 
218     this.pool = getDefaultExecutor(conf);
219     this.finishSetup();
220   }
221 
222   /**
223  * Creates an object to access an HBase table. Shares zookeeper connection and other resources with
224    * other HTable instances created with the same <code>connection</code> instance. Use this
225    * constructor when the Connection instance is externally managed.
226    * @param tableName Name of the table.
227    * @param connection HConnection to be used.
228    * @throws IOException if a remote or network exception occurs
229    * @deprecated Do not use.
230    */
231   @Deprecated
232   public HTable(TableName tableName, Connection connection) throws IOException {
233     this.tableName = tableName;
234     this.cleanupPoolOnClose = true;
235     this.cleanupConnectionOnClose = false;
236     this.connection = (ClusterConnection)connection;
237     this.configuration = connection.getConfiguration();
238 
239     this.pool = getDefaultExecutor(this.configuration);
240     this.finishSetup();
241   }
242 
243   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
244     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
245     if (maxThreads == 0) {
246       maxThreads = 1; // is there a better default?
247     }
248     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
249 
250     // Using the "direct handoff" approach, new threads will only be created
251     // if necessary, and the pool can grow unbounded. This could be bad but in HCM
252     // we only create as many Runnables as there are region servers. It means
253     // it also scales when new region servers are added.
254     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
255         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
256     pool.allowCoreThreadTimeOut(true);
257     return pool;
258   }
259 
260   /**
261   * Creates an object to access an HBase table.
262    * Shares zookeeper connection and other resources with other HTable instances
263    * created with the same <code>conf</code> instance.  Uses already-populated
264    * region cache if one is available, populated by any other HTable instances
265    * sharing this <code>conf</code> instance.
266    * Use this constructor when the ExecutorService is externally managed.
267    * @param conf Configuration object to use.
268    * @param tableName Name of the table.
269    * @param pool ExecutorService to be used.
270    * @throws IOException if a remote or network exception occurs
271    * @deprecated Constructing HTable objects manually has been deprecated. Please use
272    * {@link Connection} to instantiate a {@link Table} instead.
273    */
274   @Deprecated
275   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
276       throws IOException {
277     this(conf, TableName.valueOf(tableName), pool);
278   }
279 
280   /**
281   * Creates an object to access an HBase table.
282    * Shares zookeeper connection and other resources with other HTable instances
283    * created with the same <code>conf</code> instance.  Uses already-populated
284    * region cache if one is available, populated by any other HTable instances
285    * sharing this <code>conf</code> instance.
286    * Use this constructor when the ExecutorService is externally managed.
287    * @param conf Configuration object to use.
288    * @param tableName Name of the table.
289    * @param pool ExecutorService to be used.
290    * @throws IOException if a remote or network exception occurs
291    * @deprecated Constructing HTable objects manually has been deprecated. Please use
292    * {@link Connection} to instantiate a {@link Table} instead.
293    */
294   @Deprecated
295   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
296       throws IOException {
297     this.connection = ConnectionManager.getConnectionInternal(conf);
298     this.configuration = conf;
299     this.pool = pool;
300     if (pool == null) {
301       this.pool = getDefaultExecutor(conf);
302       this.cleanupPoolOnClose = true;
303     } else {
304       this.cleanupPoolOnClose = false;
305     }
306     this.tableName = tableName;
307     this.cleanupConnectionOnClose = true;
308     this.finishSetup();
309   }
310 
311   /**
312   * Creates an object to access an HBase table.
313    * Shares zookeeper connection and other resources with other HTable instances
314    * created with the same <code>connection</code> instance.
315    * Use this constructor when the ExecutorService and HConnection instance are
316    * externally managed.
317    * @param tableName Name of the table.
318    * @param connection HConnection to be used.
319    * @param pool ExecutorService to be used.
320    * @throws IOException if a remote or network exception occurs.
321    * @deprecated Do not use, internal ctor.
322    */
323   @Deprecated
324   public HTable(final byte[] tableName, final HConnection connection,
325       final ExecutorService pool) throws IOException {
326     this(TableName.valueOf(tableName), connection, pool);
327   }
328 
329   /** @deprecated Do not use, internal ctor. */
330   @Deprecated
331   public HTable(TableName tableName, final HConnection connection,
332       final ExecutorService pool) throws IOException {
333     this(tableName, (ClusterConnection)connection, pool);
334   }
335 
336   /**
337   * Creates an object to access an HBase table.
338    * Shares zookeeper connection and other resources with other HTable instances
339    * created with the same <code>connection</code> instance.
340   * Visible only for HTableWrapper which is in a different package.
341   * Should not be used by external code.
342    * @param tableName Name of the table.
343    * @param connection HConnection to be used.
344    * @param pool ExecutorService to be used.
345    * @throws IOException if a remote or network exception occurs
346    */
347   @InterfaceAudience.Private
348   public HTable(TableName tableName, final ClusterConnection connection,
349       final ExecutorService pool) throws IOException {
350     if (connection == null || connection.isClosed()) {
351       throw new IllegalArgumentException("Connection is null or closed.");
352     }
353     this.tableName = tableName;
354     this.cleanupConnectionOnClose = false;
355     this.connection = connection;
356     this.configuration = connection.getConfiguration();
357     this.pool = pool;
358     if (pool == null) {
359       this.pool = getDefaultExecutor(this.configuration);
360       this.cleanupPoolOnClose = true;
361     } else {
362       this.cleanupPoolOnClose = false;
363     }
364 
365     this.finishSetup();
366   }
367 
368   /**
369    * For internal testing.
370    */
371   protected HTable() {
372     tableName = null;
373     cleanupPoolOnClose = false;
374     cleanupConnectionOnClose = false;
375   }
376 
377   /**
378   * Sets up this HTable's parameters based on the passed configuration
379    */
380   private void finishSetup() throws IOException {
381     this.operationTimeout = tableName.isSystemTable() ?
382       this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
383         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
384       this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
385         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
386     this.writeBufferSize = this.configuration.getLong(
387         "hbase.client.write.buffer", 2097152);
388     this.clearBufferOnFail = true;
389     this.autoFlush = true;
390     this.currentWriteBufferSize = 0;
391     this.scannerCaching = this.configuration.getInt(
392         HConstants.HBASE_CLIENT_SCANNER_CACHING,
393         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
394     this.primaryCallTimeoutMicroSecond =
395         this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms
396     this.replicaCallTimeoutMicroSecondScan =
397         this.configuration.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
398     this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
399             HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
400 
401     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
402     this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
403     // puts need to track errors globally due to how the APIs currently work.
404     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
405     multiAp = this.connection.getAsyncProcess();
406 
407     this.maxKeyValueSize = this.configuration.getInt(
408         "hbase.client.keyvalue.maxsize", -1);
409     this.closed = false;
410   }
411 
412 
413 
414   /**
415    * {@inheritDoc}
416    */
417   @Override
418   public Configuration getConfiguration() {
419     return configuration;
420   }
421 
422   /**
423   * Tells whether or not a table is enabled. This method creates a
424    * new HBase configuration, so it might make your unit tests fail due to
425    * incorrect ZK client port.
426    * @param tableName Name of table to check.
427    * @return {@code true} if table is online.
428    * @throws IOException if a remote or network exception occurs
429   * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
430    */
431   @Deprecated
432   public static boolean isTableEnabled(String tableName) throws IOException {
433     return isTableEnabled(TableName.valueOf(tableName));
434   }
435 
436   /**
437   * Tells whether or not a table is enabled. This method creates a
438    * new HBase configuration, so it might make your unit tests fail due to
439    * incorrect ZK client port.
440    * @param tableName Name of table to check.
441    * @return {@code true} if table is online.
442    * @throws IOException if a remote or network exception occurs
443   * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
444    */
445   @Deprecated
446   public static boolean isTableEnabled(byte[] tableName) throws IOException {
447     return isTableEnabled(TableName.valueOf(tableName));
448   }
449 
450   /**
451   * Tells whether or not a table is enabled. This method creates a
452    * new HBase configuration, so it might make your unit tests fail due to
453    * incorrect ZK client port.
454    * @param tableName Name of table to check.
455    * @return {@code true} if table is online.
456    * @throws IOException if a remote or network exception occurs
457    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
458    */
459   @Deprecated
460   public static boolean isTableEnabled(TableName tableName) throws IOException {
461     return isTableEnabled(HBaseConfiguration.create(), tableName);
462   }
463 
464   /**
465   * Tells whether or not a table is enabled.
466    * @param conf The Configuration object to use.
467    * @param tableName Name of table to check.
468    * @return {@code true} if table is online.
469    * @throws IOException if a remote or network exception occurs
470   * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
471    */
472   @Deprecated
473   public static boolean isTableEnabled(Configuration conf, String tableName)
474   throws IOException {
475     return isTableEnabled(conf, TableName.valueOf(tableName));
476   }
477 
478   /**
479   * Tells whether or not a table is enabled.
480    * @param conf The Configuration object to use.
481    * @param tableName Name of table to check.
482    * @return {@code true} if table is online.
483    * @throws IOException if a remote or network exception occurs
484   * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
485    */
486   @Deprecated
487   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
488   throws IOException {
489     return isTableEnabled(conf, TableName.valueOf(tableName));
490   }
491 
492   /**
493   * Tells whether or not a table is enabled.
494    * @param conf The Configuration object to use.
495    * @param tableName Name of table to check.
496    * @return {@code true} if table is online.
497    * @throws IOException if a remote or network exception occurs
498    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
499    */
500   @Deprecated
501   public static boolean isTableEnabled(Configuration conf,
502       final TableName tableName) throws IOException {
503     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
504       @Override
505       public Boolean connect(HConnection connection) throws IOException {
506         return connection.isTableEnabled(tableName);
507       }
508     });
509   }
510 
511   /**
512    * Find region location hosting passed row using cached info
513    * @param row Row to find.
514    * @return The location of the given row.
515    * @throws IOException if a remote or network exception occurs
516    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
517    */
518   @Deprecated
519   public HRegionLocation getRegionLocation(final String row)
520   throws IOException {
521     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
522   }
523 
524   /**
525    * {@inheritDoc}
526    */
527   @Override
528   public HRegionLocation getRegionLocation(final byte [] row)
529   throws IOException {
530     return connection.getRegionLocation(tableName, row, false);
531   }
532 
533   /**
534    * {@inheritDoc}
535    */
536   @Override
537   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
538   throws IOException {
539     return connection.getRegionLocation(tableName, row, reload);
540   }
541 
542   /**
543    * {@inheritDoc}
544    */
545   @Override
546   public byte [] getTableName() {
547     return this.tableName.getName();
548   }
549 
550   @Override
551   public TableName getName() {
552     return tableName;
553   }
554 
555   /**
556    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
557    * manipulations.
558    * @return An HConnection instance.
559    * @deprecated This method will be changed from public to package protected.
560    */
561   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
562   @Deprecated
563   @VisibleForTesting
564   public HConnection getConnection() {
565     return this.connection;
566   }
567 
568   /**
569    * Gets the number of rows that a scanner will fetch at once.
570    * <p>
571    * The default value comes from {@code hbase.client.scanner.caching}.
572    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
573    */
574   @Deprecated
575   public int getScannerCaching() {
576     return scannerCaching;
577   }
578 
579   /**
580    * Kept in 0.96 for backward compatibility
581   * @deprecated since 0.96. This is an internal buffer that should not be read from nor written to.
582    */
583   @Deprecated
584   public List<Row> getWriteBuffer() {
585     return writeAsyncBuffer;
586   }
587 
588   /**
589    * Sets the number of rows that a scanner will fetch at once.
590    * <p>
591    * This will override the value specified by
592    * {@code hbase.client.scanner.caching}.
593    * Increasing this value will reduce the amount of work needed each time
594    * {@code next()} is called on a scanner, at the expense of memory use
595    * (since more rows will need to be maintained in memory by the scanners).
596    * @param scannerCaching the number of rows a scanner will fetch at once.
597    * @deprecated Use {@link Scan#setCaching(int)}
598    */
599   @Deprecated
600   public void setScannerCaching(int scannerCaching) {
601     this.scannerCaching = scannerCaching;
602   }
603 
604   /**
605    * {@inheritDoc}
606    */
607   @Override
608   public HTableDescriptor getTableDescriptor() throws IOException {
609     return new UnmodifyableHTableDescriptor(
610       this.connection.getHTableDescriptor(this.tableName));
611   }
612 
613   /**
614    * {@inheritDoc}
615    */
616   @Override
617   public byte [][] getStartKeys() throws IOException {
618     return getStartEndKeys().getFirst();
619   }
620 
621   /**
622    * {@inheritDoc}
623    */
624   @Override
625   public byte[][] getEndKeys() throws IOException {
626     return getStartEndKeys().getSecond();
627   }
628 
629   /**
630    * {@inheritDoc}
631    */
632   @Override
633   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
634 
635     List<RegionLocations> regions = listRegionLocations();
636     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
637     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
638 
639     for (RegionLocations locations : regions) {
640       HRegionInfo region = locations.getRegionLocation().getRegionInfo();
641       startKeyList.add(region.getStartKey());
642       endKeyList.add(region.getEndKey());
643     }
644 
645     return new Pair<byte [][], byte [][]>(
646       startKeyList.toArray(new byte[startKeyList.size()][]),
647       endKeyList.toArray(new byte[endKeyList.size()][]));
648   }
649 
650   @VisibleForTesting
651   List<RegionLocations> listRegionLocations() throws IOException {
652     return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
653   }
654 
655   /**
656   * Gets all the regions and their addresses for this table.
657    * <p>
658    * This is mainly useful for the MapReduce integration.
659   * @return A map of HRegionInfo with its server address
660    * @throws IOException if a remote or network exception occurs
661    * @deprecated This is no longer a public API
662    */
663   @Deprecated
664   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
665     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an HRegionLocation.
666     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName(), false);
667   }
668 
669   /**
670    * Get the corresponding regions for an arbitrary range of keys.
671    * <p>
672    * @param startKey Starting row in range, inclusive
673    * @param endKey Ending row in range, exclusive
674    * @return A list of HRegionLocations corresponding to the regions that
675    * contain the specified range
676    * @throws IOException if a remote or network exception occurs
677    * @deprecated This is no longer a public API
678    */
679   @Deprecated
680   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
681     final byte [] endKey) throws IOException {
682     return getRegionsInRange(startKey, endKey, false);
683   }
684 
685   /**
686    * Get the corresponding regions for an arbitrary range of keys.
687    * <p>
688    * @param startKey Starting row in range, inclusive
689    * @param endKey Ending row in range, exclusive
690    * @param reload true to reload information or false to use cached information
691    * @return A list of HRegionLocations corresponding to the regions that
692    * contain the specified range
693    * @throws IOException if a remote or network exception occurs
694    * @deprecated This is no longer a public API
695    */
696   @Deprecated
697   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
698       final byte [] endKey, final boolean reload) throws IOException {
699     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
700   }
701 
702   /**
703    * Get the corresponding start keys and regions for an arbitrary range of
704    * keys.
705    * <p>
706    * @param startKey Starting row in range, inclusive
707    * @param endKey Ending row in range
708    * @param includeEndKey true if endRow is inclusive, false if exclusive
709    * @return A pair of list of start keys and list of HRegionLocations that
710    *         contain the specified range
711    * @throws IOException if a remote or network exception occurs
712    * @deprecated This is no longer a public API
713    */
714   @Deprecated
715   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
716       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
717       throws IOException {
718     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
719   }
720 
721   /**
722    * Get the corresponding start keys and regions for an arbitrary range of
723    * keys.
724    * <p>
725    * @param startKey Starting row in range, inclusive
726    * @param endKey Ending row in range
727    * @param includeEndKey true if endRow is inclusive, false if exclusive
728    * @param reload true to reload information or false to use cached information
729    * @return A pair of list of start keys and list of HRegionLocations that
730    *         contain the specified range
731    * @throws IOException if a remote or network exception occurs
732    * @deprecated This is no longer a public API
733    */
734   @Deprecated
735   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
736       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
737       final boolean reload) throws IOException {
738     final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
739     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
740       throw new IllegalArgumentException(
741         "Invalid range: " + Bytes.toStringBinary(startKey) +
742         " > " + Bytes.toStringBinary(endKey));
743     }
744     List<byte[]> keysInRange = new ArrayList<byte[]>();
745     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
746     byte[] currentKey = startKey;
747     do {
748       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
749       keysInRange.add(currentKey);
750       regionsInRange.add(regionLocation);
751       currentKey = regionLocation.getRegionInfo().getEndKey();
752     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
753         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
754             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
755     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
756         regionsInRange);
757   }
758 
759   /**
760    * {@inheritDoc}
761    * @deprecated Use reversed scan instead.
762    */
763    @Override
764    @Deprecated
765    public Result getRowOrBefore(final byte[] row, final byte[] family)
766        throws IOException {
767      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
768          tableName, row) {
769        @Override
770       public Result call(int callTimeout) throws IOException {
771          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
772          controller.setPriority(tableName);
773          controller.setCallTimeout(callTimeout);
774          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
775              getLocation().getRegionInfo().getRegionName(), row, family);
776          try {
777            ClientProtos.GetResponse response = getStub().get(controller, request);
778            if (!response.hasResult()) return null;
779            return ProtobufUtil.toResult(response.getResult());
780          } catch (ServiceException se) {
781            throw ProtobufUtil.getRemoteException(se);
782          }
783        }
784      };
785      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
786    }
787 
788   /**
789    * The underlying {@link HTable} must not be closed.
790    * {@link HTableInterface#getScanner(Scan)} has other usage details.
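      * <p>For illustration only, a minimal scanning sketch, assuming an open HTable
      * {@code table}; the column family name is a placeholder:
      * <pre>
      * Scan scan = new Scan();
      * scan.addFamily(Bytes.toBytes("cf")); // "cf" is a placeholder family
      * ResultScanner scanner = table.getScanner(scan);
      * try {
      *   for (Result result : scanner) {
      *     // process each Result
      *   }
      * } finally {
      *   scanner.close(); // always release the scanner
      * }
      * </pre>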
791    */
792   @Override
793   public ResultScanner getScanner(final Scan scan) throws IOException {
794     if (scan.getCaching() <= 0) {
795       scan.setCaching(getScannerCaching());
796     }
797 
798     if (scan.isReversed()) {
799       if (scan.isSmall()) {
800         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
801             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
802             pool, replicaCallTimeoutMicroSecondScan);
803       } else {
804         return new ReversedClientScanner(getConfiguration(), scan, getName(),
805             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
806             pool, replicaCallTimeoutMicroSecondScan);
807       }
808     }
809 
810     if (scan.isSmall()) {
811       return new ClientSmallScanner(getConfiguration(), scan, getName(),
812           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
813           pool, replicaCallTimeoutMicroSecondScan);
814     } else {
815       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
816           this.rpcCallerFactory, this.rpcControllerFactory,
817           pool, replicaCallTimeoutMicroSecondScan);
818     }
819   }
820 
821   /**
822    * The underlying {@link HTable} must not be closed.
823    * {@link HTableInterface#getScanner(byte[])} has other usage details.
824    */
825   @Override
826   public ResultScanner getScanner(byte [] family) throws IOException {
827     Scan scan = new Scan();
828     scan.addFamily(family);
829     return getScanner(scan);
830   }
831 
832   /**
833    * The underlying {@link HTable} must not be closed.
834    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
835    */
836   @Override
837   public ResultScanner getScanner(byte [] family, byte [] qualifier)
838   throws IOException {
839     Scan scan = new Scan();
840     scan.addColumn(family, qualifier);
841     return getScanner(scan);
842   }
843 
844   /**
845    * {@inheritDoc}
846    */
847   @Override
848   public Result get(final Get get) throws IOException {
849     if (get.getConsistency() == null){
850       get.setConsistency(defaultConsistency);
851     }
852 
853     if (get.getConsistency() == Consistency.STRONG) {
854       // Good old call.
855       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
856           getName(), get.getRow()) {
857         @Override
858         public Result call(int callTimeout) throws IOException {
859           ClientProtos.GetRequest request =
860               RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
861           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
862           controller.setPriority(tableName);
863           controller.setCallTimeout(callTimeout);
864           try {
865             ClientProtos.GetResponse response = getStub().get(controller, request);
866             if (response == null) return null;
867             return ProtobufUtil.toResult(response.getResult());
868           } catch (ServiceException se) {
869             throw ProtobufUtil.getRemoteException(se);
870           }
871         }
872       };
873       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
874     }
875 
876     // Call that takes into account the replica
877     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
878       rpcControllerFactory, tableName, this.connection, get, pool, retries,
879       operationTimeout, primaryCallTimeoutMicroSecond);
880     return callable.call();
881   }
882 
883 
884   /**
885    * {@inheritDoc}
886    */
887   @Override
888   public Result[] get(List<Get> gets) throws IOException {
889     if (gets.size() == 1) {
890       return new Result[]{get(gets.get(0))};
891     }
892     try {
893       Object [] r1 = batch((List)gets);
894 
895       // translate.
896       Result [] results = new Result[r1.length];
897       int i=0;
898       for (Object o : r1) {
899         // batch ensures if there is a failure we get an exception instead
900         results[i++] = (Result) o;
901       }
902 
903       return results;
904     } catch (InterruptedException e) {
905       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
906     }
907   }
908 
909   /**
910    * {@inheritDoc}
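       * <p>For illustration only, a minimal sketch of a mixed batch call, assuming an
       * open HTable {@code table}; the row keys, column family, qualifier, and values
       * are placeholders:
       * <pre>
       * List&lt;Row&gt; actions = new ArrayList&lt;Row&gt;();
       * actions.add(new Get(Bytes.toBytes("row1")));
       * actions.add(new Put(Bytes.toBytes("row2"))
       *     .add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
       * Object[] results = new Object[actions.size()];
       * table.batch(actions, results);
       * // on success each results[i] holds a Result; failures surface as exceptions
       * </pre>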
911    */
912   @Override
913   public void batch(final List<? extends Row> actions, final Object[] results)
914       throws InterruptedException, IOException {
915     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
916     ars.waitUntilDone();
917     if (ars.hasError()) {
918       throw ars.getErrors();
919     }
920   }
921 
922   /**
923    * {@inheritDoc}
924    * @deprecated If any exception is thrown by one of the actions, there is no way to
925    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
926    */
927   @Deprecated
928   @Override
929   public Object[] batch(final List<? extends Row> actions)
930      throws InterruptedException, IOException {
931     Object[] results = new Object[actions.size()];
932     batch(actions, results);
933     return results;
934   }
935 
936   /**
937    * {@inheritDoc}
938    */
939   @Override
940   public <R> void batchCallback(
941       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
942       throws IOException, InterruptedException {
943     connection.processBatchCallback(actions, tableName, pool, results, callback);
944   }
945 
946   /**
947    * {@inheritDoc}
948    * @deprecated If any exception is thrown by one of the actions, there is no way to
949    * retrieve the partially executed results. Use
950    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
951    * instead.
952    */
953   @Deprecated
954   @Override
955   public <R> Object[] batchCallback(
956     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
957       InterruptedException {
958     Object[] results = new Object[actions.size()];
959     batchCallback(actions, results, callback);
960     return results;
961   }
962 
963   /**
964    * {@inheritDoc}
965    */
966   @Override
967   public void delete(final Delete delete)
968   throws IOException {
969     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
970         tableName, delete.getRow()) {
971       @Override
972       public Boolean call(int callTimeout) throws IOException {
973         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
974         controller.setPriority(tableName);
975         controller.setCallTimeout(callTimeout);
976 
977         try {
978           MutateRequest request = RequestConverter.buildMutateRequest(
979             getLocation().getRegionInfo().getRegionName(), delete);
980           MutateResponse response = getStub().mutate(controller, request);
981           return Boolean.valueOf(response.getProcessed());
982         } catch (ServiceException se) {
983           throw ProtobufUtil.getRemoteException(se);
984         }
985       }
986     };
987     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
988   }
989 
990   /**
991    * {@inheritDoc}
992    */
993   @Override
994   public void delete(final List<Delete> deletes)
995   throws IOException {
996     Object[] results = new Object[deletes.size()];
997     try {
998       batch(deletes, results);
999     } catch (InterruptedException e) {
1000       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1001     } finally {
1002       // mutate list so that it is empty for complete success, or contains only failed records
1003       // results are returned in the same order as the requests in list
1004       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
1005       for (int i = results.length - 1; i>=0; i--) {
1006         // if result is not null, it succeeded
1007         if (results[i] instanceof Result) {
1008           deletes.remove(i);
1009         }
1010       }
1011     }
1012   }
1013 
1014   /**
1015    * {@inheritDoc}
1016    */
1017   @Override
1018   public void put(final Put put)
1019       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1020     doPut(put);
1021     if (autoFlush) {
1022       flushCommits();
1023     }
1024   }
1025 
1026   /**
1027    * {@inheritDoc}
1028    */
1029   @Override
1030   public void put(final List<Put> puts)
1031       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1032     for (Put put : puts) {
1033       doPut(put);
1034     }
1035     if (autoFlush) {
1036       flushCommits();
1037     }
1038   }
1039 
1040 
1041   /**
1042    * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
1043    *  cluster.
1044    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
1045    * @throws InterruptedIOException if we were interrupted.
1046    */
1047   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1048     // This behavior is highly non-intuitive... it does not protect us against
1049     // 94-incompatible behavior, which is a timing issue because hasError, the below code
1050     // and setter of hasError are not synchronized. Perhaps it should be removed.
1051     if (ap.hasError()) {
1052       writeAsyncBuffer.add(put);
1053       backgroundFlushCommits(true);
1054     }
1055 
1056     validatePut(put);
1057 
1058     currentWriteBufferSize += put.heapSize();
1059     writeAsyncBuffer.add(put);
1060 
1061     while (currentWriteBufferSize > writeBufferSize) {
1062       backgroundFlushCommits(false);
1063     }
1064   }
1065 
1066 
1067   /**
1068    * Send the operations in the buffer to the servers. Does not wait for the server's answer.
1069    * If there is an error (max retries reached from a previous flush or a bad operation), it tries to
1070    * send all operations in the buffer and throws an exception.
1071    * @param synchronous - if true, sends all the writes and waits for all of them to finish before
1072    *                     returning.
1073    */
1074   private void backgroundFlushCommits(boolean synchronous) throws
1075       InterruptedIOException, RetriesExhaustedWithDetailsException {
1076 
1077     try {
1078       if (!synchronous) {
1079         ap.submit(tableName, writeAsyncBuffer, true, null, false);
1080         if (ap.hasError()) {
1081           LOG.debug(tableName + ": One or more of the operations have failed -" +
1082               " waiting for all operations in progress to finish (successfully or not)");
1083         }
1084       }
1085       if (synchronous || ap.hasError()) {
1086         while (!writeAsyncBuffer.isEmpty()) {
1087           ap.submit(tableName, writeAsyncBuffer, true, null, false);
1088         }
1089         List<Row> failedRows = clearBufferOnFail ? null : writeAsyncBuffer;
1090         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(failedRows);
1091         if (error != null) {
1092           throw error;
1093         }
1094       }
1095     } finally {
1096       currentWriteBufferSize = 0;
1097       for (Row mut : writeAsyncBuffer) {
1098         if (mut instanceof Mutation) {
1099           currentWriteBufferSize += ((Mutation) mut).heapSize();
1100         }
1101       }
1102     }
1103   }
1104 
1105   /**
1106    * {@inheritDoc}
1107    */
1108   @Override
1109   public void mutateRow(final RowMutations rm) throws IOException {
1110     RegionServerCallable<Void> callable =
1111         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1112       @Override
1113       public Void call(int callTimeout) throws IOException {
1114         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1115         controller.setPriority(tableName);
1116         controller.setCallTimeout(callTimeout);
1117         try {
1118           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1119             getLocation().getRegionInfo().getRegionName(), rm);
1120           regionMutationBuilder.setAtomic(true);
1121           MultiRequest request =
1122             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1123           getStub().multi(controller, request);
1124         } catch (ServiceException se) {
1125           throw ProtobufUtil.getRemoteException(se);
1126         }
1127         return null;
1128       }
1129     };
1130     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1131   }
1132 
1133   /**
1134    * {@inheritDoc}
1135    */
1136   @Override
1137   public Result append(final Append append) throws IOException {
1138     if (append.numFamilies() == 0) {
1139       throw new IOException(
1140           "Invalid arguments to append, no columns specified");
1141     }
1142 
1143     NonceGenerator ng = this.connection.getNonceGenerator();
1144     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1145     RegionServerCallable<Result> callable =
1146       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1147         @Override
1148         public Result call(int callTimeout) throws IOException {
1149           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1150           controller.setPriority(getTableName());
1151           controller.setCallTimeout(callTimeout);
1152           try {
1153             MutateRequest request = RequestConverter.buildMutateRequest(
1154               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1155             MutateResponse response = getStub().mutate(controller, request);
1156             if (!response.hasResult()) return null;
1157             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1158           } catch (ServiceException se) {
1159             throw ProtobufUtil.getRemoteException(se);
1160           }
1161         }
1162       };
1163     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1164   }
1165 
1166   /**
1167    * {@inheritDoc}
1168    */
1169   @Override
1170   public Result increment(final Increment increment) throws IOException {
1171     if (!increment.hasFamilies()) {
1172       throw new IOException(
1173           "Invalid arguments to increment, no columns specified");
1174     }
1175     NonceGenerator ng = this.connection.getNonceGenerator();
1176     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1177     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1178         getName(), increment.getRow()) {
1179       @Override
1180       public Result call(int callTimeout) throws IOException {
1181         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1182         controller.setPriority(getTableName());
1183         controller.setCallTimeout(callTimeout);
1184         try {
1185           MutateRequest request = RequestConverter.buildMutateRequest(
1186             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1187           MutateResponse response = getStub().mutate(controller, request);
1188           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1189         } catch (ServiceException se) {
1190           throw ProtobufUtil.getRemoteException(se);
1191         }
1192       }
1193     };
1194     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1195   }
1196 
1197   /**
1198    * {@inheritDoc}
1199    */
1200   @Override
1201   public long incrementColumnValue(final byte [] row, final byte [] family,
1202       final byte [] qualifier, final long amount)
1203   throws IOException {
1204     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1205   }
1206 
1207   /**
1208    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1209    */
1210   @Deprecated
1211   @Override
1212   public long incrementColumnValue(final byte [] row, final byte [] family,
1213       final byte [] qualifier, final long amount, final boolean writeToWAL)
1214   throws IOException {
1215     return incrementColumnValue(row, family, qualifier, amount,
1216       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1217   }
1218 
1219   /**
1220    * {@inheritDoc}
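        * <p>For illustration only, a minimal sketch, assuming an open HTable
        * {@code table}; the row, column family, and qualifier are placeholders:
        * <pre>
        * long newValue = table.incrementColumnValue(Bytes.toBytes("row1"),
        *     Bytes.toBytes("cf"), Bytes.toBytes("counter"), 1, Durability.SYNC_WAL);
        * </pre>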
1221    */
1222   @Override
1223   public long incrementColumnValue(final byte [] row, final byte [] family,
1224       final byte [] qualifier, final long amount, final Durability durability)
1225   throws IOException {
1226     NullPointerException npe = null;
1227     if (row == null) {
1228       npe = new NullPointerException("row is null");
1229     } else if (family == null) {
1230       npe = new NullPointerException("family is null");
1231     } else if (qualifier == null) {
1232       npe = new NullPointerException("qualifier is null");
1233     }
1234     if (npe != null) {
1235       throw new IOException(
1236           "Invalid arguments to incrementColumnValue", npe);
1237     }
1238 
1239     NonceGenerator ng = this.connection.getNonceGenerator();
1240     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1241     RegionServerCallable<Long> callable =
1242       new RegionServerCallable<Long>(connection, getName(), row) {
1243         @Override
1244         public Long call(int callTimeout) throws IOException {
1245           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1246           controller.setPriority(getTableName());
1247           controller.setCallTimeout(callTimeout);
1248           try {
1249             MutateRequest request = RequestConverter.buildIncrementRequest(
1250               getLocation().getRegionInfo().getRegionName(), row, family,
1251               qualifier, amount, durability, nonceGroup, nonce);
1252             MutateResponse response = getStub().mutate(controller, request);
1253             Result result =
1254               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1255             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1256           } catch (ServiceException se) {
1257             throw ProtobufUtil.getRemoteException(se);
1258           }
1259         }
1260       };
1261     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1262   }
1263 
1264   /**
1265    * {@inheritDoc}
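        * <p>For illustration only, a minimal sketch, assuming an open HTable
        * {@code table}; the row, column family, qualifier, and values are placeholders:
        * <pre>
        * Put put = new Put(Bytes.toBytes("row1"));
        * put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("newValue"));
        * boolean applied = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
        *     Bytes.toBytes("q"), Bytes.toBytes("expectedValue"), put);
        * </pre>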
1266    */
1267   @Override
1268   public boolean checkAndPut(final byte [] row,
1269       final byte [] family, final byte [] qualifier, final byte [] value,
1270       final Put put)
1271   throws IOException {
1272     RegionServerCallable<Boolean> callable =
1273       new RegionServerCallable<Boolean>(connection, getName(), row) {
1274         @Override
1275         public Boolean call(int callTimeout) throws IOException {
1276           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1277           controller.setPriority(tableName);
1278           controller.setCallTimeout(callTimeout);
1279           try {
1280             MutateRequest request = RequestConverter.buildMutateRequest(
1281               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1282                 new BinaryComparator(value), CompareType.EQUAL, put);
1283             MutateResponse response = getStub().mutate(controller, request);
1284             return Boolean.valueOf(response.getProcessed());
1285           } catch (ServiceException se) {
1286             throw ProtobufUtil.getRemoteException(se);
1287           }
1288         }
1289       };
1290     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1291   }
1292 
1293   /**
1294    * {@inheritDoc}
1295    */
1296   @Override
1297   public boolean checkAndPut(final byte [] row, final byte [] family,
1298       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1299       final Put put)
1300   throws IOException {
1301     RegionServerCallable<Boolean> callable =
1302       new RegionServerCallable<Boolean>(connection, getName(), row) {
1303         @Override
1304         public Boolean call(int callTimeout) throws IOException {
1305           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1306           controller.setPriority(tableName);
1307           controller.setCallTimeout(callTimeout);
1308           try {
1309             CompareType compareType = CompareType.valueOf(compareOp.name());
1310             MutateRequest request = RequestConverter.buildMutateRequest(
1311               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1312                 new BinaryComparator(value), compareType, put);
1313             MutateResponse response = getStub().mutate(controller, request);
1314             return Boolean.valueOf(response.getProcessed());
1315           } catch (ServiceException se) {
1316             throw ProtobufUtil.getRemoteException(se);
1317           }
1318         }
1319       };
1320     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1321   }
1322 
1323   /**
1324    * {@inheritDoc}
1325    */
1326   @Override
1327   public boolean checkAndDelete(final byte [] row,
1328       final byte [] family, final byte [] qualifier, final byte [] value,
1329       final Delete delete)
1330   throws IOException {
1331     RegionServerCallable<Boolean> callable =
1332       new RegionServerCallable<Boolean>(connection, getName(), row) {
1333         @Override
1334         public Boolean call(int callTimeout) throws IOException {
1335           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1336           controller.setPriority(tableName);
1337           controller.setCallTimeout(callTimeout);
1338           try {
1339             MutateRequest request = RequestConverter.buildMutateRequest(
1340               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1341                 new BinaryComparator(value), CompareType.EQUAL, delete);
1342             MutateResponse response = getStub().mutate(controller, request);
1343             return Boolean.valueOf(response.getProcessed());
1344           } catch (ServiceException se) {
1345             throw ProtobufUtil.getRemoteException(se);
1346           }
1347         }
1348       };
1349     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1350   }
1351 
1352   /**
1353    * {@inheritDoc}
1354    */
1355   @Override
1356   public boolean checkAndDelete(final byte [] row, final byte [] family,
1357       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1358       final Delete delete)
1359   throws IOException {
1360     RegionServerCallable<Boolean> callable =
1361       new RegionServerCallable<Boolean>(connection, getName(), row) {
1362         @Override
1363         public Boolean call(int callTimeout) throws IOException {
1364           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1365           controller.setPriority(tableName);
1366           controller.setCallTimeout(callTimeout);
1367           try {
1368             CompareType compareType = CompareType.valueOf(compareOp.name());
1369             MutateRequest request = RequestConverter.buildMutateRequest(
1370               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1371                 new BinaryComparator(value), compareType, delete);
1372             MutateResponse response = getStub().mutate(controller, request);
1373             return Boolean.valueOf(response.getProcessed());
1374           } catch (ServiceException se) {
1375             throw ProtobufUtil.getRemoteException(se);
1376           }
1377         }
1378       };
1379     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1380   }
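  /*
   * A companion sketch for the CompareOp-based checkAndDelete above, under the same
   * assumptions as the checkAndPut sketch (an open HTable named "table", hypothetical
   * row/family/qualifier byte arrays). CompareOp.EQUAL mirrors the value-only overload;
   * the other operators request the corresponding comparison against the stored value:
   *
   *   Delete delete = new Delete(row);
   *   boolean deleted = table.checkAndDelete(row, fam, qual, CompareOp.EQUAL,
   *       Bytes.toBytes("expected"), delete);
   */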
1381 
1382   /**
1383    * {@inheritDoc}
1384    */
1385   @Override
1386   public boolean exists(final Get get) throws IOException {
1387     get.setCheckExistenceOnly(true);
1388     Result r = get(get);
1389     assert r.getExists() != null;
1390     return r.getExists();
1391   }
1392 
1393   /**
1394    * {@inheritDoc}
1395    */
1396   @Override
1397   public boolean[] existsAll(final List<Get> gets) throws IOException {
1398     if (gets.isEmpty()) return new boolean[]{};
1399     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1400 
1401     for (Get g: gets){
1402       g.setCheckExistenceOnly(true);
1403     }
1404 
1405     Object[] r1;
1406     try {
1407       r1 = batch(gets);
1408     } catch (InterruptedException e) {
1409       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1410     }
1411 
1412     // translate.
1413     boolean[] results = new boolean[r1.length];
1414     int i = 0;
1415     for (Object o : r1) {
1416       // batch ensures that a failure surfaces as an exception, so each element is a Result
1417       results[i++] = ((Result)o).getExists();
1418     }
1419 
1420     return results;
1421   }
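  /*
   * A small sketch of existsAll, assuming an open HTable named "table" and hypothetical
   * row keys. Because the Gets are marked check-existence-only, only existence flags
   * come back, not the row data:
   *
   *   List<Get> gets = new ArrayList<Get>();
   *   gets.add(new Get(Bytes.toBytes("row-1")));
   *   gets.add(new Get(Bytes.toBytes("row-2")));
   *   boolean[] present = table.existsAll(gets);   // present[i] answers gets.get(i)
   */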
1422 
1423   /**
1424    * {@inheritDoc}
1425    */
1426   @Override
1427   @Deprecated
1428   public Boolean[] exists(final List<Get> gets) throws IOException {
1429     boolean[] results = existsAll(gets);
1430     Boolean[] objectResults = new Boolean[results.length];
1431     for (int i = 0; i < results.length; ++i) {
1432       objectResults[i] = results[i];
1433     }
1434     return objectResults;
1435   }
1436 
1437   /**
1438    * {@inheritDoc}
1439    */
1440   @Override
1441   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1442     // An operation can be in progress even when the buffer is empty, so always call
1443     //  backgroundFlushCommits at least once.
1444     backgroundFlushCommits(true);
1445   }
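  /*
   * A minimal sketch of client-side write buffering with flushCommits, assuming an
   * open HTable named "table"; row keys, family and qualifier are hypothetical:
   *
   *   table.setAutoFlushTo(false);                 // buffer Puts on the client
   *   for (int i = 0; i < 1000; i++) {
   *     Put p = new Put(Bytes.toBytes("row-" + i));
   *     p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(i));
   *     table.put(p);                              // may flush when the buffer fills
   *   }
   *   table.flushCommits();                        // push anything still buffered
   */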
1446 
1447   /**
1448    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1449    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1450    *
1451    * @param list The collection of actions.
1452    * @param results An empty array, the same size as list. If an exception is thrown,
1453    * this array can be examined for partial results and used to determine which
1454    * actions processed successfully.
1455    * @throws IOException if there are problems talking to META. Per-item
1456    * exceptions are stored in the results array.
1457    */
1458   public <R> void processBatchCallback(
1459     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1460     throws IOException, InterruptedException {
1461     this.batchCallback(list, results, callback);
1462   }
1463 
1464 
1465   /**
1466    * Parameterized batch processing, allowing varying return types for different
1467    * {@link Row} implementations.
1468    */
1469   public void processBatch(final List<? extends Row> list, final Object[] results)
1470     throws IOException, InterruptedException {
1471     this.batch(list, results);
1472   }
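  /*
   * A sketch of batching mixed operations with processBatch, assuming an open HTable
   * named "table" and hypothetical rows; results[i] lines up with actions.get(i):
   *
   *   List<Row> actions = new ArrayList<Row>();
   *   actions.add(new Get(Bytes.toBytes("row-1")));
   *   Put p = new Put(Bytes.toBytes("row-2"));
   *   p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
   *   actions.add(p);
   *   actions.add(new Delete(Bytes.toBytes("row-3")));
   *   Object[] results = new Object[actions.size()];
   *   table.processBatch(actions, results);
   */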
1473 
1474 
1475   @Override
1476   public void close() throws IOException {
1477     if (this.closed) {
1478       return;
1479     }
1480     flushCommits();
1481     if (cleanupPoolOnClose) {
1482       this.pool.shutdown();
1483       try {
1484         boolean terminated = false;
1485         do {
1486           // wait until the pool has terminated
1487           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1488         } while (!terminated);
1489       } catch (InterruptedException e) {
1490         LOG.warn("awaitTermination interrupted");
1491       }
1492     }
1493     if (cleanupConnectionOnClose) {
1494       if (this.connection != null) {
1495         this.connection.close();
1496       }
1497     }
1498     this.closed = true;
1499   }
1500 
1501   // validate for well-formedness
1502   public void validatePut(final Put put) throws IllegalArgumentException{
1503     if (put.isEmpty()) {
1504       throw new IllegalArgumentException("No columns to insert");
1505     }
1506     if (maxKeyValueSize > 0) {
1507       for (List<Cell> list : put.getFamilyCellMap().values()) {
1508         for (Cell cell : list) {
1509           // KeyValue v1 expectation.  Cast for now.
1510           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1511           if (kv.getLength() > maxKeyValueSize) {
1512             throw new IllegalArgumentException("KeyValue size too large: " + kv.getLength() + " > " + maxKeyValueSize);
1513           }
1514         }
1515       }
1516     }
1517   }
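  /*
   * The size cap enforced above comes from the client configuration. A sketch of raising
   * it, assuming the commonly documented client property "hbase.client.keyvalue.maxsize"
   * is the one backing maxKeyValueSize (a value <= 0 disables the check above):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt("hbase.client.keyvalue.maxsize", 20 * 1024 * 1024);   // 20 MB cap
   *   Table table = new HTable(conf, "my_table");                       // hypothetical table
   */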
1518 
1519   /**
1520    * {@inheritDoc}
1521    */
1522   @Override
1523   public boolean isAutoFlush() {
1524     return autoFlush;
1525   }
1526 
1527   /**
1528    * {@inheritDoc}
1529    */
1530   @Deprecated
1531   @Override
1532   public void setAutoFlush(boolean autoFlush) {
1533     setAutoFlush(autoFlush, autoFlush);
1534   }
1535 
1536   /**
1537    * {@inheritDoc}
1538    */
1539   @Override
1540   public void setAutoFlushTo(boolean autoFlush) {
1541     setAutoFlush(autoFlush, clearBufferOnFail);
1542   }
1543 
1544   /**
1545    * {@inheritDoc}
1546    */
1547   @Override
1548   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1549     this.autoFlush = autoFlush;
1550     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1551   }
1552 
1553   /**
1554    * Returns the maximum size in bytes of the write buffer for this HTable.
1555    * <p>
1556    * The default value comes from the configuration parameter
1557    * {@code hbase.client.write.buffer}.
1558    * @return The size of the write buffer in bytes.
1559    */
1560   @Override
1561   public long getWriteBufferSize() {
1562     return writeBufferSize;
1563   }
1564 
1565   /**
1566    * Sets the size of the write buffer in bytes.
1567    * <p>
1568    * If the new size is less than the current amount of data in the
1569    * write buffer, the buffer gets flushed.
1570    * @param writeBufferSize The new write buffer size, in bytes.
1571    * @throws IOException if a remote or network exception occurs.
1572    */
1573   @Override
1574   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1575     this.writeBufferSize = writeBufferSize;
1576     if(currentWriteBufferSize > writeBufferSize) {
1577       flushCommits();
1578     }
1579   }
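  /*
   * A short sketch of tuning the write buffer, assuming an open HTable named "table".
   * Per the setter above, shrinking the buffer below the currently buffered amount
   * triggers an immediate flush:
   *
   *   table.setAutoFlushTo(false);
   *   table.setWriteBufferSize(8 * 1024 * 1024);   // 8 MB client-side buffer
   *   long size = table.getWriteBufferSize();
   */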
1580 
1581   /**
1582    * The pool is used for multi requests for this HTable.
1583    * @return the pool used for multi requests
1584    */
1585   ExecutorService getPool() {
1586     return this.pool;
1587   }
1588 
1589   /**
1590    * Enable or disable region cache prefetch for the table. The setting applies to
1591    * all HTable instances for the given table that share the same connection.
1592    * By default, region cache prefetch is enabled.
1593    * @param tableName name of table to configure.
1594    * @param enable Set to true to enable region cache prefetch, or false to
1595    * disable it.
1596    * @throws IOException
1597    * @deprecated does nothing since 0.99
1598    */
1599   @Deprecated
1600   public static void setRegionCachePrefetch(final byte[] tableName,
1601       final boolean enable)  throws IOException {
1602   }
1603 
1604   /**
1605    * @deprecated does nothing since 0.99
1606    */
1607   @Deprecated
1608   public static void setRegionCachePrefetch(
1609       final TableName tableName,
1610       final boolean enable) throws IOException {
1611   }
1612 
1613   /**
1614    * Enable or disable region cache prefetch for the table. The setting applies to
1615    * all HTable instances for the given table that share the same connection.
1616    * By default, region cache prefetch is enabled.
1617    * @param conf The Configuration object to use.
1618    * @param tableName name of table to configure.
1619    * @param enable Set to true to enable region cache prefetch, or false to
1620    * disable it.
1621    * @throws IOException
1622    * @deprecated does nothing since 0.99
1623    */
1624   @Deprecated
1625   public static void setRegionCachePrefetch(final Configuration conf,
1626       final byte[] tableName, final boolean enable) throws IOException {
1627   }
1628 
1629   /**
1630    * @deprecated does nothing since 0.99
1631    */
1632   @Deprecated
1633   public static void setRegionCachePrefetch(final Configuration conf,
1634       final TableName tableName,
1635       final boolean enable) throws IOException {
1636   }
1637 
1638   /**
1639    * Check whether region cache prefetch is enabled for the table.
1640    * @param conf The Configuration object to use.
1641    * @param tableName name of table to check
1642    * @return true if the table's region cache prefetch is enabled,
1643    * false otherwise.
1644    * @throws IOException
1645    * @deprecated always returns false since 0.99
1646    */
1647   @Deprecated
1648   public static boolean getRegionCachePrefetch(final Configuration conf,
1649       final byte[] tableName) throws IOException {
1650     return false;
1651   }
1652 
1653   /**
1654    * @deprecated always returns false since 0.99
1655    */
1656   @Deprecated
1657   public static boolean getRegionCachePrefetch(final Configuration conf,
1658       final TableName tableName) throws IOException {
1659     return false;
1660   }
1661 
1662   /**
1663    * Check whether region cache prefetch is enabled for the table.
1664    * @param tableName name of table to check
1665    * @return true if the table's region cache prefetch is enabled,
1666    * false otherwise.
1667    * @throws IOException
1668    * @deprecated always returns false since 0.99
1669    */
1670   @Deprecated
1671   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1672     return false;
1673   }
1674 
1675   /**
1676    * @deprecated always returns false since 0.99
1677    */
1678   @Deprecated
1679   public static boolean getRegionCachePrefetch(
1680       final TableName tableName) throws IOException {
1681     return false;
1682   }
1683 
1684   /**
1685    * Explicitly clears the region cache to fetch the latest value from META.
1686    * This is a power user function: avoid unless you know the ramifications.
1687    */
1688   public void clearRegionCache() {
1689     this.connection.clearRegionCache();
1690   }
1691 
1692   /**
1693    * {@inheritDoc}
1694    */
1695   @Override
1696   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1697     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1698   }
1699 
1700   /**
1701    * {@inheritDoc}
1702    */
1703   @Override
1704   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1705       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1706       throws ServiceException, Throwable {
1707     final Map<byte[],R> results =  Collections.synchronizedMap(
1708         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1709     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1710       @Override
1711       public void update(byte[] region, byte[] row, R value) {
1712         if (region != null) {
1713           results.put(region, value);
1714         }
1715       }
1716     });
1717     return results;
1718   }
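  /*
   * A sketch of calling a coprocessor endpoint across a key range with the overload
   * above. CounterService stands in for a hypothetical protobuf-generated Service that
   * is deployed as an endpoint, and callCount() is a hypothetical helper that issues
   * the endpoint RPC on the supplied per-region stub:
   *
   *   Map<byte[], Long> counts = table.coprocessorService(CounterService.class,
   *       null, null,                               // null keys select all regions
   *       new Batch.Call<CounterService, Long>() {
   *         @Override
   *         public Long call(CounterService stub) throws IOException {
   *           return callCount(stub);               // hypothetical endpoint invocation
   *         }
   *       });
   *   // keys of "counts" are region names; values are the per-region results
   */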
1719 
1720   /**
1721    * {@inheritDoc}
1722    */
1723   @Override
1724   public <T extends Service, R> void coprocessorService(final Class<T> service,
1725       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1726       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1727 
1728     // get regions covered by the row range
1729     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1730 
1731     Map<byte[],Future<R>> futures =
1732         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1733     for (final byte[] r : keys) {
1734       final RegionCoprocessorRpcChannel channel =
1735           new RegionCoprocessorRpcChannel(connection, tableName, r);
1736       Future<R> future = pool.submit(
1737           new Callable<R>() {
1738             @Override
1739             public R call() throws Exception {
1740               T instance = ProtobufUtil.newServiceStub(service, channel);
1741               R result = callable.call(instance);
1742               byte[] region = channel.getLastRegion();
1743               if (callback != null) {
1744                 callback.update(region, r, result);
1745               }
1746               return result;
1747             }
1748           });
1749       futures.put(r, future);
1750     }
1751     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1752       try {
1753         e.getValue().get();
1754       } catch (ExecutionException ee) {
1755         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1756             + Bytes.toStringBinary(e.getKey()), ee);
1757         throw ee.getCause();
1758       } catch (InterruptedException ie) {
1759         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1760             + " for row " + Bytes.toStringBinary(e.getKey()))
1761             .initCause(ie);
1762       }
1763     }
1764   }
1765 
1766   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1767   throws IOException {
1768     if (start == null) {
1769       start = HConstants.EMPTY_START_ROW;
1770     }
1771     if (end == null) {
1772       end = HConstants.EMPTY_END_ROW;
1773     }
1774     return getKeysAndRegionsInRange(start, end, true).getFirst();
1775   }
1776 
1777   public void setOperationTimeout(int operationTimeout) {
1778     this.operationTimeout = operationTimeout;
1779   }
1780 
1781   public int getOperationTimeout() {
1782     return operationTimeout;
1783   }
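  /*
   * A short sketch of bounding total per-operation time, assuming an open HTable named
   * "table"; as in the callers above, the timeout spans the whole retry loop rather
   * than a single RPC:
   *
   *   table.setOperationTimeout(30000);   // 30 s budget across all retries
   *   int timeout = table.getOperationTimeout();
   */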
1784 
1785   @Override
1786   public String toString() {
1787     return tableName + ";" + connection;
1788   }
1789 
1790   /**
1791    * Run a basic smoke test.
1792    * @param args A table name and a row key; the row's content is printed.
1793    * @throws IOException
1794    */
1795   public static void main(String[] args) throws IOException {
1796     Table t = new HTable(HBaseConfiguration.create(), args[0]);
1797     try {
1798       System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1799     } finally {
1800       t.close();
1801     }
1802   }
1803 
1804   /**
1805    * {@inheritDoc}
1806    */
1807   @Override
1808   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1809       Descriptors.MethodDescriptor methodDescriptor, Message request,
1810       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1811     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1812         Bytes.BYTES_COMPARATOR));
1813     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1814         new Callback<R>() {
1815 
1816           @Override
1817           public void update(byte[] region, byte[] row, R result) {
1818             if (region != null) {
1819               results.put(region, result);
1820             }
1821           }
1822         });
1823     return results;
1824   }
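  /*
   * A sketch for the batch endpoint call above. Unlike coprocessorService, this variant
   * takes a protobuf MethodDescriptor plus a shared request and a response prototype;
   * CounterProtos and its "count" method are hypothetical generated classes:
   *
   *   CounterProtos.CountRequest req = CounterProtos.CountRequest.getDefaultInstance();
   *   Map<byte[], CounterProtos.CountResponse> out = table.batchCoprocessorService(
   *       CounterProtos.CountService.getDescriptor().findMethodByName("count"), req,
   *       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
   *       CounterProtos.CountResponse.getDefaultInstance());
   */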
1825 
1826   /**
1827    * {@inheritDoc}
1828    */
1829   @Override
1830   public <R extends Message> void batchCoprocessorService(
1831       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1832       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1833       throws ServiceException, Throwable {
1834 
1835     // get regions covered by the row range
1836     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1837         getKeysAndRegionsInRange(startKey, endKey, true);
1838     List<byte[]> keys = keysAndRegions.getFirst();
1839     List<HRegionLocation> regions = keysAndRegions.getSecond();
1840 
1841     // check if we have any calls to make
1842     if (keys.isEmpty()) {
1843       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1844           ", end=" + Bytes.toStringBinary(endKey));
1845       return;
1846     }
1847 
1848     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1849     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1850         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1851     for (int i = 0; i < keys.size(); i++) {
1852       final byte[] rowKey = keys.get(i);
1853       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1854       RegionCoprocessorServiceExec exec =
1855           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1856       execs.add(exec);
1857       execsByRow.put(rowKey, exec);
1858     }
1859 
1860     // tracking for any possible deserialization errors on success callback
1861     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1862     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1863     final List<Row> callbackErrorActions = new ArrayList<Row>();
1864     final List<String> callbackErrorServers = new ArrayList<String>();
1865     Object[] results = new Object[execs.size()];
1866 
1867     AsyncProcess asyncProcess =
1868         new AsyncProcess(connection, configuration, pool,
1869             RpcRetryingCallerFactory.instantiate(configuration), true,
1870             RpcControllerFactory.instantiate(configuration));
1871     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1872         new Callback<ClientProtos.CoprocessorServiceResult>() {
1873           @Override
1874           public void update(byte[] region, byte[] row,
1875                               ClientProtos.CoprocessorServiceResult serviceResult) {
1876             if (LOG.isTraceEnabled()) {
1877               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1878                   ": region=" + Bytes.toStringBinary(region) +
1879                   ", row=" + Bytes.toStringBinary(row) +
1880                   ", value=" + serviceResult.getValue().getValue());
1881             }
1882             try {
1883               callback.update(region, row,
1884                   (R) responsePrototype.newBuilderForType().mergeFrom(
1885                       serviceResult.getValue().getValue()).build());
1886             } catch (InvalidProtocolBufferException e) {
1887               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1888                   e);
1889               callbackErrorExceptions.add(e);
1890               callbackErrorActions.add(execsByRow.get(row));
1891               callbackErrorServers.add("null");
1892             }
1893           }
1894         }, results);
1895 
1896     future.waitUntilDone();
1897 
1898     if (future.hasError()) {
1899       throw future.getErrors();
1900     } else if (!callbackErrorExceptions.isEmpty()) {
1901       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1902           callbackErrorServers);
1903     }
1904   }
1905 }