
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import com.google.protobuf.InvalidProtocolBufferException;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.classification.InterfaceStability;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.KeyValueUtil;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
57  import org.apache.hadoop.hbase.client.coprocessor.Batch;
58  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
59  import org.apache.hadoop.hbase.filter.BinaryComparator;
60  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
61  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
62  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
63  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
64  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
65  import org.apache.hadoop.hbase.protobuf.RequestConverter;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
71  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
72  import org.apache.hadoop.hbase.util.Bytes;
73  import org.apache.hadoop.hbase.util.Pair;
74  import org.apache.hadoop.hbase.util.Threads;
75  
76  import com.google.protobuf.Descriptors;
77  import com.google.protobuf.Message;
78  import com.google.protobuf.Service;
79  import com.google.protobuf.ServiceException;
80  
81  import com.google.common.annotations.VisibleForTesting;
82  
83  /**
84   * <p>Used to communicate with a single HBase table.  An implementation of
85   * {@link HTableInterface}.  Instances of this class can be constructed directly but it is
86   * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
87   * See {@link HConnectionManager} class comment for an example.
88   *
89   * <p>This class is not thread safe for reads nor writes.
90   *
91   * <p>In case of writes (Put, Delete), the underlying write buffer can
92   * be corrupted if multiple threads contend over a single HTable instance.
93   *
94   * <p>In case of reads, some fields used by a Scan are shared among all threads;
95   * the HTable implementation therefore does not guarantee thread safety even for a Get.
96   *
97   * <p>Instances of HTable passed the same {@link Configuration} instance will
98   * share connections to servers out on the cluster and to the zookeeper ensemble
99   * as well as caches of region locations.  This is usually a *good* thing and it
100  * is recommended to reuse the same configuration object for all your tables.
101  * This happens because they will all share the same underlying
102  * {@link HConnection} instance. See {@link HConnectionManager} for more on
103  * how this mechanism works.
104  *
105  * <p>{@link HConnection} will read most of the
106  * configuration it needs from the passed {@link Configuration} on initial
107  * construction.  Thereafter, for settings such as
108  * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
109  * and <code>hbase.client.rpc.maxattempts</code> updating their values in the
110  * passed {@link Configuration} subsequent to {@link HConnection} construction
111  * will go unnoticed.  To run with changed values, make a new
112  * {@link HTable} passing a new {@link Configuration} instance that has the
113  * new configuration.
114  *
115  * <p>Note that this class implements the {@link Closeable} interface. When an
116  * HTable instance is no longer required, it *should* be closed in order to ensure
117  * that the underlying resources are promptly released. Please note that the close
118  * method can throw a java.io.IOException that must be handled.
119  *
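 * <p>A minimal usage sketch (the table, family, qualifier and row names below are
 * illustrative only, not part of any API):
 * <pre>
 *   Configuration conf = HBaseConfiguration.create();
 *   HTable table = new HTable(conf, "myTable");   // shares the underlying HConnection
 *   try {
 *     Put put = new Put(Bytes.toBytes("row1"));
 *     put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
 *     table.put(put);
 *     Result result = table.get(new Get(Bytes.toBytes("row1")));
 *   } finally {
 *     table.close();                              // release resources promptly
 *   }
 * </pre>
 *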
120  * @see HBaseAdmin for create, drop, list, enable and disable of tables.
121  * @see HConnection
122  * @see HConnectionManager
123  */
124 @InterfaceAudience.Public
125 @InterfaceStability.Stable
126 public class HTable implements HTableInterface {
127   private static final Log LOG = LogFactory.getLog(HTable.class);
128   protected ClusterConnection connection;
129   private final TableName tableName;
130   private volatile Configuration configuration;
131   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
132   private long writeBufferSize;
133   private boolean clearBufferOnFail;
134   private boolean autoFlush;
135   protected long currentWriteBufferSize;
136   protected int scannerCaching;
137   private int maxKeyValueSize;
138   private ExecutorService pool;  // For Multi
139   private boolean closed;
140   private int operationTimeout;
141   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
142   private final boolean cleanupConnectionOnClose; // close the connection in close()
143 
144   /** The Async process used for puts when autoflush is false, and for multi-puts */
145   protected AsyncProcess ap;
146   /** The Async process for batch */
147   protected AsyncProcess multiAp;
148   private RpcRetryingCallerFactory rpcCallerFactory;
149 
150   /**
151    * Creates an object to access an HBase table.
152    * Shares zookeeper connection and other resources with other HTable instances
153    * created with the same <code>conf</code> instance.  Uses already-populated
154    * region cache if one is available, populated by any other HTable instances
155    * sharing this <code>conf</code> instance.  Recommended.
156    * @param conf Configuration object to use.
157    * @param tableName Name of the table.
158    * @throws IOException if a remote or network exception occurs
159    */
160   public HTable(Configuration conf, final String tableName)
161   throws IOException {
162     this(conf, TableName.valueOf(tableName));
163   }
164 
165   /**
166    * Creates an object to access an HBase table.
167    * Shares zookeeper connection and other resources with other HTable instances
168    * created with the same <code>conf</code> instance.  Uses already-populated
169    * region cache if one is available, populated by any other HTable instances
170    * sharing this <code>conf</code> instance.  Recommended.
171    * @param conf Configuration object to use.
172    * @param tableName Name of the table.
173    * @throws IOException if a remote or network exception occurs
174    */
175   public HTable(Configuration conf, final byte[] tableName)
176   throws IOException {
177     this(conf, TableName.valueOf(tableName));
178   }
179 
180 
181 
182   /**
183    * Creates an object to access an HBase table.
184    * Shares zookeeper connection and other resources with other HTable instances
185    * created with the same <code>conf</code> instance.  Uses already-populated
186    * region cache if one is available, populated by any other HTable instances
187    * sharing this <code>conf</code> instance.  Recommended.
188    * @param conf Configuration object to use.
189    * @param tableName table name pojo
190    * @throws IOException if a remote or network exception occurs
191    */
192   public HTable(Configuration conf, final TableName tableName)
193   throws IOException {
194     this.tableName = tableName;
195     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
196     if (conf == null) {
197       this.connection = null;
198       return;
199     }
200     this.connection = ConnectionManager.getConnectionInternal(conf);
201     this.configuration = conf;
202 
203     this.pool = getDefaultExecutor(conf);
204     this.finishSetup();
205   }
206 
207   /**
208    * Creates an object to access an HBase table. Shares zookeeper connection and other resources with
209    * other HTable instances created with the same <code>connection</code> instance. Use this
210    * constructor when the HConnection instance is externally managed.
211    * @param tableName Name of the table.
212    * @param connection HConnection to be used.
213    * @throws IOException if a remote or network exception occurs
214    * @deprecated Do not use.
215    */
216   @Deprecated
217   public HTable(TableName tableName, HConnection connection) throws IOException {
218     this.tableName = tableName;
219     this.cleanupPoolOnClose = true;
220     this.cleanupConnectionOnClose = false;
221     this.connection = (ClusterConnection)connection;
222     this.configuration = connection.getConfiguration();
223 
224     this.pool = getDefaultExecutor(this.configuration);
225     this.finishSetup();
226   }
227 
228   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
229     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
230     if (maxThreads == 0) {
231       maxThreads = 1; // is there a better default?
232     }
233     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
234 
235     // Using the "direct handoff" approach, new threads will only be created
236     // when necessary, and the pool can grow unbounded. This could be bad but in HCM
237     // we only create as many Runnables as there are region servers. It means
238     // it also scales when new region servers are added.
239     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
240         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
241     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
242     return pool;
243   }
244 
245   /**
246    * Creates an object to access an HBase table.
247    * Shares zookeeper connection and other resources with other HTable instances
248    * created with the same <code>conf</code> instance.  Uses already-populated
249    * region cache if one is available, populated by any other HTable instances
250    * sharing this <code>conf</code> instance.
251    * Use this constructor when the ExecutorService is externally managed.
252    * @param conf Configuration object to use.
253    * @param tableName Name of the table.
254    * @param pool ExecutorService to be used.
255    * @throws IOException if a remote or network exception occurs
256    */
257   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
258       throws IOException {
259     this(conf, TableName.valueOf(tableName), pool);
260   }
261 
262   /**
263    * Creates an object to access an HBase table.
264    * Shares zookeeper connection and other resources with other HTable instances
265    * created with the same <code>conf</code> instance.  Uses already-populated
266    * region cache if one is available, populated by any other HTable instances
267    * sharing this <code>conf</code> instance.
268    * Use this constructor when the ExecutorService is externally managed.
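 * <p>A sketch of the externally-managed pool pattern (the pool size and the use of
 * {@code java.util.concurrent.Executors} here are illustrative):
 * <pre>
 *   ExecutorService pool = Executors.newFixedThreadPool(10);
 *   HTable table = new HTable(conf, TableName.valueOf("myTable"), pool);
 *   // ... use the table ...
 *   table.close();     // does NOT shut the pool down; the caller owns its lifecycle
 *   pool.shutdown();
 * </pre>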
269    * @param conf Configuration object to use.
270    * @param tableName Name of the table.
271    * @param pool ExecutorService to be used.
272    * @throws IOException if a remote or network exception occurs
273    */
274   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
275       throws IOException {
276     this.connection = ConnectionManager.getConnectionInternal(conf);
277     this.configuration = conf;
278     this.pool = pool;
279     this.tableName = tableName;
280     this.cleanupPoolOnClose = false;
281     this.cleanupConnectionOnClose = true;
282 
283     this.finishSetup();
284   }
285 
286   /**
287    * Creates an object to access an HBase table.
288    * Shares zookeeper connection and other resources with other HTable instances
289    * created with the same <code>connection</code> instance.
290    * Use this constructor when the ExecutorService and HConnection instance are
291    * externally managed.
292    * @param tableName Name of the table.
293    * @param connection HConnection to be used.
294    * @param pool ExecutorService to be used.
295    * @throws IOException if a remote or network exception occurs.
296    * @deprecated Do not use, internal ctor.
297    */
298   @Deprecated
299   public HTable(final byte[] tableName, final HConnection connection,
300       final ExecutorService pool) throws IOException {
301     this(TableName.valueOf(tableName), connection, pool);
302   }
303 
304   /** @deprecated Do not use, internal ctor. */
305   @Deprecated
306   public HTable(TableName tableName, final HConnection connection,
307       final ExecutorService pool) throws IOException {
308     this(tableName, (ClusterConnection)connection, pool);
309   }
310 
311   /**
312    * Creates an object to access an HBase table.
313    * Shares zookeeper connection and other resources with other HTable instances
314    * created with the same <code>connection</code> instance.
315    * Visible only for HTableWrapper, which is in a different package.
316    * Should not be used by external code.
317    * @param tableName Name of the table.
318    * @param connection HConnection to be used.
319    * @param pool ExecutorService to be used.
320    * @throws IOException if a remote or network exception occurs
321    */
322   @InterfaceAudience.Private
323   public HTable(TableName tableName, final ClusterConnection connection,
324       final ExecutorService pool) throws IOException {
325     if (connection == null || connection.isClosed()) {
326       throw new IllegalArgumentException("Connection is null or closed.");
327     }
328     this.tableName = tableName;
329     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
330     this.connection = connection;
331     this.configuration = connection.getConfiguration();
332     this.pool = pool;
333 
334     this.finishSetup();
335   }
336 
337   /**
338    * For internal testing.
339    */
340   protected HTable() {
341     tableName = null;
342     cleanupPoolOnClose = false;
343     cleanupConnectionOnClose = false;
344   }
345 
346   /**
347    * Sets up this HTable's parameters based on the passed configuration.
348    */
349   private void finishSetup() throws IOException {
350     this.operationTimeout = tableName.isSystemTable() ?
351       this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
352         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
353       this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
354         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
355     this.writeBufferSize = this.configuration.getLong(
356         "hbase.client.write.buffer", 2097152);
357     this.clearBufferOnFail = true;
358     this.autoFlush = true;
359     this.currentWriteBufferSize = 0;
360     this.scannerCaching = this.configuration.getInt(
361         HConstants.HBASE_CLIENT_SCANNER_CACHING,
362         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
363 
364     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
365     // puts need to track errors globally due to how the APIs currently work.
366     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true);
367     multiAp = this.connection.getAsyncProcess();
368 
369     this.maxKeyValueSize = this.configuration.getInt(
370         "hbase.client.keyvalue.maxsize", -1);
371     this.closed = false;
372   }
373 
374 
375 
376   /**
377    * {@inheritDoc}
378    */
379   @Override
380   public Configuration getConfiguration() {
381     return configuration;
382   }
383 
384   /**
385    * Tells whether or not a table is enabled. This method creates a
386    * new HBase configuration, so it might make your unit tests fail due to
387    * incorrect ZK client port.
388    * @param tableName Name of table to check.
389    * @return {@code true} if table is online.
390    * @throws IOException if a remote or network exception occurs
391    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
392    */
393   @Deprecated
394   public static boolean isTableEnabled(String tableName) throws IOException {
395     return isTableEnabled(TableName.valueOf(tableName));
396   }
397 
398   /**
399    * Tells whether or not a table is enabled. This method creates a
400    * new HBase configuration, so it might make your unit tests fail due to
401    * incorrect ZK client port.
402    * @param tableName Name of table to check.
403    * @return {@code true} if table is online.
404    * @throws IOException if a remote or network exception occurs
405    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
406    */
407   @Deprecated
408   public static boolean isTableEnabled(byte[] tableName) throws IOException {
409     return isTableEnabled(TableName.valueOf(tableName));
410   }
411 
412   /**
413    * Tells whether or not a table is enabled. This method creates a
414    * new HBase configuration, so it might make your unit tests fail due to
415    * incorrect ZK client port.
416    * @param tableName Name of table to check.
417    * @return {@code true} if table is online.
418    * @throws IOException if a remote or network exception occurs
419    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
420    */
421   @Deprecated
422   public static boolean isTableEnabled(TableName tableName) throws IOException {
423     return isTableEnabled(HBaseConfiguration.create(), tableName);
424   }
425 
426   /**
427    * Tells whether or not a table is enabled.
428    * @param conf The Configuration object to use.
429    * @param tableName Name of table to check.
430    * @return {@code true} if table is online.
431    * @throws IOException if a remote or network exception occurs
432    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
433    */
434   @Deprecated
435   public static boolean isTableEnabled(Configuration conf, String tableName)
436   throws IOException {
437     return isTableEnabled(conf, TableName.valueOf(tableName));
438   }
439 
440   /**
441    * Tells whether or not a table is enabled.
442    * @param conf The Configuration object to use.
443    * @param tableName Name of table to check.
444    * @return {@code true} if table is online.
445    * @throws IOException if a remote or network exception occurs
446    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
447    */
448   @Deprecated
449   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
450   throws IOException {
451     return isTableEnabled(conf, TableName.valueOf(tableName));
452   }
453 
454   /**
455    * Tells whether or not a table is enabled.
456    * @param conf The Configuration object to use.
457    * @param tableName Name of table to check.
458    * @return {@code true} if table is online.
459    * @throws IOException if a remote or network exception occurs
460    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
461    */
462   @Deprecated
463   public static boolean isTableEnabled(Configuration conf,
464       final TableName tableName) throws IOException {
465     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
466       @Override
467       public Boolean connect(HConnection connection) throws IOException {
468         return connection.isTableEnabled(tableName);
469       }
470     });
471   }
472 
473   /**
474    * Find region location hosting passed row using cached info
475    * @param row Row to find.
476    * @return The location of the given row.
477    * @throws IOException if a remote or network exception occurs
478    */
479   public HRegionLocation getRegionLocation(final String row)
480   throws IOException {
481     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
482   }
483 
484   /**
485    * Finds the region on which the given row is being served. Does not reload the cache.
486    * @param row Row to find.
487    * @return Location of the row.
488    * @throws IOException if a remote or network exception occurs
489    */
490   public HRegionLocation getRegionLocation(final byte [] row)
491   throws IOException {
492     return connection.getRegionLocation(tableName, row, false);
493   }
494 
495   /**
496    * Finds the region on which the given row is being served.
497    * @param row Row to find.
498    * @param reload true to reload information or false to use cached information
499    * @return Location of the row.
500    * @throws IOException if a remote or network exception occurs
501    */
502   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
503   throws IOException {
504     return connection.getRegionLocation(tableName, row, reload);
505   }
506 
507   /**
508    * {@inheritDoc}
509    */
510   @Override
511   public byte [] getTableName() {
512     return this.tableName.getName();
513   }
514 
515   @Override
516   public TableName getName() {
517     return tableName;
518   }
519 
520   /**
521    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
522    * manipulations.
523    * @return An HConnection instance.
524    * @deprecated This method will be changed from public to package protected.
525    */
526   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
527   @Deprecated
528   @VisibleForTesting
529   public HConnection getConnection() {
530     return this.connection;
531   }
532 
533   /**
534    * Gets the number of rows that a scanner will fetch at once.
535    * <p>
536    * The default value comes from {@code hbase.client.scanner.caching}.
537    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
538    */
539   @Deprecated
540   public int getScannerCaching() {
541     return scannerCaching;
542   }
543 
544   /**
545    * Kept in 0.96 for backward compatibility
546    * @deprecated since 0.96. This is an internal buffer that should not be read from nor written to.
547    */
548   @Deprecated
549   public List<Row> getWriteBuffer() {
550     return writeAsyncBuffer;
551   }
552 
553   /**
554    * Sets the number of rows that a scanner will fetch at once.
555    * <p>
556    * This will override the value specified by
557    * {@code hbase.client.scanner.caching}.
558    * Increasing this value will reduce the amount of work needed each time
559    * {@code next()} is called on a scanner, at the expense of memory use
560    * (since more rows will need to be maintained in memory by the scanners).
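   * <p>Since this setter is deprecated, a per-scan sketch is usually preferable
   * (the caching value of 500 is illustrative, and {@code table} is an HTable instance):
   * <pre>
   *   Scan scan = new Scan();
   *   scan.setCaching(500);                          // rows fetched per next() RPC
   *   ResultScanner scanner = table.getScanner(scan);
   *   try {
   *     for (Result r : scanner) {
   *       // process r
   *     }
   *   } finally {
   *     scanner.close();
   *   }
   * </pre>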
561    * @param scannerCaching the number of rows a scanner will fetch at once.
562    * @deprecated Use {@link Scan#setCaching(int)}
563    */
564   @Deprecated
565   public void setScannerCaching(int scannerCaching) {
566     this.scannerCaching = scannerCaching;
567   }
568 
569   /**
570    * {@inheritDoc}
571    */
572   @Override
573   public HTableDescriptor getTableDescriptor() throws IOException {
574     return new UnmodifyableHTableDescriptor(
575       this.connection.getHTableDescriptor(this.tableName));
576   }
577 
578   /**
579    * Gets the starting row key for every region in the currently open table.
580    * <p>
581    * This is mainly useful for the MapReduce integration.
582    * @return Array of region starting row keys
583    * @throws IOException if a remote or network exception occurs
584    */
585   public byte [][] getStartKeys() throws IOException {
586     return getStartEndKeys().getFirst();
587   }
588 
589   /**
590    * Gets the ending row key for every region in the currently open table.
591    * <p>
592    * This is mainly useful for the MapReduce integration.
593    * @return Array of region ending row keys
594    * @throws IOException if a remote or network exception occurs
595    */
596   public byte[][] getEndKeys() throws IOException {
597     return getStartEndKeys().getSecond();
598   }
599 
600   /**
601    * Gets the starting and ending row keys for every region in the currently
602    * open table.
603    * <p>
604    * This is mainly useful for the MapReduce integration.
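   * <p>A sketch of iterating the region boundaries, e.g. to build one input split
   * per region (the variable names are illustrative, and {@code table} is assumed):
   * <pre>
   *   Pair&lt;byte[][], byte[][]&gt; keys = table.getStartEndKeys();
   *   for (int i = 0; i &lt; keys.getFirst().length; i++) {
   *     byte[] startKey = keys.getFirst()[i];
   *     byte[] endKey   = keys.getSecond()[i];
   *     // ... create a split covering [startKey, endKey) ...
   *   }
   * </pre>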
605    * @return Pair of arrays of region starting and ending row keys
606    * @throws IOException if a remote or network exception occurs
607    */
608   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
609     NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
610     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
611     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
612 
613     for (HRegionInfo region : regions.keySet()) {
614       startKeyList.add(region.getStartKey());
615       endKeyList.add(region.getEndKey());
616     }
617 
618     return new Pair<byte [][], byte [][]>(
619       startKeyList.toArray(new byte[startKeyList.size()][]),
620       endKeyList.toArray(new byte[endKeyList.size()][]));
621   }
622 
623   /**
624    * Gets all the regions and their address for this table.
625    * <p>
626    * This is mainly useful for the MapReduce integration.
627    * @return A map of HRegionInfo with its server address
628    * @throws IOException if a remote or network exception occurs
629    */
630   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
631     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocation, singular, returns an HRegionLocation.
632     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName(), false);
633   }
634 
635   /**
636    * Get the corresponding regions for an arbitrary range of keys.
637    * <p>
638    * @param startKey Starting row in range, inclusive
639    * @param endKey Ending row in range, exclusive
640    * @return A list of HRegionLocations corresponding to the regions that
641    * contain the specified range
642    * @throws IOException if a remote or network exception occurs
643    */
644   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
645     final byte [] endKey) throws IOException {
646     return getRegionsInRange(startKey, endKey, false);
647   }
648 
649   /**
650    * Get the corresponding regions for an arbitrary range of keys.
651    * <p>
652    * @param startKey Starting row in range, inclusive
653    * @param endKey Ending row in range, exclusive
654    * @param reload true to reload information or false to use cached information
655    * @return A list of HRegionLocations corresponding to the regions that
656    * contain the specified range
657    * @throws IOException if a remote or network exception occurs
658    */
659   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
660       final byte [] endKey, final boolean reload) throws IOException {
661     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
662   }
663 
664   /**
665    * Get the corresponding start keys and regions for an arbitrary range of
666    * keys.
667    * <p>
668    * @param startKey Starting row in range, inclusive
669    * @param endKey Ending row in range
670    * @param includeEndKey true if endRow is inclusive, false if exclusive
671    * @return A pair of list of start keys and list of HRegionLocations that
672    *         contain the specified range
673    * @throws IOException if a remote or network exception occurs
674    */
675   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
676       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
677       throws IOException {
678     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
679   }
680 
681   /**
682    * Get the corresponding start keys and regions for an arbitrary range of
683    * keys.
684    * <p>
685    * @param startKey Starting row in range, inclusive
686    * @param endKey Ending row in range
687    * @param includeEndKey true if endRow is inclusive, false if exclusive
688    * @param reload true to reload information or false to use cached information
689    * @return A pair of list of start keys and list of HRegionLocations that
690    *         contain the specified range
691    * @throws IOException if a remote or network exception occurs
692    */
693   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
694       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
695       final boolean reload) throws IOException {
696     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
697     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
698       throw new IllegalArgumentException(
699         "Invalid range: " + Bytes.toStringBinary(startKey) +
700         " > " + Bytes.toStringBinary(endKey));
701     }
702     List<byte[]> keysInRange = new ArrayList<byte[]>();
703     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
704     byte[] currentKey = startKey;
705     do {
706       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
707       keysInRange.add(currentKey);
708       regionsInRange.add(regionLocation);
709       currentKey = regionLocation.getRegionInfo().getEndKey();
710     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
711         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
712             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
713     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
714         regionsInRange);
715   }
716 
717   /**
718    * {@inheritDoc}
719    */
720    @Override
721    public Result getRowOrBefore(final byte[] row, final byte[] family)
722        throws IOException {
723      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
724          tableName, row) {
725        public Result call(int callTimeout) throws IOException {
726          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
727          controller.setPriority(tableName);
728          controller.setCallTimeout(callTimeout);
729          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
730              getLocation().getRegionInfo().getRegionName(), row, family);
731          try {
732            ClientProtos.GetResponse response = getStub().get(controller, request);
733            if (!response.hasResult()) return null;
734            return ProtobufUtil.toResult(response.getResult());
735          } catch (ServiceException se) {
736            throw ProtobufUtil.getRemoteException(se);
737          }
738        }
739      };
740      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
741    }
742 
743    /**
744     * {@inheritDoc}
745     */
746   @Override
747   public ResultScanner getScanner(final Scan scan) throws IOException {
748     if (scan.getCaching() <= 0) {
749       scan.setCaching(getScannerCaching());
750     }
751 
752     if (scan.isReversed()) {
753       if (scan.isSmall()) {
754         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
755             this.connection);
756       } else {
757         return new ReversedClientScanner(getConfiguration(), scan, getName(),
758             this.connection);
759       }
760     }
761 
762     if (scan.isSmall()) {
763       return new ClientSmallScanner(getConfiguration(), scan, getName(),
764           this.connection, this.rpcCallerFactory);
765     } else {
766       return new ClientScanner(getConfiguration(), scan,
767           getName(), this.connection);
768     }
769   }
770 
771   /**
772    * {@inheritDoc}
773    */
774   @Override
775   public ResultScanner getScanner(byte [] family) throws IOException {
776     Scan scan = new Scan();
777     scan.addFamily(family);
778     return getScanner(scan);
779   }
780 
781   /**
782    * {@inheritDoc}
783    */
784   @Override
785   public ResultScanner getScanner(byte [] family, byte [] qualifier)
786   throws IOException {
787     Scan scan = new Scan();
788     scan.addColumn(family, qualifier);
789     return getScanner(scan);
790   }
791 
792   /**
793    * {@inheritDoc}
794    */
795   @Override
796   public Result get(final Get get) throws IOException {
797     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
798         getName(), get.getRow()) {
799       public Result call(int callTimeout) throws IOException {
800         ClientProtos.GetRequest request =
801             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
802         PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
803         controller.setPriority(tableName);
804         controller.setCallTimeout(callTimeout);
805         try {
806           ClientProtos.GetResponse response = getStub().get(controller, request);
807           if (response == null) return null;
808           return ProtobufUtil.toResult(response.getResult());
809         } catch (ServiceException se) {
810           throw ProtobufUtil.getRemoteException(se);
811         }
812       }
813     };
814     return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
815   }
816 
817   /**
818    * {@inheritDoc}
819    */
820   @Override
821   public Result[] get(List<Get> gets) throws IOException {
822     if (gets.size() == 1) {
823       return new Result[]{get(gets.get(0))};
824     }
825     try {
826       Object [] r1 = batch((List)gets);
827 
828       // translate.
829       Result [] results = new Result[r1.length];
830       int i=0;
831       for (Object o : r1) {
832         // batch ensures if there is a failure we get an exception instead
833         results[i++] = (Result) o;
834       }
835 
836       return results;
837     } catch (InterruptedException e) {
838       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
839     }
840   }
841 
842   /**
843    * {@inheritDoc}
844    */
845   @Override
846   public void batch(final List<? extends Row> actions, final Object[] results)
847       throws InterruptedException, IOException {
848     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
849     ars.waitUntilDone();
850     if (ars.hasError()) {
851       throw ars.getErrors();
852     }
853   }
854 
855   /**
856    * {@inheritDoc}
857    * @deprecated If any exception is thrown by one of the actions, there is no way to
858    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
859    */
860   @Override
861   public Object[] batch(final List<? extends Row> actions)
862      throws InterruptedException, IOException {
863     Object[] results = new Object[actions.size()];
864     batch(actions, results);
865     return results;
866   }
867 
868   /**
869    * {@inheritDoc}
870    */
871   @Override
872   public <R> void batchCallback(
873       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
874       throws IOException, InterruptedException {
875     connection.processBatchCallback(actions, tableName, pool, results, callback);
876   }
877 
878   /**
879    * {@inheritDoc}
880    * @deprecated If any exception is thrown by one of the actions, there is no way to
881    * retrieve the partially executed results. Use
882    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
883    * instead.
884    */
885   @Override
886   public <R> Object[] batchCallback(
887     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
888       InterruptedException {
889     Object[] results = new Object[actions.size()];
890     batchCallback(actions, results, callback);
891     return results;
892   }
893 
894   /**
895    * {@inheritDoc}
896    */
897   @Override
898   public void delete(final Delete delete)
899   throws IOException {
900     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
901         tableName, delete.getRow()) {
902       public Boolean call(int callTimeout) throws IOException {
903         PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
904         controller.setPriority(tableName);
905         controller.setCallTimeout(callTimeout);
906 
907         try {
908           MutateRequest request = RequestConverter.buildMutateRequest(
909             getLocation().getRegionInfo().getRegionName(), delete);
910           MutateResponse response = getStub().mutate(controller, request);
911           return Boolean.valueOf(response.getProcessed());
912         } catch (ServiceException se) {
913           throw ProtobufUtil.getRemoteException(se);
914         }
915       }
916     };
917     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
918   }
919 
920   /**
921    * {@inheritDoc}
922    */
923   @Override
924   public void delete(final List<Delete> deletes)
925   throws IOException {
926     Object[] results = new Object[deletes.size()];
927     try {
928       batch(deletes, results);
929     } catch (InterruptedException e) {
930       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
931     } finally {
932       // mutate list so that it is empty for complete success, or contains only failed records
933       // results are returned in the same order as the requests in list
934       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
935       for (int i = results.length - 1; i>=0; i--) {
936         // if result is not null, it succeeded
937         if (results[i] instanceof Result) {
938           deletes.remove(i);
939         }
940       }
941     }
942   }
943 
944   /**
945    * {@inheritDoc}
946    */
947   @Override
948   public void put(final Put put)
949       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
950     doPut(put);
951     if (autoFlush) {
952       flushCommits();
953     }
954   }
955 
956   /**
957    * {@inheritDoc}
958    */
959   @Override
960   public void put(final List<Put> puts)
961       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
962     for (Put put : puts) {
963       doPut(put);
964     }
965     if (autoFlush) {
966       flushCommits();
967     }
968   }
969 
970 
971   /**
972    * Adds the put to the buffer. If the buffer is already too large, sends the buffer to the
973    *  cluster.
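   * <p>A sketch of the buffered-write pattern this supports (assumes autoflush has
   * been turned off; {@code table} and the puts are illustrative):
   * <pre>
   *   table.setAutoFlush(false);
   *   for (Put p : puts) {
   *     table.put(p);          // buffered until hbase.client.write.buffer is exceeded
   *   }
   *   table.flushCommits();    // push whatever is still sitting in the buffer
   * </pre>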
974    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
975    * @throws InterruptedIOException if we were interrupted.
976    */
977   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
978     // This behavior is highly non-intuitive... it does not protect us against
979     // 94-incompatible behavior, which is a timing issue because hasError, the below code
980     // and setter of hasError are not synchronized. Perhaps it should be removed.
981     if (ap.hasError()) {
982       writeAsyncBuffer.add(put);
983       backgroundFlushCommits(true);
984     }
985 
986     validatePut(put);
987 
988     currentWriteBufferSize += put.heapSize();
989     writeAsyncBuffer.add(put);
990 
991     while (currentWriteBufferSize > writeBufferSize) {
992       backgroundFlushCommits(false);
993     }
994   }
995 
996 
997   /**
998    * Sends the operations in the buffer to the servers. Does not wait for the server's answer.
999    * If there is an error (max retries reached from a previous flush, or a bad operation), it tries to
1000   * send all operations in the buffer and throws an exception.
1001    * @param synchronous - if true, sends all the writes and wait for all of them to finish before
1002    *                     returning.
1003    */
1004   private void backgroundFlushCommits(boolean synchronous) throws
1005       InterruptedIOException, RetriesExhaustedWithDetailsException {
1006 
1007     try {
1008       if (!synchronous) {
1009         ap.submit(tableName, writeAsyncBuffer, true, null, false);
1010         if (ap.hasError()) {
1011           LOG.debug(tableName + ": One or more of the operations have failed -" +
1012               " waiting for all operations in progress to finish (successfully or not)");
1013         }
1014       }
1015       if (synchronous || ap.hasError()) {
1016         while (!writeAsyncBuffer.isEmpty()) {
1017           ap.submit(tableName, writeAsyncBuffer, true, null, false);
1018         }
1019         List<Row> failedRows = clearBufferOnFail ? null : writeAsyncBuffer;
1020         RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(failedRows);
1021         if (error != null) {
1022           throw error;
1023         }
1024       }
1025     } finally {
1026       currentWriteBufferSize = 0;
1027       for (Row mut : writeAsyncBuffer) {
1028         if (mut instanceof Mutation) {
1029           currentWriteBufferSize += ((Mutation) mut).heapSize();
1030         }
1031       }
1032     }
1033   }
1034 
1035   /**
1036    * {@inheritDoc}
1037    */
1038   @Override
1039   public void mutateRow(final RowMutations rm) throws IOException {
1040     RegionServerCallable<Void> callable =
1041         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1042       public Void call(int callTimeout) throws IOException {
1043         PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1044         controller.setPriority(tableName);
1045         controller.setCallTimeout(callTimeout);
1046         try {
1047           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1048             getLocation().getRegionInfo().getRegionName(), rm);
1049           regionMutationBuilder.setAtomic(true);
1050           MultiRequest request =
1051             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1052           getStub().multi(controller, request);
1053         } catch (ServiceException se) {
1054           throw ProtobufUtil.getRemoteException(se);
1055         }
1056         return null;
1057       }
1058     };
1059     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1060   }
1061 
1062   /**
1063    * {@inheritDoc}
1064    */
1065   @Override
1066   public Result append(final Append append) throws IOException {
1067     if (append.numFamilies() == 0) {
1068       throw new IOException(
1069           "Invalid arguments to append, no columns specified");
1070     }
1071 
1072     NonceGenerator ng = this.connection.getNonceGenerator();
1073     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1074     RegionServerCallable<Result> callable =
1075       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1076         public Result call(int callTimeout) throws IOException {
1077           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1078           controller.setPriority(getTableName());
1079           controller.setCallTimeout(callTimeout);
1080           try {
1081             MutateRequest request = RequestConverter.buildMutateRequest(
1082               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1083             MutateResponse response = getStub().mutate(controller, request);
1084             if (!response.hasResult()) return null;
1085             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1086           } catch (ServiceException se) {
1087             throw ProtobufUtil.getRemoteException(se);
1088           }
1089         }
1090       };
1091     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1092   }
1093 
1094   /**
1095    * {@inheritDoc}
1096    */
1097   @Override
1098   public Result increment(final Increment increment) throws IOException {
1099     if (!increment.hasFamilies()) {
1100       throw new IOException(
1101           "Invalid arguments to increment, no columns specified");
1102     }
1103     NonceGenerator ng = this.connection.getNonceGenerator();
1104     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1105     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1106         getName(), increment.getRow()) {
1107       public Result call(int callTimeout) throws IOException {
1108         PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1109         controller.setPriority(getTableName());
1110         controller.setCallTimeout(callTimeout);
1111         try {
1112           MutateRequest request = RequestConverter.buildMutateRequest(
1113             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1114           MutateResponse response = getStub().mutate(controller, request);
1115           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1116         } catch (ServiceException se) {
1117           throw ProtobufUtil.getRemoteException(se);
1118         }
1119         }
1120       };
1121     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1122   }
1123 
1124   /**
1125    * {@inheritDoc}
1126    */
1127   @Override
1128   public long incrementColumnValue(final byte [] row, final byte [] family,
1129       final byte [] qualifier, final long amount)
1130   throws IOException {
1131     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1132   }
1133 
1134   /**
1135    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1136    */
1137   @Deprecated
1138   @Override
1139   public long incrementColumnValue(final byte [] row, final byte [] family,
1140       final byte [] qualifier, final long amount, final boolean writeToWAL)
1141   throws IOException {
1142     return incrementColumnValue(row, family, qualifier, amount,
1143       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1144   }
1145 
1146   /**
1147    * {@inheritDoc}
1148    */
1149   @Override
1150   public long incrementColumnValue(final byte [] row, final byte [] family,
1151       final byte [] qualifier, final long amount, final Durability durability)
1152   throws IOException {
1153     NullPointerException npe = null;
1154     if (row == null) {
1155       npe = new NullPointerException("row is null");
1156     } else if (family == null) {
1157       npe = new NullPointerException("family is null");
1158     } else if (qualifier == null) {
1159       npe = new NullPointerException("qualifier is null");
1160     }
1161     if (npe != null) {
1162       throw new IOException(
1163           "Invalid arguments to incrementColumnValue", npe);
1164     }
1165 
1166     NonceGenerator ng = this.connection.getNonceGenerator();
1167     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1168     RegionServerCallable<Long> callable =
1169       new RegionServerCallable<Long>(connection, getName(), row) {
1170         public Long call(int callTimeout) throws IOException {
1171           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1172           controller.setPriority(getTableName());
1173           controller.setCallTimeout(callTimeout);
1174           try {
1175             MutateRequest request = RequestConverter.buildIncrementRequest(
1176               getLocation().getRegionInfo().getRegionName(), row, family,
1177               qualifier, amount, durability, nonceGroup, nonce);
1178             MutateResponse response = getStub().mutate(controller, request);
1179             Result result =
1180               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1181             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1182           } catch (ServiceException se) {
1183             throw ProtobufUtil.getRemoteException(se);
1184           }
1185         }
1186       };
1187     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1188   }
1189 
1190   /**
1191    * {@inheritDoc}
1192    */
1193   @Override
1194   public boolean checkAndPut(final byte [] row,
1195       final byte [] family, final byte [] qualifier, final byte [] value,
1196       final Put put)
1197   throws IOException {
1198     RegionServerCallable<Boolean> callable =
1199       new RegionServerCallable<Boolean>(connection, getName(), row) {
1200         public Boolean call(int callTimeout) throws IOException {
1201           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1202           controller.setPriority(tableName);
1203           controller.setCallTimeout(callTimeout);
1204           try {
1205             MutateRequest request = RequestConverter.buildMutateRequest(
1206               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1207                 new BinaryComparator(value), CompareType.EQUAL, put);
1208             MutateResponse response = getStub().mutate(controller, request);
1209             return Boolean.valueOf(response.getProcessed());
1210           } catch (ServiceException se) {
1211             throw ProtobufUtil.getRemoteException(se);
1212           }
1213         }
1214       };
1215     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1216   }
1217 
1218   /**
1219    * {@inheritDoc}
1220    */
1221   @Override
1222   public boolean checkAndPut(final byte [] row, final byte [] family,
1223       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1224       final Put put)
1225   throws IOException {
1226     RegionServerCallable<Boolean> callable =
1227       new RegionServerCallable<Boolean>(connection, getName(), row) {
1228         public Boolean call(int callTimeout) throws IOException {
1229           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1230           controller.setPriority(tableName);
1231           controller.setCallTimeout(callTimeout);
1232           try {
1233             CompareType compareType = CompareType.valueOf(compareOp.name());
1234             MutateRequest request = RequestConverter.buildMutateRequest(
1235               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1236                 new BinaryComparator(value), compareType, put);
1237             MutateResponse response = getStub().mutate(controller, request);
1238             return Boolean.valueOf(response.getProcessed());
1239           } catch (ServiceException se) {
1240             throw ProtobufUtil.getRemoteException(se);
1241           }
1242         }
1243       };
1244     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1245   }
1246 
1247   /**
1248    * {@inheritDoc}
1249    */
1250   @Override
1251   public boolean checkAndDelete(final byte [] row,
1252       final byte [] family, final byte [] qualifier, final byte [] value,
1253       final Delete delete)
1254   throws IOException {
1255     RegionServerCallable<Boolean> callable =
1256       new RegionServerCallable<Boolean>(connection, getName(), row) {
1257         public Boolean call(int callTimeout) throws IOException {
1258           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1259           controller.setPriority(tableName);
1260           controller.setCallTimeout(callTimeout);
1261           try {
1262             MutateRequest request = RequestConverter.buildMutateRequest(
1263               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1264                 new BinaryComparator(value), CompareType.EQUAL, delete);
1265             MutateResponse response = getStub().mutate(controller, request);
1266             return Boolean.valueOf(response.getProcessed());
1267           } catch (ServiceException se) {
1268             throw ProtobufUtil.getRemoteException(se);
1269           }
1270         }
1271       };
1272     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1273   }
1274 
1275   /**
1276    * {@inheritDoc}
1277    */
1278   @Override
1279   public boolean checkAndDelete(final byte [] row, final byte [] family,
1280       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1281       final Delete delete)
1282   throws IOException {
1283     RegionServerCallable<Boolean> callable =
1284       new RegionServerCallable<Boolean>(connection, getName(), row) {
1285         public Boolean call(int callTimeout) throws IOException {
1286           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1287           controller.setPriority(tableName);
1288           controller.setCallTimeout(callTimeout);
1289           try {
1290             CompareType compareType = CompareType.valueOf(compareOp.name());
1291             MutateRequest request = RequestConverter.buildMutateRequest(
1292               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1293                 new BinaryComparator(value), compareType, delete);
1294             MutateResponse response = getStub().mutate(controller, request);
1295             return Boolean.valueOf(response.getProcessed());
1296           } catch (ServiceException se) {
1297             throw ProtobufUtil.getRemoteException(se);
1298           }
1299         }
1300       };
1301     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1302   }
1303 
1304   /**
1305    * {@inheritDoc}
1306    */
1307   @Override
1308   public boolean exists(final Get get) throws IOException {
1309     get.setCheckExistenceOnly(true);
1310     Result r = get(get);
1311     assert r.getExists() != null;
1312     return r.getExists();
1313   }
1314 
1315   /**
1316    * {@inheritDoc}
1317    */
1318   @Override
1319   public Boolean[] exists(final List<Get> gets) throws IOException {
1320     if (gets.isEmpty()) return new Boolean[]{};
1321     if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
1322 
1323     for (Get g: gets){
1324       g.setCheckExistenceOnly(true);
1325     }
1326 
1327     Object[] r1;
1328     try {
1329       r1 = batch(gets);
1330     } catch (InterruptedException e) {
1331       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1332     }
1333 
1334     // translate.
1335     Boolean[] results = new Boolean[r1.length];
1336     int i = 0;
1337     for (Object o : r1) {
1338       // batch ensures that if any action fails we get an exception instead of a null result
1339       results[i++] = ((Result)o).getExists();
1340     }
1341 
1342     return results;
1343   }
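  /* Example (illustrative sketch; the row keys are hypothetical). Existence checks return only
   * a boolean per Get, so no cell data is shipped back to the client:
   *
   *   List<Get> gets = new ArrayList<Get>();
   *   gets.add(new Get(Bytes.toBytes("row1")));
   *   gets.add(new Get(Bytes.toBytes("row2")));
   *   Boolean[] present = table.exists(gets);  // one entry per Get, in the same order
   */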
1344 
1345   /**
1346    * {@inheritDoc}
1347    */
1348   @Override
1349   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1350     // As an operation can be in progress even if the buffer is empty, we call
1351     //  backgroundFlushCommits at least once.
1352     backgroundFlushCommits(true);
1353   }
1354 
1355   /**
1356    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1357    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1358    *
1359    * @param list The collection of actions.
1360    * @param results An empty array, the same size as list. If an exception is thrown,
1361    * you can inspect it for partial results and determine which actions
1362    * processed successfully.
1363    * @throws IOException if there are problems talking to META. Per-item
1364    * exceptions are stored in the results array.
1365    */
1366   public <R> void processBatchCallback(
1367     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1368     throws IOException, InterruptedException {
1369     this.batchCallback(list, results, callback);
1370   }
1371 
1372 
1373   /**
1374    * Parameterized batch processing, allowing varying return types for different
1375    * {@link Row} implementations.
1376    */
1377   public void processBatch(final List<? extends Row> list, final Object[] results)
1378     throws IOException, InterruptedException {
1379     this.batch(list, results);
1380   }
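  /* Example (illustrative sketch; family "cf", qualifier "q" and the row keys are hypothetical,
   * and the surrounding method is assumed to declare IOException and InterruptedException):
   *
   *   List<Row> actions = new ArrayList<Row>();
   *   Put put = new Put(Bytes.toBytes("row1"));
   *   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
   *   actions.add(put);
   *   actions.add(new Get(Bytes.toBytes("row2")));
   *   actions.add(new Delete(Bytes.toBytes("row3")));
   *   Object[] results = new Object[actions.size()];
   *   table.processBatch(actions, results);
   *   // results[i] is filled in per action; on failure, the thrown exception carries
   *   // the per-action details described in the javadoc above.
   */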
1381 
1382 
1383   @Override
1384   public void close() throws IOException {
1385     if (this.closed) {
1386       return;
1387     }
1388     flushCommits();
1389     if (cleanupPoolOnClose) {
1390       this.pool.shutdown();
1391     }
1392     if (cleanupConnectionOnClose) {
1393       if (this.connection != null) {
1394         this.connection.close();
1395       }
1396     }
1397     this.closed = true;
1398   }
1399 
1400   // validate for well-formedness
1401   public void validatePut(final Put put) throws IllegalArgumentException{
1402     if (put.isEmpty()) {
1403       throw new IllegalArgumentException("No columns to insert");
1404     }
1405     if (maxKeyValueSize > 0) {
1406       for (List<Cell> list : put.getFamilyCellMap().values()) {
1407         for (Cell cell : list) {
1408           // KeyValue v1 expectation.  Cast for now.
1409           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1410           if (kv.getLength() > maxKeyValueSize) {
1411             throw new IllegalArgumentException("KeyValue size too large");
1412           }
1413         }
1414       }
1415     }
1416   }
1417 
1418   /**
1419    * {@inheritDoc}
1420    */
1421   @Override
1422   public boolean isAutoFlush() {
1423     return autoFlush;
1424   }
1425 
1426   /**
1427    * {@inheritDoc}
1428    */
1429   @Deprecated
1430   @Override
1431   public void setAutoFlush(boolean autoFlush) {
1432     setAutoFlush(autoFlush, autoFlush);
1433   }
1434 
1435   /**
1436    * {@inheritDoc}
1437    */
1438   @Override
1439   public void setAutoFlushTo(boolean autoFlush) {
1440     setAutoFlush(autoFlush, clearBufferOnFail);
1441   }
1442 
1443   /**
1444    * {@inheritDoc}
1445    */
1446   @Override
1447   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1448     this.autoFlush = autoFlush;
1449     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1450   }
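  /* Example (illustrative sketch; table/family/qualifier names are hypothetical). With
   * auto-flush off, Puts are buffered client-side and sent when the write buffer fills or
   * when flushCommits()/close() is called:
   *
   *   table.setAutoFlushTo(false);
   *   for (int i = 0; i < 1000; i++) {
   *     Put p = new Put(Bytes.toBytes("row-" + i));
   *     p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value-" + i));
   *     table.put(p);          // buffered, not yet sent
   *   }
   *   table.flushCommits();    // push anything still sitting in the buffer
   */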
1451 
1452   /**
1453    * Returns the maximum size in bytes of the write buffer for this HTable.
1454    * <p>
1455    * The default value comes from the configuration parameter
1456    * {@code hbase.client.write.buffer}.
1457    * @return The size of the write buffer in bytes.
1458    */
1459   @Override
1460   public long getWriteBufferSize() {
1461     return writeBufferSize;
1462   }
1463 
1464   /**
1465    * Sets the size of the write buffer in bytes.
1466    * <p>
1467    * If the new size is less than the current amount of data in the
1468    * write buffer, the buffer gets flushed.
1469    * @param writeBufferSize The new write buffer size, in bytes.
1470    * @throws IOException if a remote or network exception occurs.
1471    */
1472   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1473     this.writeBufferSize = writeBufferSize;
1474     if(currentWriteBufferSize > writeBufferSize) {
1475       flushCommits();
1476     }
1477   }
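  /* Example (illustrative sketch). The buffer can be sized per instance, or globally through
   * the configuration key named in the getter's javadoc:
   *
   *   table.setWriteBufferSize(8 * 1024 * 1024);   // 8 MB for this HTable
   *   // or, before the table is created:
   *   // conf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024);
   */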
1478 
1479   /**
1480    * The pool used for multi requests for this HTable.
1481    * @return the pool used for multi requests
1482    */
1483   ExecutorService getPool() {
1484     return this.pool;
1485   }
1486 
1487   /**
1488    * Enable or disable region cache prefetch for the table. It will be
1489    * applied to all HTable instances of the given table that share the same
1490    * connection. By default, the cache prefetch is enabled.
1491    * @param tableName name of table to configure.
1492    * @param enable Set to true to enable region cache prefetch, or false to
1493    * disable it.
1494    * @throws IOException
1495    */
1496   public static void setRegionCachePrefetch(final byte[] tableName,
1497       final boolean enable) throws IOException {
1498     setRegionCachePrefetch(TableName.valueOf(tableName), enable);
1499   }
1500 
1501   public static void setRegionCachePrefetch(
1502       final TableName tableName,
1503       final boolean enable) throws IOException {
1504     HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration.create()) {
1505       @Override
1506       public Void connect(HConnection connection) throws IOException {
1507         connection.setRegionCachePrefetch(tableName, enable);
1508         return null;
1509       }
1510     });
1511   }
1512 
1513   /**
1514    * Enable or disable region cache prefetch for the table. It will be
1515    * applied to all HTable instances of the given table that share the same
1516    * connection. By default, the cache prefetch is enabled.
1517    * @param conf The Configuration object to use.
1518    * @param tableName name of table to configure.
1519    * @param enable Set to true to enable region cache prefetch, or false to
1520    * disable it.
1521    * @throws IOException
1522    */
1523   public static void setRegionCachePrefetch(final Configuration conf,
1524       final byte[] tableName, final boolean enable) throws IOException {
1525     setRegionCachePrefetch(conf, TableName.valueOf(tableName), enable);
1526   }
1527 
1528   public static void setRegionCachePrefetch(final Configuration conf,
1529       final TableName tableName,
1530       final boolean enable) throws IOException {
1531     HConnectionManager.execute(new HConnectable<Void>(conf) {
1532       @Override
1533       public Void connect(HConnection connection) throws IOException {
1534         connection.setRegionCachePrefetch(tableName, enable);
1535         return null;
1536       }
1537     });
1538   }
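  /* Example (illustrative sketch; "mytable" is a hypothetical table name):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HTable.setRegionCachePrefetch(conf, TableName.valueOf("mytable"), false);
   *   boolean enabled = HTable.getRegionCachePrefetch(conf, TableName.valueOf("mytable"));
   */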
1539 
1540   /**
1541    * Check whether region cache prefetch is enabled or not for the table.
1542    * @param conf The Configuration object to use.
1543    * @param tableName name of table to check
1544    * @return true if the table's region cache prefetch is enabled,
1545    * false otherwise.
1546    * @throws IOException
1547    */
1548   public static boolean getRegionCachePrefetch(final Configuration conf,
1549       final byte[] tableName) throws IOException {
1550     return getRegionCachePrefetch(conf, TableName.valueOf(tableName));
1551   }
1552 
1553   public static boolean getRegionCachePrefetch(final Configuration conf,
1554       final TableName tableName) throws IOException {
1555     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1556       @Override
1557       public Boolean connect(HConnection connection) throws IOException {
1558         return connection.getRegionCachePrefetch(tableName);
1559       }
1560     });
1561   }
1562 
1563   /**
1564    * Check whether region cache prefetch is enabled or not for the table.
1565    * @param tableName name of table to check
1566    * @return true if the table's region cache prefetch is enabled,
1567    * false otherwise.
1568    * @throws IOException
1569    */
1570   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1571     return getRegionCachePrefetch(TableName.valueOf(tableName));
1572   }
1573 
1574   public static boolean getRegionCachePrefetch(
1575       final TableName tableName) throws IOException {
1576     return HConnectionManager.execute(new HConnectable<Boolean>(
1577         HBaseConfiguration.create()) {
1578       @Override
1579       public Boolean connect(HConnection connection) throws IOException {
1580         return connection.getRegionCachePrefetch(tableName);
1581       }
1582     });
1583   }
1584 
1585   /**
1586    * Explicitly clears the region cache to fetch the latest value from META.
1587    * This is a power user function: avoid unless you know the ramifications.
1588    */
1589   public void clearRegionCache() {
1590     this.connection.clearRegionCache();
1591   }
1592 
1593   /**
1594    * {@inheritDoc}
1595    */
1596   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1597     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1598   }
1599 
1600   /**
1601    * {@inheritDoc}
1602    */
1603   @Override
1604   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1605       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1606       throws ServiceException, Throwable {
1607     final Map<byte[],R> results =  Collections.synchronizedMap(
1608         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1609     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1610       public void update(byte[] region, byte[] row, R value) {
1611         if (region != null) {
1612           results.put(region, value);
1613         }
1614       }
1615     });
1616     return results;
1617   }
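  /* Example (illustrative sketch only). RowCountService, CountRequest and CountResponse stand
   * in for classes generated from a coprocessor endpoint's .proto definition; they are not
   * part of this class. Passing null start/end keys targets every region of the table:
   *
   *   Map<byte[], Long> counts = table.coprocessorService(RowCountService.class, null, null,
   *       new Batch.Call<RowCountService, Long>() {
   *         public Long call(RowCountService service) throws IOException {
   *           ServerRpcController controller = new ServerRpcController();
   *           BlockingRpcCallback<CountResponse> done = new BlockingRpcCallback<CountResponse>();
   *           service.getRowCount(controller, CountRequest.getDefaultInstance(), done);
   *           if (controller.failedOnException()) {
   *             throw controller.getFailedOn();
   *           }
   *           return done.get().getCount();
   *         }
   *       });
   *   // Map keys are region names; one entry per region that was called.
   */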
1618 
1619   /**
1620    * {@inheritDoc}
1621    */
1622   @Override
1623   public <T extends Service, R> void coprocessorService(final Class<T> service,
1624       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1625       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1626 
1627     // get regions covered by the row range
1628     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1629 
1630     Map<byte[],Future<R>> futures =
1631         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1632     for (final byte[] r : keys) {
1633       final RegionCoprocessorRpcChannel channel =
1634           new RegionCoprocessorRpcChannel(connection, tableName, r);
1635       Future<R> future = pool.submit(
1636           new Callable<R>() {
1637             public R call() throws Exception {
1638               T instance = ProtobufUtil.newServiceStub(service, channel);
1639               R result = callable.call(instance);
1640               byte[] region = channel.getLastRegion();
1641               if (callback != null) {
1642                 callback.update(region, r, result);
1643               }
1644               return result;
1645             }
1646           });
1647       futures.put(r, future);
1648     }
1649     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1650       try {
1651         e.getValue().get();
1652       } catch (ExecutionException ee) {
1653         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1654             + Bytes.toStringBinary(e.getKey()), ee);
1655         throw ee.getCause();
1656       } catch (InterruptedException ie) {
1657         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1658             + " for row " + Bytes.toStringBinary(e.getKey()))
1659             .initCause(ie);
1660       }
1661     }
1662   }
1663 
1664   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1665   throws IOException {
1666     if (start == null) {
1667       start = HConstants.EMPTY_START_ROW;
1668     }
1669     if (end == null) {
1670       end = HConstants.EMPTY_END_ROW;
1671     }
1672     return getKeysAndRegionsInRange(start, end, true).getFirst();
1673   }
1674 
1675   public void setOperationTimeout(int operationTimeout) {
1676     this.operationTimeout = operationTimeout;
1677   }
1678 
1679   public int getOperationTimeout() {
1680     return operationTimeout;
1681   }
1682 
1683   @Override
1684   public String toString() {
1685     return tableName + ";" + connection;
1686   }
1687 
1688   /**
1689    * Run a basic test.
1690    * @param args Pass a table name and a row key; the row's content is printed.
1691    * @throws IOException
1692    */
1693   public static void main(String[] args) throws IOException {
1694     HTable t = new HTable(HBaseConfiguration.create(), args[0]);
1695     try {
1696       System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1697     } finally {
1698       t.close();
1699     }
1700   }
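  /* Illustrative invocation (assumes the hbase launcher script is on the PATH and accepts a
   * class name, and that "mytable" and "myrow" exist):
   *
   *   hbase org.apache.hadoop.hbase.client.HTable mytable myrow
   */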
1701 
1702   /**
1703    * {@inheritDoc}
1704    */
1705   @Override
1706   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1707       Descriptors.MethodDescriptor methodDescriptor, Message request,
1708       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1709     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1710         Bytes.BYTES_COMPARATOR));
1711     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1712         new Callback<R>() {
1713 
1714           @Override
1715           public void update(byte[] region, byte[] row, R result) {
1716             if (region != null) {
1717               results.put(region, result);
1718             }
1719           }
1720         });
1721     return results;
1722   }
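  /* Example (illustrative sketch, reusing the hypothetical generated RowCountService endpoint
   * classes from the coprocessorService example above):
   *
   *   Descriptors.MethodDescriptor method =
   *       RowCountService.getDescriptor().findMethodByName("getRowCount");
   *   Map<byte[], CountResponse> replies = table.batchCoprocessorService(method,
   *       CountRequest.getDefaultInstance(), null, null, CountResponse.getDefaultInstance());
   */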
1723 
1724   /**
1725    * {@inheritDoc}
1726    */
1727   @Override
1728   public <R extends Message> void batchCoprocessorService(
1729       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1730       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1731       throws ServiceException, Throwable {
1732 
1733     // get regions covered by the row range
1734     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1735         getKeysAndRegionsInRange(startKey, endKey, true);
1736     List<byte[]> keys = keysAndRegions.getFirst();
1737     List<HRegionLocation> regions = keysAndRegions.getSecond();
1738 
1739     // check if we have any calls to make
1740     if (keys.isEmpty()) {
1741       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1742           ", end=" + Bytes.toStringBinary(endKey));
1743       return;
1744     }
1745 
1746     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1747     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1748         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1749     for (int i = 0; i < keys.size(); i++) {
1750       final byte[] rowKey = keys.get(i);
1751       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1752       RegionCoprocessorServiceExec exec =
1753           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1754       execs.add(exec);
1755       execsByRow.put(rowKey, exec);
1756     }
1757 
1758     // tracking for any possible deserialization errors on success callback
1759     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1760     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1761     final List<Row> callbackErrorActions = new ArrayList<Row>();
1762     final List<String> callbackErrorServers = new ArrayList<String>();
1763     Object[] results = new Object[execs.size()];
1764 
1765     AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool,
1766           RpcRetryingCallerFactory.instantiate(configuration), true);
1767     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1768         new Callback<ClientProtos.CoprocessorServiceResult>() {
1769           @Override
1770           public void update(byte[] region, byte[] row,
1771                               ClientProtos.CoprocessorServiceResult serviceResult) {
1772             if (LOG.isTraceEnabled()) {
1773               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1774                   ": region=" + Bytes.toStringBinary(region) +
1775                   ", row=" + Bytes.toStringBinary(row) +
1776                   ", value=" + serviceResult.getValue().getValue());
1777             }
1778             try {
1779               callback.update(region, row,
1780                   (R) responsePrototype.newBuilderForType().mergeFrom(
1781                       serviceResult.getValue().getValue()).build());
1782             } catch (InvalidProtocolBufferException e) {
1783               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1784                   e);
1785               callbackErrorExceptions.add(e);
1786               callbackErrorActions.add(execsByRow.get(row));
1787               callbackErrorServers.add("null");
1788             }
1789           }
1790         }, results);
1791 
1792     future.waitUntilDone();
1793 
1794     if (future.hasError()) {
1795       throw future.getErrors();
1796     } else if (!callbackErrorExceptions.isEmpty()) {
1797       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1798           callbackErrorServers);
1799     }
1800   }
1801 }