
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import com.google.protobuf.InvalidProtocolBufferException;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.classification.InterfaceStability;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.KeyValueUtil;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
57  import org.apache.hadoop.hbase.client.coprocessor.Batch;
58  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
59  import org.apache.hadoop.hbase.filter.BinaryComparator;
60  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
61  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
62  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
63  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
64  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
65  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66  import org.apache.hadoop.hbase.protobuf.RequestConverter;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
71  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
72  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.Pair;
75  import org.apache.hadoop.hbase.util.Threads;
76  
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.Message;
79  import com.google.protobuf.Service;
80  import com.google.protobuf.ServiceException;
81  
82  import com.google.common.annotations.VisibleForTesting;
83  
84  /**
85   * <p>Used to communicate with a single HBase table.  An implementation of
86   * {@link HTableInterface}.  Instances of this class can be constructed directly but it is
87   * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
88   * See {@link HConnectionManager} class comment for an example.
89   *
90   * <p>This class is not thread safe for reads nor writes.
91   *
92   * <p>In case of writes (Put, Delete), the underlying write buffer can
93   * be corrupted if multiple threads contend over a single HTable instance.
94   *
95   * <p>In case of reads, some fields used by a Scan are shared among all threads.
96   * The HTable implementation therefore makes no guarantee of safety even for a concurrent Get.
97   *
98   * <p>Instances of HTable passed the same {@link Configuration} instance will
99   * share connections to servers out on the cluster and to the zookeeper ensemble
100  * as well as caches of region locations.  This is usually a *good* thing and it
101  * is recommended to reuse the same configuration object for all your tables.
102  * This happens because they will all share the same underlying
103  * {@link HConnection} instance. See {@link HConnectionManager} for more on
104  * how this mechanism works.
105  *
106  * <p>{@link HConnection} will read most of the
107  * configuration it needs from the passed {@link Configuration} on initial
108  * construction.  Thereafter, updates to settings such as
109  * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
110  * and <code>hbase.client.rpc.maxattempts</code> in the passed
111  * {@link Configuration} will go unnoticed once the {@link HConnection} has been
112  * constructed.  To run with changed values, make a new
113  * {@link HTable} passing a new {@link Configuration} instance that has the
114  * new configuration.
115  *
116  * <p>Note that this class implements the {@link Closeable} interface. When an
117  * HTable instance is no longer required, it *should* be closed in order to ensure
118  * that the underlying resources are promptly released. Please note that the close
119  * method can throw a java.io.IOException that must be handled.
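 *
 * <p>A minimal usage sketch (illustrative only; the table name "myTable" and the
 * column coordinates used below are hypothetical):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * HTable table = new HTable(conf, TableName.valueOf("myTable"));
 * try {
 *   Put put = new Put(Bytes.toBytes("row1"));
 *   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
 *   table.put(put);
 *   Get get = new Get(Bytes.toBytes("row1"));
 *   Result result = table.get(get);
 *   byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));
 * } finally {
 *   table.close(); // release the underlying resources
 * }
 * }</pre>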
120  *
121  * @see HBaseAdmin for creating, dropping, listing, enabling and disabling tables.
122  * @see HConnection
123  * @see HConnectionManager
124  */
125 @InterfaceAudience.Public
126 @InterfaceStability.Stable
127 public class HTable implements HTableInterface {
128   private static final Log LOG = LogFactory.getLog(HTable.class);
129   protected ClusterConnection connection;
130   private final TableName tableName;
131   private volatile Configuration configuration;
132   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
133   private long writeBufferSize;
134   private boolean clearBufferOnFail;
135   private boolean autoFlush;
136   protected long currentWriteBufferSize;
137   protected int scannerCaching;
138   private int maxKeyValueSize;
139   private ExecutorService pool;  // For Multi
140   private boolean closed;
141   private int operationTimeout;
142   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
143   private final boolean cleanupConnectionOnClose; // close the connection in close()
144 
145   /** The async process used for puts when autoflush is false, and for multi-puts */
146   protected AsyncProcess ap;
147   /** The Async process for batch */
148   protected AsyncProcess multiAp;
149   private RpcRetryingCallerFactory rpcCallerFactory;
150   private RpcControllerFactory rpcControllerFactory;
151 
152   /**
153    * Creates an object to access an HBase table.
154    * Shares zookeeper connection and other resources with other HTable instances
155    * created with the same <code>conf</code> instance.  Uses already-populated
156    * region cache if one is available, populated by any other HTable instances
157    * sharing this <code>conf</code> instance.  Recommended.
158    * @param conf Configuration object to use.
159    * @param tableName Name of the table.
160    * @throws IOException if a remote or network exception occurs
161    */
162   public HTable(Configuration conf, final String tableName)
163   throws IOException {
164     this(conf, TableName.valueOf(tableName));
165   }
166 
167   /**
168    * Creates an object to access an HBase table.
169    * Shares zookeeper connection and other resources with other HTable instances
170    * created with the same <code>conf</code> instance.  Uses already-populated
171    * region cache if one is available, populated by any other HTable instances
172    * sharing this <code>conf</code> instance.  Recommended.
173    * @param conf Configuration object to use.
174    * @param tableName Name of the table.
175    * @throws IOException if a remote or network exception occurs
176    */
177   public HTable(Configuration conf, final byte[] tableName)
178   throws IOException {
179     this(conf, TableName.valueOf(tableName));
180   }
181 
182 
183 
184   /**
185    * Creates an object to access an HBase table.
186    * Shares zookeeper connection and other resources with other HTable instances
187    * created with the same <code>conf</code> instance.  Uses already-populated
188    * region cache if one is available, populated by any other HTable instances
189    * sharing this <code>conf</code> instance.  Recommended.
190    * @param conf Configuration object to use.
191    * @param tableName table name pojo
192    * @throws IOException if a remote or network exception occurs
193    */
194   public HTable(Configuration conf, final TableName tableName)
195   throws IOException {
196     this.tableName = tableName;
197     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
198     if (conf == null) {
199       this.connection = null;
200       return;
201     }
202     this.connection = ConnectionManager.getConnectionInternal(conf);
203     this.configuration = conf;
204 
205     this.pool = getDefaultExecutor(conf);
206     this.finishSetup();
207   }
208 
209   /**
210    * Creates an object to access an HBase table. Shares zookeeper connection and other resources with
211    * other HTable instances created with the same <code>connection</code> instance. Use this
212    * constructor when the HConnection instance is externally managed.
213    * @param tableName Name of the table.
214    * @param connection HConnection to be used.
215    * @throws IOException if a remote or network exception occurs
216    * @deprecated Do not use.
217    */
218   @Deprecated
219   public HTable(TableName tableName, HConnection connection) throws IOException {
220     this.tableName = tableName;
221     this.cleanupPoolOnClose = true;
222     this.cleanupConnectionOnClose = false;
223     this.connection = (ClusterConnection)connection;
224     this.configuration = connection.getConfiguration();
225 
226     this.pool = getDefaultExecutor(this.configuration);
227     this.finishSetup();
228   }
229 
230   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
231     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
232     if (maxThreads == 0) {
233       maxThreads = 1; // is there a better default?
234     }
235     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
236 
237     // Using the "direct handoff" approach, new threads will only be created
238     // when necessary, and the pool can grow unbounded. This could be bad, but in
239     // HCM we only create as many Runnables as there are region servers, so the
240     // pool also scales as new region servers are added.
241     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
242         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
243     pool.allowCoreThreadTimeOut(true);
244     return pool;
245   }
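
  // Illustrative sketch (not part of the original source): the pool returned above is tuned via
  // the two configuration keys read in getDefaultExecutor(); the values shown are hypothetical.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.htable.threads.max", 32);
  //   conf.setLong("hbase.htable.threads.keepalivetime", 60);
  //   ThreadPoolExecutor pool = HTable.getDefaultExecutor(conf);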
246 
247   /**
248    * Creates an object to access an HBase table.
249    * Shares zookeeper connection and other resources with other HTable instances
250    * created with the same <code>conf</code> instance.  Uses already-populated
251    * region cache if one is available, populated by any other HTable instances
252    * sharing this <code>conf</code> instance.
253    * Use this constructor when the ExecutorService is externally managed.
254    * @param conf Configuration object to use.
255    * @param tableName Name of the table.
256    * @param pool ExecutorService to be used.
257    * @throws IOException if a remote or network exception occurs
258    */
259   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
260       throws IOException {
261     this(conf, TableName.valueOf(tableName), pool);
262   }
263 
264   /**
265    * Creates an object to access an HBase table.
266    * Shares zookeeper connection and other resources with other HTable instances
267    * created with the same <code>conf</code> instance.  Uses already-populated
268    * region cache if one is available, populated by any other HTable instances
269    * sharing this <code>conf</code> instance.
270    * Use this constructor when the ExecutorService is externally managed.
271    * @param conf Configuration object to use.
272    * @param tableName Name of the table.
273    * @param pool ExecutorService to be used.
274    * @throws IOException if a remote or network exception occurs
275    */
276   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
277       throws IOException {
278     this.connection = ConnectionManager.getConnectionInternal(conf);
279     this.configuration = conf;
280     this.pool = pool;
281     this.tableName = tableName;
282     this.cleanupPoolOnClose = false;
283     this.cleanupConnectionOnClose = true;
284 
285     this.finishSetup();
286   }
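
  // Illustrative sketch (not part of the original source): using the constructor above with an
  // externally managed ExecutorService. The table name and pool size are hypothetical; note that
  // with this constructor close() does not shut the pool down, so the caller owns its lifecycle.
  //
  //   ExecutorService pool = Executors.newFixedThreadPool(16,
  //       Threads.newDaemonThreadFactory("mytable-client"));
  //   HTable table = new HTable(conf, TableName.valueOf("myTable"), pool);
  //   try {
  //     table.put(new Put(Bytes.toBytes("row1"))
  //         .add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
  //   } finally {
  //     table.close();   // cleanupPoolOnClose is false, so the pool keeps running
  //     pool.shutdown(); // shut it down explicitly when done
  //   }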
287 
288   /**
289    * Creates an object to access an HBase table.
290    * Shares zookeeper connection and other resources with other HTable instances
291    * created with the same <code>connection</code> instance.
292    * Use this constructor when the ExecutorService and HConnection instance are
293    * externally managed.
294    * @param tableName Name of the table.
295    * @param connection HConnection to be used.
296    * @param pool ExecutorService to be used.
297    * @throws IOException if a remote or network exception occurs.
298    * @deprecated Do not use, internal ctor.
299    */
300   @Deprecated
301   public HTable(final byte[] tableName, final HConnection connection,
302       final ExecutorService pool) throws IOException {
303     this(TableName.valueOf(tableName), connection, pool);
304   }
305 
306   /** @deprecated Do not use, internal ctor. */
307   @Deprecated
308   public HTable(TableName tableName, final HConnection connection,
309       final ExecutorService pool) throws IOException {
310     this(tableName, (ClusterConnection)connection, pool);
311   }
312 
313   /**
314    * Creates an object to access an HBase table.
315    * Shares zookeeper connection and other resources with other HTable instances
316    * created with the same <code>connection</code> instance.
317    * Visible only for HTableWrapper which is in different package.
318    * Should not be used by external code.
319    * @param tableName Name of the table.
320    * @param connection HConnection to be used.
321    * @param pool ExecutorService to be used.
322    * @throws IOException if a remote or network exception occurs
323    */
324   @InterfaceAudience.Private
325   public HTable(TableName tableName, final ClusterConnection connection,
326       final ExecutorService pool) throws IOException {
327     if (connection == null || connection.isClosed()) {
328       throw new IllegalArgumentException("Connection is null or closed.");
329     }
330     this.tableName = tableName;
331     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
332     this.connection = connection;
333     this.configuration = connection.getConfiguration();
334     this.pool = pool;
335 
336     this.finishSetup();
337   }
338 
339   /**
340    * For internal testing.
341    */
342   protected HTable() {
343     tableName = null;
344     cleanupPoolOnClose = false;
345     cleanupConnectionOnClose = false;
346   }
347 
348   /**
349    * Sets up this HTable's parameters based on the passed configuration.
350    */
351   private void finishSetup() throws IOException {
352     this.operationTimeout = tableName.isSystemTable() ?
353       this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
354         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
355       this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
356         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
357     this.writeBufferSize = this.configuration.getLong(
358         "hbase.client.write.buffer", 2097152);
359     this.clearBufferOnFail = true;
360     this.autoFlush = true;
361     this.currentWriteBufferSize = 0;
362     this.scannerCaching = this.configuration.getInt(
363         HConstants.HBASE_CLIENT_SCANNER_CACHING,
364         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
365 
366     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
367     this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
368     // puts need to track errors globally due to how the APIs currently work.
369     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
370     multiAp = this.connection.getAsyncProcess();
371 
372     this.maxKeyValueSize = this.configuration.getInt(
373         "hbase.client.keyvalue.maxsize", -1);
374     this.closed = false;
375   }
376 
377 
378 
379   /**
380    * {@inheritDoc}
381    */
382   @Override
383   public Configuration getConfiguration() {
384     return configuration;
385   }
386 
387   /**
388    * Tells whether or not a table is enabled. This method creates a
389    * new HBase configuration, so it might make your unit tests fail due to
390    * incorrect ZK client port.
391    * @param tableName Name of table to check.
392    * @return {@code true} if table is online.
393    * @throws IOException if a remote or network exception occurs
394    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
395    */
396   @Deprecated
397   public static boolean isTableEnabled(String tableName) throws IOException {
398     return isTableEnabled(TableName.valueOf(tableName));
399   }
400 
401   /**
402    * Tells whether or not a table is enabled. This method creates a
403    * new HBase configuration, so it might make your unit tests fail due to
404    * incorrect ZK client port.
405    * @param tableName Name of table to check.
406    * @return {@code true} if table is online.
407    * @throws IOException if a remote or network exception occurs
408    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
409    */
410   @Deprecated
411   public static boolean isTableEnabled(byte[] tableName) throws IOException {
412     return isTableEnabled(TableName.valueOf(tableName));
413   }
414 
415   /**
416    * Tells whether or not a table is enabled. This method creates a
417    * new HBase configuration, so it might make your unit tests fail due to
418    * incorrect ZK client port.
419    * @param tableName Name of table to check.
420    * @return {@code true} if table is online.
421    * @throws IOException if a remote or network exception occurs
422    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
423    */
424   @Deprecated
425   public static boolean isTableEnabled(TableName tableName) throws IOException {
426     return isTableEnabled(HBaseConfiguration.create(), tableName);
427   }
428 
429   /**
430    * Tells whether or not a table is enabled.
431    * @param conf The Configuration object to use.
432    * @param tableName Name of table to check.
433    * @return {@code true} if table is online.
434    * @throws IOException if a remote or network exception occurs
435    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
436    */
437   @Deprecated
438   public static boolean isTableEnabled(Configuration conf, String tableName)
439   throws IOException {
440     return isTableEnabled(conf, TableName.valueOf(tableName));
441   }
442 
443   /**
444    * Tells whether or not a table is enabled.
445    * @param conf The Configuration object to use.
446    * @param tableName Name of table to check.
447    * @return {@code true} if table is online.
448    * @throws IOException if a remote or network exception occurs
449    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
450    */
451   @Deprecated
452   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
453   throws IOException {
454     return isTableEnabled(conf, TableName.valueOf(tableName));
455   }
456 
457   /**
458    * Tells whether or not a table is enabled.
459    * @param conf The Configuration object to use.
460    * @param tableName Name of table to check.
461    * @return {@code true} if table is online.
462    * @throws IOException if a remote or network exception occurs
463    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName)}
464    */
465   @Deprecated
466   public static boolean isTableEnabled(Configuration conf,
467       final TableName tableName) throws IOException {
468     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
469       @Override
470       public Boolean connect(HConnection connection) throws IOException {
471         return connection.isTableEnabled(tableName);
472       }
473     });
474   }
475 
476   /**
477    * Find region location hosting passed row using cached info
478    * @param row Row to find.
479    * @return The location of the given row.
480    * @throws IOException if a remote or network exception occurs
481    */
482   public HRegionLocation getRegionLocation(final String row)
483   throws IOException {
484     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
485   }
486 
487   /**
488    * Finds the region on which the given row is being served. Does not reload the cache.
489    * @param row Row to find.
490    * @return Location of the row.
491    * @throws IOException if a remote or network exception occurs
492    */
493   public HRegionLocation getRegionLocation(final byte [] row)
494   throws IOException {
495     return connection.getRegionLocation(tableName, row, false);
496   }
497 
498   /**
499    * Finds the region on which the given row is being served.
500    * @param row Row to find.
501    * @param reload true to reload information or false to use cached information
502    * @return Location of the row.
503    * @throws IOException if a remote or network exception occurs
504    */
505   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
506   throws IOException {
507     return connection.getRegionLocation(tableName, row, reload);
508   }
509 
510   /**
511    * {@inheritDoc}
512    */
513   @Override
514   public byte [] getTableName() {
515     return this.tableName.getName();
516   }
517 
518   @Override
519   public TableName getName() {
520     return tableName;
521   }
522 
523   /**
524    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
525    * manipulations.
526    * @return An HConnection instance.
527    * @deprecated This method will be changed from public to package protected.
528    */
529   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
530   @Deprecated
531   @VisibleForTesting
532   public HConnection getConnection() {
533     return this.connection;
534   }
535 
536   /**
537    * Gets the number of rows that a scanner will fetch at once.
538    * <p>
539    * The default value comes from {@code hbase.client.scanner.caching}.
540    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
541    */
542   @Deprecated
543   public int getScannerCaching() {
544     return scannerCaching;
545   }
546 
547   /**
548    * Kept in 0.96 for backward compatibility.
549    * @deprecated since 0.96. This is an internal buffer that should neither be read nor written to.
550    */
551   @Deprecated
552   public List<Row> getWriteBuffer() {
553     return writeAsyncBuffer;
554   }
555 
556   /**
557    * Sets the number of rows that a scanner will fetch at once.
558    * <p>
559    * This will override the value specified by
560    * {@code hbase.client.scanner.caching}.
561    * Increasing this value will reduce the amount of work needed each time
562    * {@code next()} is called on a scanner, at the expense of memory use
563    * (since more rows will need to be maintained in memory by the scanners).
564    * @param scannerCaching the number of rows a scanner will fetch at once.
565    * @deprecated Use {@link Scan#setCaching(int)}
566    */
567   @Deprecated
568   public void setScannerCaching(int scannerCaching) {
569     this.scannerCaching = scannerCaching;
570   }
571 
572   /**
573    * {@inheritDoc}
574    */
575   @Override
576   public HTableDescriptor getTableDescriptor() throws IOException {
577     return new UnmodifyableHTableDescriptor(
578       this.connection.getHTableDescriptor(this.tableName));
579   }
580 
581   /**
582    * Gets the starting row key for every region in the currently open table.
583    * <p>
584    * This is mainly useful for the MapReduce integration.
585    * @return Array of region starting row keys
586    * @throws IOException if a remote or network exception occurs
587    */
588   public byte [][] getStartKeys() throws IOException {
589     return getStartEndKeys().getFirst();
590   }
591 
592   /**
593    * Gets the ending row key for every region in the currently open table.
594    * <p>
595    * This is mainly useful for the MapReduce integration.
596    * @return Array of region ending row keys
597    * @throws IOException if a remote or network exception occurs
598    */
599   public byte[][] getEndKeys() throws IOException {
600     return getStartEndKeys().getSecond();
601   }
602 
603   /**
604    * Gets the starting and ending row keys for every region in the currently
605    * open table.
606    * <p>
607    * This is mainly useful for the MapReduce integration.
608    * @return Pair of arrays of region starting and ending row keys
609    * @throws IOException if a remote or network exception occurs
610    */
611   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
612     NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
613     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
614     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
615 
616     for (HRegionInfo region : regions.keySet()) {
617       startKeyList.add(region.getStartKey());
618       endKeyList.add(region.getEndKey());
619     }
620 
621     return new Pair<byte [][], byte [][]>(
622       startKeyList.toArray(new byte[startKeyList.size()][]),
623       endKeyList.toArray(new byte[endKeyList.size()][]));
624   }
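
  // Illustrative sketch (not part of the original source): reading region boundaries, e.g. to
  // compute splits for a MapReduce job. Assumes an open HTable referenced by "table".
  //
  //   Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
  //   byte[][] startKeys = startEndKeys.getFirst();
  //   byte[][] endKeys = startEndKeys.getSecond();
  //   for (int i = 0; i < startKeys.length; i++) {
  //     System.out.println("region " + i + ": ["
  //         + Bytes.toStringBinary(startKeys[i]) + ", "
  //         + Bytes.toStringBinary(endKeys[i]) + ")");
  //   }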
625 
626   /**
627    * Gets all the regions and their address for this table.
628    * <p>
629    * This is mainly useful for the MapReduce integration.
630    * @return A map of HRegionInfo with its server address
631    * @throws IOException if a remote or network exception occurs
632    */
633   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
634     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocation, singular, returns an HRegionLocation.
635     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName(), false);
636   }
637 
638   /**
639    * Get the corresponding regions for an arbitrary range of keys.
640    * <p>
641    * @param startKey Starting row in range, inclusive
642    * @param endKey Ending row in range, exclusive
643    * @return A list of HRegionLocations corresponding to the regions that
644    * contain the specified range
645    * @throws IOException if a remote or network exception occurs
646    */
647   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
648     final byte [] endKey) throws IOException {
649     return getRegionsInRange(startKey, endKey, false);
650   }
651 
652   /**
653    * Get the corresponding regions for an arbitrary range of keys.
654    * <p>
655    * @param startKey Starting row in range, inclusive
656    * @param endKey Ending row in range, exclusive
657    * @param reload true to reload information or false to use cached information
658    * @return A list of HRegionLocations corresponding to the regions that
659    * contain the specified range
660    * @throws IOException if a remote or network exception occurs
661    */
662   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
663       final byte [] endKey, final boolean reload) throws IOException {
664     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
665   }
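
  // Illustrative sketch (not part of the original source): locating the regions covering a
  // hypothetical key range ["a", "q").
  //
  //   List<HRegionLocation> locations =
  //       table.getRegionsInRange(Bytes.toBytes("a"), Bytes.toBytes("q"));
  //   for (HRegionLocation location : locations) {
  //     System.out.println(location.getRegionInfo().getRegionNameAsString()
  //         + " on " + location.getHostname());
  //   }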
666 
667   /**
668    * Get the corresponding start keys and regions for an arbitrary range of
669    * keys.
670    * <p>
671    * @param startKey Starting row in range, inclusive
672    * @param endKey Ending row in range
673    * @param includeEndKey true if endRow is inclusive, false if exclusive
674    * @return A pair of list of start keys and list of HRegionLocations that
675    *         contain the specified range
676    * @throws IOException if a remote or network exception occurs
677    */
678   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
679       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
680       throws IOException {
681     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
682   }
683 
684   /**
685    * Get the corresponding start keys and regions for an arbitrary range of
686    * keys.
687    * <p>
688    * @param startKey Starting row in range, inclusive
689    * @param endKey Ending row in range
690    * @param includeEndKey true if endRow is inclusive, false if exclusive
691    * @param reload true to reload information or false to use cached information
692    * @return A pair of list of start keys and list of HRegionLocations that
693    *         contain the specified range
694    * @throws IOException if a remote or network exception occurs
695    */
696   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
697       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
698       final boolean reload) throws IOException {
699     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
700     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
701       throw new IllegalArgumentException(
702         "Invalid range: " + Bytes.toStringBinary(startKey) +
703         " > " + Bytes.toStringBinary(endKey));
704     }
705     List<byte[]> keysInRange = new ArrayList<byte[]>();
706     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
707     byte[] currentKey = startKey;
708     do {
709       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
710       keysInRange.add(currentKey);
711       regionsInRange.add(regionLocation);
712       currentKey = regionLocation.getRegionInfo().getEndKey();
713     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
714         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
715             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
716     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
717         regionsInRange);
718   }
719 
720   /**
721    * {@inheritDoc}
722    * @deprecated Use reversed scan instead.
723    */
724    @Override
725    @Deprecated
726    public Result getRowOrBefore(final byte[] row, final byte[] family)
727        throws IOException {
728      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
729          tableName, row) {
730        public Result call(int callTimeout) throws IOException {
731          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
732          controller.setPriority(tableName);
733          controller.setCallTimeout(callTimeout);
734          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
735              getLocation().getRegionInfo().getRegionName(), row, family);
736          try {
737            ClientProtos.GetResponse response = getStub().get(controller, request);
738            if (!response.hasResult()) return null;
739            return ProtobufUtil.toResult(response.getResult());
740          } catch (ServiceException se) {
741            throw ProtobufUtil.getRemoteException(se);
742          }
743        }
744      };
745      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
746    }
747 
748    /**
749     * {@inheritDoc}
750     */
751   @Override
752   public ResultScanner getScanner(final Scan scan) throws IOException {
753     if (scan.getCaching() <= 0) {
754       scan.setCaching(getScannerCaching());
755     }
756 
757     if (scan.isReversed()) {
758       if (scan.isSmall()) {
759         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
760             this.connection);
761       } else {
762         return new ReversedClientScanner(getConfiguration(), scan, getName(),
763             this.connection);
764       }
765     }
766 
767     if (scan.isSmall()) {
768       return new ClientSmallScanner(getConfiguration(), scan, getName(),
769           this.connection, this.rpcCallerFactory, this.rpcControllerFactory);
770     } else {
771       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
772           this.rpcCallerFactory, this.rpcControllerFactory);
773     }
774   }
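
  // Illustrative sketch (not part of the original source): the caching, small and reversed
  // attributes on the Scan decide which scanner implementation getScanner(Scan) returns above.
  // The column family "cf" is hypothetical.
  //
  //   Scan scan = new Scan();
  //   scan.addFamily(Bytes.toBytes("cf"));
  //   scan.setCaching(100);  // overrides hbase.client.scanner.caching for this scan
  //   scan.setSmall(true);   // intended for scans whose results fit in a single RPC
  //   ResultScanner scanner = table.getScanner(scan);
  //   try {
  //     for (Result result : scanner) {
  //       // process each row
  //     }
  //   } finally {
  //     scanner.close();
  //   }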
775 
776   /**
777    * {@inheritDoc}
778    */
779   @Override
780   public ResultScanner getScanner(byte [] family) throws IOException {
781     Scan scan = new Scan();
782     scan.addFamily(family);
783     return getScanner(scan);
784   }
785 
786   /**
787    * {@inheritDoc}
788    */
789   @Override
790   public ResultScanner getScanner(byte [] family, byte [] qualifier)
791   throws IOException {
792     Scan scan = new Scan();
793     scan.addColumn(family, qualifier);
794     return getScanner(scan);
795   }
796 
797   /**
798    * {@inheritDoc}
799    */
800   @Override
801   public Result get(final Get get) throws IOException {
802     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
803         getName(), get.getRow()) {
804       public Result call(int callTimeout) throws IOException {
805         ClientProtos.GetRequest request =
806             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
807         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
808         controller.setPriority(tableName);
809         controller.setCallTimeout(callTimeout);
810         try {
811           ClientProtos.GetResponse response = getStub().get(controller, request);
812           if (response == null) return null;
813           return ProtobufUtil.toResult(response.getResult());
814         } catch (ServiceException se) {
815           throw ProtobufUtil.getRemoteException(se);
816         }
817       }
818     };
819     return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
820   }
821 
822   /**
823    * {@inheritDoc}
824    */
825   @Override
826   public Result[] get(List<Get> gets) throws IOException {
827     if (gets.size() == 1) {
828       return new Result[]{get(gets.get(0))};
829     }
830     try {
831       Object [] r1 = batch((List)gets);
832 
833       // translate.
834       Result [] results = new Result[r1.length];
835       int i=0;
836       for (Object o : r1) {
837         // batch ensures if there is a failure we get an exception instead
838         results[i++] = (Result) o;
839       }
840 
841       return results;
842     } catch (InterruptedException e) {
843       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
844     }
845   }
846 
847   /**
848    * {@inheritDoc}
849    */
850   @Override
851   public void batch(final List<? extends Row> actions, final Object[] results)
852       throws InterruptedException, IOException {
853     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
854     ars.waitUntilDone();
855     if (ars.hasError()) {
856       throw ars.getErrors();
857     }
858   }
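
  // Illustrative sketch (not part of the original source): a mixed batch against hypothetical
  // rows and columns. On failure a RetriesExhaustedWithDetailsException is thrown (see
  // ars.getErrors() above) while the results array still holds the per-action outcomes.
  //
  //   List<Row> actions = new ArrayList<Row>();
  //   actions.add(new Put(Bytes.toBytes("row1"))
  //       .add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
  //   actions.add(new Get(Bytes.toBytes("row2")));
  //   actions.add(new Delete(Bytes.toBytes("row3")));
  //   Object[] results = new Object[actions.size()];
  //   table.batch(actions, results);
  //   Result fetched = (Result) results[1]; // outcome of the Get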
859 
860   /**
861    * {@inheritDoc}
862    * @deprecated If any exception is thrown by one of the actions, there is no way to
863    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
864    */
865   @Override
866   public Object[] batch(final List<? extends Row> actions)
867      throws InterruptedException, IOException {
868     Object[] results = new Object[actions.size()];
869     batch(actions, results);
870     return results;
871   }
872 
873   /**
874    * {@inheritDoc}
875    */
876   @Override
877   public <R> void batchCallback(
878       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
879       throws IOException, InterruptedException {
880     connection.processBatchCallback(actions, tableName, pool, results, callback);
881   }
882 
883   /**
884    * {@inheritDoc}
885    * @deprecated If any exception is thrown by one of the actions, there is no way to
886    * retrieve the partially executed results. Use
887    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
888    * instead.
889    */
890   @Override
891   public <R> Object[] batchCallback(
892     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
893       InterruptedException {
894     Object[] results = new Object[actions.size()];
895     batchCallback(actions, results, callback);
896     return results;
897   }
898 
899   /**
900    * {@inheritDoc}
901    */
902   @Override
903   public void delete(final Delete delete)
904   throws IOException {
905     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
906         tableName, delete.getRow()) {
907       public Boolean call(int callTimeout) throws IOException {
908         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
909         controller.setPriority(tableName);
910         controller.setCallTimeout(callTimeout);
911 
912         try {
913           MutateRequest request = RequestConverter.buildMutateRequest(
914             getLocation().getRegionInfo().getRegionName(), delete);
915           MutateResponse response = getStub().mutate(controller, request);
916           return Boolean.valueOf(response.getProcessed());
917         } catch (ServiceException se) {
918           throw ProtobufUtil.getRemoteException(se);
919         }
920       }
921     };
922     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
923   }
924 
925   /**
926    * {@inheritDoc}
927    */
928   @Override
929   public void delete(final List<Delete> deletes)
930   throws IOException {
931     Object[] results = new Object[deletes.size()];
932     try {
933       batch(deletes, results);
934     } catch (InterruptedException e) {
935       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
936     } finally {
937       // mutate list so that it is empty for complete success, or contains only failed records
938       // results are returned in the same order as the requests in list
939       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
940       for (int i = results.length - 1; i>=0; i--) {
941         // if result is not null, it succeeded
942         if (results[i] instanceof Result) {
943           deletes.remove(i);
944         }
945       }
946     }
947   }
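
  // Illustrative sketch (not part of the original source): as the comments above note, the passed
  // list is mutated, so after a partial failure it contains only the deletes that did not succeed.
  //
  //   List<Delete> deletes = new ArrayList<Delete>();
  //   deletes.add(new Delete(Bytes.toBytes("row1")));
  //   deletes.add(new Delete(Bytes.toBytes("row2")));
  //   table.delete(deletes);
  //   // on a clean run the list is now empty; failed deletes, if any, remain in it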
948 
949   /**
950    * {@inheritDoc}
951    */
952   @Override
953   public void put(final Put put)
954       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
955     doPut(put);
956     if (autoFlush) {
957       flushCommits();
958     }
959   }
960 
961   /**
962    * {@inheritDoc}
963    */
964   @Override
965   public void put(final List<Put> puts)
966       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
967     for (Put put : puts) {
968       doPut(put);
969     }
970     if (autoFlush) {
971       flushCommits();
972     }
973   }
974 
975 
976   /**
977    * Adds the put to the buffer. If the buffer is already too large, sends the buffer to the
978    * cluster.
979    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
980    * @throws InterruptedIOException if we were interrupted.
981    */
982   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
983     // This behavior is highly non-intuitive... it does not protect us against
984     // 0.94-incompatible behavior, which is a timing issue because hasError, the code below
985     // and the setter of hasError are not synchronized. Perhaps it should be removed.
986     if (ap.hasError()) {
987       writeAsyncBuffer.add(put);
988       backgroundFlushCommits(true);
989     }
990 
991     validatePut(put);
992 
993     currentWriteBufferSize += put.heapSize();
994     writeAsyncBuffer.add(put);
995 
996     while (currentWriteBufferSize > writeBufferSize) {
997       backgroundFlushCommits(false);
998     }
999   }
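
  // Illustrative sketch (not part of the original source): client-side write buffering. With
  // autoflush disabled (setAutoFlush is part of the HTableInterface contract), puts accumulate in
  // writeAsyncBuffer and are flushed once writeBufferSize (hbase.client.write.buffer) is exceeded,
  // or when flushCommits()/close() is called. The rows and columns below are hypothetical.
  //
  //   table.setAutoFlush(false);
  //   for (int i = 0; i < 10000; i++) {
  //     Put put = new Put(Bytes.toBytes("row-" + i));
  //     put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v-" + i));
  //     table.put(put);      // buffered; sent in the background as the buffer fills
  //   }
  //   table.flushCommits();  // push whatever is still sitting in the buffer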
1000 
1001 
1002   /**
1003   * Sends the operations in the buffer to the servers. Does not wait for the servers' answers.
1004   * If there is an error (max retries reached from a previous flush or a bad operation), it tries
1005   * to send all operations in the buffer and throws an exception.
1006   * @param synchronous - if true, sends all the writes and waits for all of them to finish before
1007    *                     returning.
1008    */
1009   private void backgroundFlushCommits(boolean synchronous) throws
1010       InterruptedIOException, RetriesExhaustedWithDetailsException {
1011 
1012     try {
1013       if (!synchronous) {
1014         ap.submit(tableName, writeAsyncBuffer, true, null, false);
1015         if (ap.hasError()) {
1016           LOG.debug(tableName + ": One or more of the operations have failed -" +
1017              " waiting for all operations in progress to finish (successfully or not)");
1018         }
1019       }
1020       if (synchronous || ap.hasError()) {
1021         while (!writeAsyncBuffer.isEmpty()) {
1022           ap.submit(tableName, writeAsyncBuffer, true, null, false);
1023         }
1024         List<Row> failedRows = clearBufferOnFail ? null : writeAsyncBuffer;
1025         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(failedRows);
1026         if (error != null) {
1027           throw error;
1028         }
1029       }
1030     } finally {
1031       currentWriteBufferSize = 0;
1032       for (Row mut : writeAsyncBuffer) {
1033         if (mut instanceof Mutation) {
1034           currentWriteBufferSize += ((Mutation) mut).heapSize();
1035         }
1036       }
1037     }
1038   }
1039 
1040   /**
1041    * {@inheritDoc}
1042    */
1043   @Override
1044   public void mutateRow(final RowMutations rm) throws IOException {
1045     RegionServerCallable<Void> callable =
1046         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1047       public Void call(int callTimeout) throws IOException {
1048         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1049         controller.setPriority(tableName);
1050         controller.setCallTimeout(callTimeout);
1051         try {
1052           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1053             getLocation().getRegionInfo().getRegionName(), rm);
1054           regionMutationBuilder.setAtomic(true);
1055           MultiRequest request =
1056             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1057           getStub().multi(controller, request);
1058         } catch (ServiceException se) {
1059           throw ProtobufUtil.getRemoteException(se);
1060         }
1061         return null;
1062       }
1063     };
1064     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1065   }
1066 
1067   /**
1068    * {@inheritDoc}
1069    */
1070   @Override
1071   public Result append(final Append append) throws IOException {
1072     if (append.numFamilies() == 0) {
1073       throw new IOException(
1074           "Invalid arguments to append, no columns specified");
1075     }
1076 
1077     NonceGenerator ng = this.connection.getNonceGenerator();
1078     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1079     RegionServerCallable<Result> callable =
1080       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1081         public Result call(int callTimeout) throws IOException {
1082           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1083           controller.setPriority(getTableName());
1084           controller.setCallTimeout(callTimeout);
1085           try {
1086             MutateRequest request = RequestConverter.buildMutateRequest(
1087               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1088             MutateResponse response = getStub().mutate(controller, request);
1089             if (!response.hasResult()) return null;
1090             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1091           } catch (ServiceException se) {
1092             throw ProtobufUtil.getRemoteException(se);
1093           }
1094         }
1095       };
1096     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1097   }
1098 
1099   /**
1100    * {@inheritDoc}
1101    */
1102   @Override
1103   public Result increment(final Increment increment) throws IOException {
1104     if (!increment.hasFamilies()) {
1105       throw new IOException(
1106           "Invalid arguments to increment, no columns specified");
1107     }
1108     NonceGenerator ng = this.connection.getNonceGenerator();
1109     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1110     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1111         getName(), increment.getRow()) {
1112       public Result call(int callTimeout) throws IOException {
1113         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1114         controller.setPriority(getTableName());
1115         controller.setCallTimeout(callTimeout);
1116         try {
1117           MutateRequest request = RequestConverter.buildMutateRequest(
1118             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1119           MutateResponse response = getStub().mutate(controller, request);
1120           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1121         } catch (ServiceException se) {
1122           throw ProtobufUtil.getRemoteException(se);
1123         }
1124       }
1125     };
1126     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1127   }
1128 
1129   /**
1130    * {@inheritDoc}
1131    */
1132   @Override
1133   public long incrementColumnValue(final byte [] row, final byte [] family,
1134       final byte [] qualifier, final long amount)
1135   throws IOException {
1136     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1137   }
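
  // Illustrative sketch (not part of the original source): atomically bumping a hypothetical
  // counter cell and reading back the new value.
  //
  //   long newValue = table.incrementColumnValue(Bytes.toBytes("row1"),
  //       Bytes.toBytes("cf"), Bytes.toBytes("counter"), 1L);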
1138 
1139   /**
1140    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1141    */
1142   @Deprecated
1143   @Override
1144   public long incrementColumnValue(final byte [] row, final byte [] family,
1145       final byte [] qualifier, final long amount, final boolean writeToWAL)
1146   throws IOException {
1147     return incrementColumnValue(row, family, qualifier, amount,
1148      writeToWAL? Durability.SYNC_WAL: Durability.SKIP_WAL);
1149   }
1150 
1151   /**
1152    * {@inheritDoc}
1153    */
1154   @Override
1155   public long incrementColumnValue(final byte [] row, final byte [] family,
1156       final byte [] qualifier, final long amount, final Durability durability)
1157   throws IOException {
1158     NullPointerException npe = null;
1159     if (row == null) {
1160       npe = new NullPointerException("row is null");
1161     } else if (family == null) {
1162       npe = new NullPointerException("family is null");
1163     } else if (qualifier == null) {
1164       npe = new NullPointerException("qualifier is null");
1165     }
1166     if (npe != null) {
1167       throw new IOException(
1168           "Invalid arguments to incrementColumnValue", npe);
1169     }
1170 
1171     NonceGenerator ng = this.connection.getNonceGenerator();
1172     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1173     RegionServerCallable<Long> callable =
1174       new RegionServerCallable<Long>(connection, getName(), row) {
1175         public Long call(int callTimeout) throws IOException {
1176           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1177           controller.setPriority(getTableName());
1178           controller.setCallTimeout(callTimeout);
1179           try {
1180             MutateRequest request = RequestConverter.buildIncrementRequest(
1181               getLocation().getRegionInfo().getRegionName(), row, family,
1182               qualifier, amount, durability, nonceGroup, nonce);
1183             MutateResponse response = getStub().mutate(controller, request);
1184             Result result =
1185               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1186             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1187           } catch (ServiceException se) {
1188             throw ProtobufUtil.getRemoteException(se);
1189           }
1190         }
1191       };
1192     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1193   }
1194 
1195   /**
1196    * {@inheritDoc}
1197    */
1198   @Override
1199   public boolean checkAndPut(final byte [] row,
1200       final byte [] family, final byte [] qualifier, final byte [] value,
1201       final Put put)
1202   throws IOException {
1203     RegionServerCallable<Boolean> callable =
1204       new RegionServerCallable<Boolean>(connection, getName(), row) {
1205         public Boolean call(int callTimeout) throws IOException {
1206           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1207           controller.setPriority(tableName);
1208           controller.setCallTimeout(callTimeout);
1209           try {
1210             MutateRequest request = RequestConverter.buildMutateRequest(
1211               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1212                 new BinaryComparator(value), CompareType.EQUAL, put);
1213             MutateResponse response = getStub().mutate(controller, request);
1214             return Boolean.valueOf(response.getProcessed());
1215           } catch (ServiceException se) {
1216             throw ProtobufUtil.getRemoteException(se);
1217           }
1218         }
1219       };
1220     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1221   }
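
  // Illustrative sketch (not part of the original source): an atomic check-and-put on hypothetical
  // coordinates. The put is applied only if the current value of cf:q equals "expected"; the
  // boolean return reports whether the mutation went through.
  //
  //   Put put = new Put(Bytes.toBytes("row1"));
  //   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("new-value"));
  //   boolean applied = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
  //       Bytes.toBytes("q"), Bytes.toBytes("expected"), put);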
1222 
1223   /**
1224    * {@inheritDoc}
1225    */
1226   @Override
1227   public boolean checkAndPut(final byte [] row, final byte [] family,
1228       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1229       final Put put)
1230   throws IOException {
1231     RegionServerCallable<Boolean> callable =
1232       new RegionServerCallable<Boolean>(connection, getName(), row) {
1233         public Boolean call(int callTimeout) throws IOException {
1234           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1235           controller.setPriority(tableName);
1236           controller.setCallTimeout(callTimeout);
1237           try {
1238             CompareType compareType = CompareType.valueOf(compareOp.name());
1239             MutateRequest request = RequestConverter.buildMutateRequest(
1240               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1241                 new BinaryComparator(value), compareType, put);
1242             MutateResponse response = getStub().mutate(controller, request);
1243             return Boolean.valueOf(response.getProcessed());
1244           } catch (ServiceException se) {
1245             throw ProtobufUtil.getRemoteException(se);
1246           }
1247         }
1248       };
1249     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1250   }
1251 
1252   /**
1253    * {@inheritDoc}
1254    */
1255   @Override
1256   public boolean checkAndDelete(final byte [] row,
1257       final byte [] family, final byte [] qualifier, final byte [] value,
1258       final Delete delete)
1259   throws IOException {
1260     RegionServerCallable<Boolean> callable =
1261       new RegionServerCallable<Boolean>(connection, getName(), row) {
1262         public Boolean call(int callTimeout) throws IOException {
1263           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1264           controller.setPriority(tableName);
1265           controller.setCallTimeout(callTimeout);
1266           try {
1267             MutateRequest request = RequestConverter.buildMutateRequest(
1268               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1269                 new BinaryComparator(value), CompareType.EQUAL, delete);
1270             MutateResponse response = getStub().mutate(controller, request);
1271             return Boolean.valueOf(response.getProcessed());
1272           } catch (ServiceException se) {
1273             throw ProtobufUtil.getRemoteException(se);
1274           }
1275         }
1276       };
1277     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1278   }
1279 
1280   /**
1281    * {@inheritDoc}
1282    */
1283   @Override
1284   public boolean checkAndDelete(final byte [] row, final byte [] family,
1285       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1286       final Delete delete)
1287   throws IOException {
1288     RegionServerCallable<Boolean> callable =
1289       new RegionServerCallable<Boolean>(connection, getName(), row) {
1290         public Boolean call(int callTimeout) throws IOException {
1291           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1292           controller.setPriority(tableName);
1293           controller.setCallTimeout(callTimeout);
1294           try {
1295             CompareType compareType = CompareType.valueOf(compareOp.name());
1296             MutateRequest request = RequestConverter.buildMutateRequest(
1297               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1298                 new BinaryComparator(value), compareType, delete);
1299             MutateResponse response = getStub().mutate(controller, request);
1300             return Boolean.valueOf(response.getProcessed());
1301           } catch (ServiceException se) {
1302             throw ProtobufUtil.getRemoteException(se);
1303           }
1304         }
1305       };
1306     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1307   }
1308 
1309   /**
1310    * {@inheritDoc}
1311    */
1312   @Override
1313   public boolean exists(final Get get) throws IOException {
1314     get.setCheckExistenceOnly(true);
1315     Result r = get(get);
1316     assert r.getExists() != null;
1317     return r.getExists();
1318   }
1319 
1320   /**
1321    * {@inheritDoc}
1322    */
1323   @Override
1324   public Boolean[] exists(final List<Get> gets) throws IOException {
1325     if (gets.isEmpty()) return new Boolean[]{};
1326     if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
1327 
1328     for (Get g: gets){
1329       g.setCheckExistenceOnly(true);
1330     }
1331 
1332     Object[] r1;
1333     try {
1334       r1 = batch(gets);
1335     } catch (InterruptedException e) {
1336       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1337     }
1338 
1339     // translate.
1340     Boolean[] results = new Boolean[r1.length];
1341     int i = 0;
1342     for (Object o : r1) {
1343       // batch ensures that if there is a failure we get an exception instead of a null result
1344       results[i++] = ((Result)o).getExists();
1345     }
1346 
1347     return results;
1348   }
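  /*
   * Illustrative usage sketch (not part of the original source; row keys are
   * assumptions). Existence checks avoid shipping cell values back to the client,
   * so they are cheaper than full Gets when only presence matters.
   *
   *   List<Get> gets = new ArrayList<Get>();
   *   gets.add(new Get(Bytes.toBytes("row-1")));
   *   gets.add(new Get(Bytes.toBytes("row-2")));
   *   Boolean[] present = table.exists(gets);   // one entry per Get, in the same order
   */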
1349 
1350   /**
1351    * {@inheritDoc}
1352    */
1353   @Override
1354   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1355     // Because an operation may be in progress even when the buffer is empty,
1356     // backgroundFlushCommits is called at least once.
1357     backgroundFlushCommits(true);
1358   }
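  /*
   * Illustrative usage sketch (not part of the original source; table and column
   * names are assumptions). With autoFlush disabled, Puts accumulate in the
   * client-side write buffer and are sent when the buffer fills or when
   * flushCommits() is called explicitly.
   *
   *   table.setAutoFlush(false, true);
   *   for (int i = 0; i < 1000; i++) {
   *     Put p = new Put(Bytes.toBytes("row-" + i));
   *     p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value-" + i));
   *     table.put(p);                            // buffered, not yet sent
   *   }
   *   table.flushCommits();                      // push any remaining buffered mutations
   */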
1359 
1360   /**
1361    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1362    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1363    *
1364    * @param list The collection of actions.
1365    * @param results An empty array of the same size as list. If an exception
1366    * is thrown, this array can be inspected for partial results and to
1367    * determine which actions completed successfully.
1368    * @throws IOException if there are problems talking to META. Per-item
1369    * exceptions are stored in the results array.
1370    */
1371   public <R> void processBatchCallback(
1372     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1373     throws IOException, InterruptedException {
1374     this.batchCallback(list, results, callback);
1375   }
1376 
1377 
1378   /**
1379    * Parameterized batch processing, allowing varying return types for different
1380    * {@link Row} implementations.
1381    */
1382   public void processBatch(final List<? extends Row> list, final Object[] results)
1383     throws IOException, InterruptedException {
1384     this.batch(list, results);
1385   }
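  /*
   * Illustrative usage sketch (not part of the original source; row keys are
   * assumptions). A mixed batch of actions is grouped by region server and sent in
   * parallel; results[i] holds the outcome (or exception) for actions.get(i).
   *
   *   List<Row> actions = new ArrayList<Row>();
   *   actions.add(new Get(Bytes.toBytes("row-1")));
   *   actions.add(new Delete(Bytes.toBytes("row-2")));
   *   Object[] results = new Object[actions.size()];
   *   try {
   *     table.batch(actions, results);
   *   } catch (InterruptedException e) {
   *     Thread.currentThread().interrupt();
   *   }
   */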
1386 
1387 
1388   @Override
1389   public void close() throws IOException {
1390     if (this.closed) {
1391       return;
1392     }
1393     flushCommits();
1394     if (cleanupPoolOnClose) {
1395       this.pool.shutdown();
1396     }
1397     if (cleanupConnectionOnClose) {
1398       if (this.connection != null) {
1399         this.connection.close();
1400       }
1401     }
1402     this.closed = true;
1403   }
1404 
1405   // Validate the Put for well-formedness: non-empty and within maxKeyValueSize.
1406   public void validatePut(final Put put) throws IllegalArgumentException {
1407     if (put.isEmpty()) {
1408       throw new IllegalArgumentException("No columns to insert");
1409     }
1410     if (maxKeyValueSize > 0) {
1411       for (List<Cell> list : put.getFamilyCellMap().values()) {
1412         for (Cell cell : list) {
1413           // KeyValue v1 expectation.  Cast for now.
1414           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1415           if (kv.getLength() > maxKeyValueSize) {
1416             throw new IllegalArgumentException("KeyValue size too large");
1417           }
1418         }
1419       }
1420     }
1421   }
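  /*
   * Illustrative note and sketch (not part of the original source). maxKeyValueSize
   * is normally taken from the client configuration (commonly the key
   * hbase.client.keyvalue.maxsize; treat the exact key name as an assumption).
   * An oversized Put is rejected on the client before any RPC is attempted.
   *
   *   Put p = new Put(Bytes.toBytes("row-1"));
   *   p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), new byte[64 * 1024 * 1024]); // 64 MB cell
   *   table.validatePut(p);   // throws IllegalArgumentException if over the configured limit
   */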
1422 
1423   /**
1424    * {@inheritDoc}
1425    */
1426   @Override
1427   public boolean isAutoFlush() {
1428     return autoFlush;
1429   }
1430 
1431   /**
1432    * {@inheritDoc}
1433    */
1434   @Deprecated
1435   @Override
1436   public void setAutoFlush(boolean autoFlush) {
1437     setAutoFlush(autoFlush, autoFlush);
1438   }
1439 
1440   /**
1441    * {@inheritDoc}
1442    */
1443   @Override
1444   public void setAutoFlushTo(boolean autoFlush) {
1445     setAutoFlush(autoFlush, clearBufferOnFail);
1446   }
1447 
1448   /**
1449    * {@inheritDoc}
1450    */
1451   @Override
1452   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1453     this.autoFlush = autoFlush;
1454     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1455   }
1456 
1457   /**
1458    * Returns the maximum size in bytes of the write buffer for this HTable.
1459    * <p>
1460    * The default value comes from the configuration parameter
1461    * {@code hbase.client.write.buffer}.
1462    * @return The size of the write buffer in bytes.
1463    */
1464   @Override
1465   public long getWriteBufferSize() {
1466     return writeBufferSize;
1467   }
1468 
1469   /**
1470    * Sets the size of the write buffer in bytes.
1471    * <p>
1472    * If the new size is less than the current amount of data in the
1473    * write buffer, the buffer gets flushed.
1474    * @param writeBufferSize The new write buffer size, in bytes.
1475    * @throws IOException if a remote or network exception occurs.
1476    */
1477   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1478     this.writeBufferSize = writeBufferSize;
1479     if (currentWriteBufferSize > writeBufferSize) {
1480       flushCommits();
1481     }
1482   }
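  /*
   * Illustrative usage sketch (not part of the original source; the 8 MB figure is
   * an arbitrary example, not a recommendation). A larger write buffer trades
   * client memory for fewer, larger RPCs when autoFlush is disabled; shrinking the
   * buffer below its current contents triggers an immediate flush.
   *
   *   table.setAutoFlush(false, true);
   *   table.setWriteBufferSize(8 * 1024 * 1024);   // 8 MB
   */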
1483 
1484   /**
1485    * The pool used for multi (batch) requests on this HTable.
1486    * @return the pool used for multi requests
1487    */
1488   ExecutorService getPool() {
1489     return this.pool;
1490   }
1491 
1492   /**
1493    * Enable or disable region cache prefetch for the table. The setting
1494    * applies to all HTable instances of the given table that share the
1495    * same connection. By default, region cache prefetch is enabled.
1496    * @param tableName name of table to configure.
1497    * @param enable true to enable region cache prefetch, or false to
1498    * disable it.
1499    * @throws IOException
1500    * @deprecated does nothing since 0.99
1501    */
1502   @Deprecated
1503   public static void setRegionCachePrefetch(final byte[] tableName,
1504       final boolean enable)  throws IOException {
1505   }
1506 
1507   /**
1508    * @deprecated does nothing since 0.99
1509    */
1510   @Deprecated
1511   public static void setRegionCachePrefetch(
1512       final TableName tableName,
1513       final boolean enable) throws IOException {
1514   }
1515 
1516   /**
1517    * Enable or disable region cache prefetch for the table. The setting
1518    * applies to all HTable instances of the given table that share the
1519    * same connection. By default, region cache prefetch is enabled.
1520    * @param conf The Configuration object to use.
1521    * @param tableName name of table to configure.
1522    * @param enable true to enable region cache prefetch, or false to
1523    * disable it.
1524    * @throws IOException
1525    * @deprecated does nothing since 0.99
1526    */
1527   @Deprecated
1528   public static void setRegionCachePrefetch(final Configuration conf,
1529       final byte[] tableName, final boolean enable) throws IOException {
1530   }
1531 
1532   /**
1533    * @deprecated does nothing since 0.99
1534    */
1535   @Deprecated
1536   public static void setRegionCachePrefetch(final Configuration conf,
1537       final TableName tableName,
1538       final boolean enable) throws IOException {
1539   }
1540 
1541   /**
1542    * Check whether region cache prefetch is enabled or not for the table.
1543    * @param conf The Configuration object to use.
1544    * @param tableName name of table to check
1545    * @return true if the table's region cache prefetch is enabled,
1546    * false otherwise.
1547    * @throws IOException
1548    * @deprecated always returns false since 0.99
1549    */
1550   @Deprecated
1551   public static boolean getRegionCachePrefetch(final Configuration conf,
1552       final byte[] tableName) throws IOException {
1553     return false;
1554   }
1555 
1556   /**
1557    * @deprecated always returns false since 0.99
1558    */
1559   @Deprecated
1560   public static boolean getRegionCachePrefetch(final Configuration conf,
1561       final TableName tableName) throws IOException {
1562     return false;
1563   }
1564 
1565   /**
1566    * Check whether region cache prefetch is enabled or not for the table.
1567    * @param tableName name of table to check
1568    * @return true if the table's region cache prefetch is enabled,
1569    * false otherwise.
1570    * @throws IOException
1571    * @deprecated always returns false since 0.99
1572    */
1573   @Deprecated
1574   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1575     return false;
1576   }
1577 
1578   /**
1579    * @deprecated always returns false since 0.99
1580    */
1581   @Deprecated
1582   public static boolean getRegionCachePrefetch(
1583       final TableName tableName) throws IOException {
1584     return false;
1585   }
1586 
1587   /**
1588    * Explicitly clears the region cache to fetch the latest value from META.
1589    * This is a power user function: avoid unless you know the ramifications.
1590    */
1591   public void clearRegionCache() {
1592     this.connection.clearRegionCache();
1593   }
1594 
1595   /**
1596    * {@inheritDoc}
1597    */
1598   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1599     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1600   }
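  /*
   * Illustrative usage sketch (not part of the original source; "ExampleProtos" and
   * its service are hypothetical generated protobuf classes). The returned channel
   * targets the single region containing the given row and is normally wrapped in a
   * generated stub.
   *
   *   CoprocessorRpcChannel channel = table.coprocessorService(Bytes.toBytes("row-1"));
   *   ExampleProtos.ExampleService.BlockingInterface stub =
   *       ExampleProtos.ExampleService.newBlockingStub(channel);
   *   // calls on the stub now execute inside the coprocessor hosting that region
   */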
1601 
1602   /**
1603    * {@inheritDoc}
1604    */
1605   @Override
1606   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1607       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1608       throws ServiceException, Throwable {
1609     final Map<byte[],R> results =  Collections.synchronizedMap(
1610         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1611     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1612       public void update(byte[] region, byte[] row, R value) {
1613         if (region != null) {
1614           results.put(region, value);
1615         }
1616       }
1617     });
1618     return results;
1619   }
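  /*
   * Illustrative usage sketch (not part of the original source; "ExampleProtos",
   * its service and the invokeCount helper are hypothetical). Invokes a coprocessor
   * endpoint on every region overlapping the key range and returns one result per
   * region, keyed by region name.
   *
   *   Map<byte[], Long> perRegion = table.coprocessorService(
   *       ExampleProtos.ExampleService.class, null, null,   // null keys = all regions
   *       new Batch.Call<ExampleProtos.ExampleService, Long>() {
   *         public Long call(ExampleProtos.ExampleService instance) throws IOException {
   *           return invokeCount(instance);                 // hypothetical endpoint invocation
   *         }
   *       });
   */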
1620 
1621   /**
1622    * {@inheritDoc}
1623    */
1624   @Override
1625   public <T extends Service, R> void coprocessorService(final Class<T> service,
1626       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1627       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1628 
1629     // get regions covered by the row range
1630     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1631 
1632     Map<byte[],Future<R>> futures =
1633         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1634     for (final byte[] r : keys) {
1635       final RegionCoprocessorRpcChannel channel =
1636           new RegionCoprocessorRpcChannel(connection, tableName, r);
1637       Future<R> future = pool.submit(
1638           new Callable<R>() {
1639             public R call() throws Exception {
1640               T instance = ProtobufUtil.newServiceStub(service, channel);
1641               R result = callable.call(instance);
1642               byte[] region = channel.getLastRegion();
1643               if (callback != null) {
1644                 callback.update(region, r, result);
1645               }
1646               return result;
1647             }
1648           });
1649       futures.put(r, future);
1650     }
1651     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1652       try {
1653         e.getValue().get();
1654       } catch (ExecutionException ee) {
1655         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1656             + Bytes.toStringBinary(e.getKey()), ee);
1657         throw ee.getCause();
1658       } catch (InterruptedException ie) {
1659         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1660             + " for row " + Bytes.toStringBinary(e.getKey()))
1661             .initCause(ie);
1662       }
1663     }
1664   }
1665 
1666   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1667   throws IOException {
1668     if (start == null) {
1669       start = HConstants.EMPTY_START_ROW;
1670     }
1671     if (end == null) {
1672       end = HConstants.EMPTY_END_ROW;
1673     }
1674     return getKeysAndRegionsInRange(start, end, true).getFirst();
1675   }
1676 
1677   public void setOperationTimeout(int operationTimeout) {
1678     this.operationTimeout = operationTimeout;
1679   }
1680 
1681   public int getOperationTimeout() {
1682     return operationTimeout;
1683   }
1684 
1685   @Override
1686   public String toString() {
1687     return tableName + ";" + connection;
1688   }
1689 
1690   /**
1691    * Run basic test.
1692    * @param args The table name and a row key; the content of that row is printed.
1693    * @throws IOException
1694    */
1695   public static void main(String[] args) throws IOException {
1696     HTable t = new HTable(HBaseConfiguration.create(), args[0]);
1697     try {
1698       System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1699     } finally {
1700       t.close();
1701     }
1702   }
1703 
1704   /**
1705    * {@inheritDoc}
1706    */
1707   @Override
1708   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1709       Descriptors.MethodDescriptor methodDescriptor, Message request,
1710       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1711     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1712         Bytes.BYTES_COMPARATOR));
1713     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1714         new Callback<R>() {
1715 
1716           @Override
1717           public void update(byte[] region, byte[] row, R result) {
1718             if (region != null) {
1719               results.put(region, result);
1720             }
1721           }
1722         });
1723     return results;
1724   }
1725 
1726   /**
1727    * {@inheritDoc}
1728    */
1729   @Override
1730   public <R extends Message> void batchCoprocessorService(
1731       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1732       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1733       throws ServiceException, Throwable {
1734 
1735     // get regions covered by the row range
1736     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1737         getKeysAndRegionsInRange(startKey, endKey, true);
1738     List<byte[]> keys = keysAndRegions.getFirst();
1739     List<HRegionLocation> regions = keysAndRegions.getSecond();
1740 
1741     // check if we have any calls to make
1742     if (keys.isEmpty()) {
1743       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1744           ", end=" + Bytes.toStringBinary(endKey));
1745       return;
1746     }
1747 
1748     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1749     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1750         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1751     for (int i = 0; i < keys.size(); i++) {
1752       final byte[] rowKey = keys.get(i);
1753       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1754       RegionCoprocessorServiceExec exec =
1755           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1756       execs.add(exec);
1757       execsByRow.put(rowKey, exec);
1758     }
1759 
1760     // tracking for any possible deserialization errors on success callback
1761     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1762     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1763     final List<Row> callbackErrorActions = new ArrayList<Row>();
1764     final List<String> callbackErrorServers = new ArrayList<String>();
1765     Object[] results = new Object[execs.size()];
1766 
1767     AsyncProcess asyncProcess =
1768         new AsyncProcess(connection, configuration, pool,
1769             RpcRetryingCallerFactory.instantiate(configuration), true,
1770             RpcControllerFactory.instantiate(configuration));
1771     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1772         new Callback<ClientProtos.CoprocessorServiceResult>() {
1773           @Override
1774           public void update(byte[] region, byte[] row,
1775                               ClientProtos.CoprocessorServiceResult serviceResult) {
1776             if (LOG.isTraceEnabled()) {
1777               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1778                   ": region=" + Bytes.toStringBinary(region) +
1779                   ", row=" + Bytes.toStringBinary(row) +
1780                   ", value=" + serviceResult.getValue().getValue());
1781             }
1782             try {
1783               callback.update(region, row,
1784                   (R) responsePrototype.newBuilderForType().mergeFrom(
1785                       serviceResult.getValue().getValue()).build());
1786             } catch (InvalidProtocolBufferException e) {
1787               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1788                   e);
1789               callbackErrorExceptions.add(e);
1790               callbackErrorActions.add(execsByRow.get(row));
1791               callbackErrorServers.add("null");
1792             }
1793           }
1794         }, results);
1795 
1796     future.waitUntilDone();
1797 
1798     if (future.hasError()) {
1799       throw future.getErrors();
1800     } else if (!callbackErrorExceptions.isEmpty()) {
1801       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1802           callbackErrorServers);
1803     }
1804   }
1805 }