1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import com.google.protobuf.Service;
22  import com.google.protobuf.ServiceException;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HRegionLocation;
33  import org.apache.hadoop.hbase.HTableDescriptor;
34  import org.apache.hadoop.hbase.KeyValue;
35  import org.apache.hadoop.hbase.KeyValueUtil;
36  import org.apache.hadoop.hbase.ServerName;
37  import org.apache.hadoop.hbase.client.coprocessor.Batch;
38  import org.apache.hadoop.hbase.filter.BinaryComparator;
39  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
40  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
41  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
42  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
43  import org.apache.hadoop.hbase.protobuf.RequestConverter;
44  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
45  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
46  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
47  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
48  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
49  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
50  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
51  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.Pair;
54  import org.apache.hadoop.hbase.util.Threads;
55  
56  import java.io.Closeable;
57  import java.io.IOException;
58  import java.io.InterruptedIOException;
59  import java.util.ArrayList;
60  import java.util.Collections;
61  import java.util.HashMap;
62  import java.util.List;
63  import java.util.Map;
64  import java.util.NavigableMap;
65  import java.util.TreeMap;
66  import java.util.concurrent.Callable;
67  import java.util.concurrent.ExecutionException;
68  import java.util.concurrent.ExecutorService;
69  import java.util.concurrent.Future;
70  import java.util.concurrent.SynchronousQueue;
71  import java.util.concurrent.ThreadPoolExecutor;
72  import java.util.concurrent.TimeUnit;
73  
74  /**
75   * <p>Used to communicate with a single HBase table.
76   *
 * <p>This class is not thread safe for reads or writes.
78   *
79   * <p>In case of writes (Put, Delete), the underlying write buffer can
80   * be corrupted if multiple threads contend over a single HTable instance.
81   *
82   * <p>In case of reads, some fields used by a Scan are shared among all threads.
 * The HTable implementation likewise does not promise to be safe in the case of a Get.
84   *
85   * <p>To access a table in a multi threaded environment, please consider
86   * using the {@link HTablePool} class to create your HTable instances.
87   *
88   * <p>Instances of HTable passed the same {@link Configuration} instance will
89   * share connections to servers out on the cluster and to the zookeeper ensemble
90   * as well as caches of region locations.  This is usually a *good* thing and it
91   * is recommended to reuse the same configuration object for all your tables.
92   * This happens because they will all share the same underlying
93   * {@link HConnection} instance. See {@link HConnectionManager} for more on
94   * how this mechanism works.
95   *
96   * <p>{@link HConnection} will read most of the
97   * configuration it needs from the passed {@link Configuration} on initial
98   * construction.  Thereafter, for settings such as
99   * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
100  * and <code>hbase.client.rpc.maxattempts</code> updating their values in the
101  * passed {@link Configuration} subsequent to {@link HConnection} construction
102  * will go unnoticed.  To run with changed values, make a new
103  * {@link HTable} passing a new {@link Configuration} instance that has the
104  * new configuration.
105  *
106  * <p>Note that this class implements the {@link Closeable} interface. When a
107  * HTable instance is no longer required, it *should* be closed in order to ensure
108  * that the underlying resources are promptly released. Please note that the close
109  * method can throw java.io.IOException that must be handled.
110  *
111  * @see HBaseAdmin for create, drop, list, enable and disable of tables.
112  * @see HConnection
113  * @see HConnectionManager
114  */
115 @InterfaceAudience.Public
116 @InterfaceStability.Stable
117 public class HTable implements HTableInterface {
  // Class-wide logger.
  private static final Log LOG = LogFactory.getLog(HTable.class);
  // Connection used for region lookups and RPCs. Either obtained from
  // HConnectionManager or supplied by the caller; left null when the table is
  // constructed with a null Configuration.
  private HConnection connection;
  // Name of the table this instance talks to.
  private final byte [] tableName;
  // Configuration the client settings are read from in finishSetup().
  private volatile Configuration configuration;
  // Puts accepted by doPut() that are waiting to be flushed.
  private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
  // Flush threshold, read from "hbase.client.write.buffer" (default 2097152).
  private long writeBufferSize;
  // Initialised to true in finishSetup().
  private boolean clearBufferOnFail;
  // When true (the value set in finishSetup()), put() flushes after buffering.
  private boolean autoFlush;
  // Running total of Put.heapSize() for the Puts added to writeBuffer.
  private long currentWriteBufferSize;
  // Default scanner caching, applied by getScanner(Scan) when the Scan has none.
  protected int scannerCaching;
  // Read from "hbase.client.keyvalue.maxsize" (default -1).
  private int maxKeyValueSize;
  private ExecutorService pool;  // For Multi
  // Initialised to false in finishSetup().
  private boolean closed;
  // Timeout handed to every ServerCallable created by this table.
  private int operationTimeout;
  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
  private final boolean cleanupConnectionOnClose; // close the connection in close()
134 
135   /**
136    * Creates an object to access a HBase table.
137    * Shares zookeeper connection and other resources with other HTable instances
138    * created with the same <code>conf</code> instance.  Uses already-populated
139    * region cache if one is available, populated by any other HTable instances
140    * sharing this <code>conf</code> instance.  Recommended.
141    * @param conf Configuration object to use.
142    * @param tableName Name of the table.
143    * @throws IOException if a remote or network exception occurs
144    */
145   public HTable(Configuration conf, final String tableName)
146   throws IOException {
147     this(conf, Bytes.toBytes(tableName));
148   }
149 
150 
151   /**
152    * Creates an object to access a HBase table.
153    * Shares zookeeper connection and other resources with other HTable instances
154    * created with the same <code>conf</code> instance.  Uses already-populated
155    * region cache if one is available, populated by any other HTable instances
156    * sharing this <code>conf</code> instance.  Recommended.
157    * @param conf Configuration object to use.
158    * @param tableName Name of the table.
159    * @throws IOException if a remote or network exception occurs
160    */
161   public HTable(Configuration conf, final byte [] tableName)
162   throws IOException {
163     this.tableName = tableName;
164     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
165     if (conf == null) {
166       this.connection = null;
167       return;
168     }
169     this.connection = HConnectionManager.getConnection(conf);
170     this.configuration = conf;
171 
172     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
173     if (maxThreads == 0) {
174       maxThreads = 1; // is there a better default?
175     }
176     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
177 
178     // Using the "direct handoff" approach, new threads will only be created
179     // if it is necessary and will grow unbounded. This could be bad but in HCM
180     // we only create as many Runnables as there are region servers. It means
181     // it also scales when new region servers are added.
182     this.pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
183         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-table"));
184     ((ThreadPoolExecutor) this.pool).allowCoreThreadTimeOut(true);
185 
186     this.finishSetup();
187   }
188 
189   /**
190    * Creates an object to access a HBase table.
191    * Shares zookeeper connection and other resources with other HTable instances
192    * created with the same <code>conf</code> instance.  Uses already-populated
193    * region cache if one is available, populated by any other HTable instances
194    * sharing this <code>conf</code> instance.
195    * Use this constructor when the ExecutorService is externally managed.
196    * @param conf Configuration object to use.
197    * @param tableName Name of the table.
198    * @param pool ExecutorService to be used.
199    * @throws IOException if a remote or network exception occurs
200    */
201   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
202       throws IOException {
203     this.connection = HConnectionManager.getConnection(conf);
204     this.configuration = conf;
205     this.pool = pool;
206     this.tableName = tableName;
207     this.cleanupPoolOnClose = false;
208     this.cleanupConnectionOnClose = true;
209 
210     this.finishSetup();
211   }
212 
213   /**
214    * Creates an object to access a HBase table.
215    * Shares zookeeper connection and other resources with other HTable instances
216    * created with the same <code>connection</code> instance.
217    * Use this constructor when the ExecutorService and HConnection instance are
218    * externally managed.
219    * @param tableName Name of the table.
220    * @param connection HConnection to be used.
221    * @param pool ExecutorService to be used.
222    * @throws IOException if a remote or network exception occurs
223    */
224   public HTable(final byte[] tableName, final HConnection connection,
225       final ExecutorService pool) throws IOException {
226     if (pool == null || pool.isShutdown()) {
227       throw new IllegalArgumentException("Pool is null or shut down.");
228     }
229     if (connection == null || connection.isClosed()) {
230       throw new IllegalArgumentException("Connection is null or closed.");
231     }
232     this.tableName = tableName;
233     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
234     this.connection = connection;
235     this.configuration = connection.getConfiguration();
236     this.pool = pool;
237 
238     this.finishSetup();
239   }
240 
241   /**
242    * setup this HTable's parameter based on the passed configuration
243    */
244   private void finishSetup() throws IOException {
245     this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
246     this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
247         : this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
248             HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
249     this.writeBufferSize = this.configuration.getLong(
250         "hbase.client.write.buffer", 2097152);
251     this.clearBufferOnFail = true;
252     this.autoFlush = true;
253     this.currentWriteBufferSize = 0;
254     this.scannerCaching = this.configuration.getInt(
255         HConstants.HBASE_CLIENT_SCANNER_CACHING,
256         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
257 
258     this.maxKeyValueSize = this.configuration.getInt(
259         "hbase.client.keyvalue.maxsize", -1);
260     this.closed = false;
261   }
262 
263   /**
264    * {@inheritDoc}
265    */
266   @Override
267   public Configuration getConfiguration() {
268     return configuration;
269   }
270 
271   /**
272    * Tells whether or not a table is enabled or not. This method creates a
273    * new HBase configuration, so it might make your unit tests fail due to
274    * incorrect ZK client port.
275    * @param tableName Name of table to check.
276    * @return {@code true} if table is online.
277    * @throws IOException if a remote or network exception occurs
278 	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
279    */
280   @Deprecated
281   public static boolean isTableEnabled(String tableName) throws IOException {
282     return isTableEnabled(Bytes.toBytes(tableName));
283   }
284 
285   /**
286    * Tells whether or not a table is enabled or not. This method creates a
287    * new HBase configuration, so it might make your unit tests fail due to
288    * incorrect ZK client port.
289    * @param tableName Name of table to check.
290    * @return {@code true} if table is online.
291    * @throws IOException if a remote or network exception occurs
292    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
293    */
294   @Deprecated
295   public static boolean isTableEnabled(byte[] tableName) throws IOException {
296     return isTableEnabled(HBaseConfiguration.create(), tableName);
297   }
298 
299   /**
300    * Tells whether or not a table is enabled or not.
301    * @param conf The Configuration object to use.
302    * @param tableName Name of table to check.
303    * @return {@code true} if table is online.
304    * @throws IOException if a remote or network exception occurs
305 	 * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
306    */
307   @Deprecated
308   public static boolean isTableEnabled(Configuration conf, String tableName)
309   throws IOException {
310     return isTableEnabled(conf, Bytes.toBytes(tableName));
311   }
312 
313   /**
314    * Tells whether or not a table is enabled or not.
315    * @param conf The Configuration object to use.
316    * @param tableName Name of table to check.
317    * @return {@code true} if table is online.
318    * @throws IOException if a remote or network exception occurs
319    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[] tableName)}
320    */
321   @Deprecated
322   public static boolean isTableEnabled(Configuration conf,
323       final byte[] tableName) throws IOException {
324     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
325       @Override
326       public Boolean connect(HConnection connection) throws IOException {
327         return connection.isTableEnabled(tableName);
328       }
329     });
330   }
331 
332   /**
333    * Find region location hosting passed row using cached info
334    * @param row Row to find.
335    * @return The location of the given row.
336    * @throws IOException if a remote or network exception occurs
337    */
338   public HRegionLocation getRegionLocation(final String row)
339   throws IOException {
340     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
341   }
342 
343   /**
344    * Finds the region on which the given row is being served. Does not reload the cache.
345    * @param row Row to find.
346    * @return Location of the row.
347    * @throws IOException if a remote or network exception occurs
348    */
349   public HRegionLocation getRegionLocation(final byte [] row)
350   throws IOException {
351     return connection.getRegionLocation(tableName, row, false);
352   }
353 
354   /**
355    * Finds the region on which the given row is being served.
356    * @param row Row to find.
357    * @param reload true to reload information or false to use cached information
358    * @return Location of the row.
359    * @throws IOException if a remote or network exception occurs
360    */
361   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
362   throws IOException {
363     return connection.getRegionLocation(tableName, row, reload);
364   }
365 
366   /**
367    * {@inheritDoc}
368    */
369   @Override
370   public byte [] getTableName() {
371     return this.tableName;
372   }
373 
374   /**
375    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
376    * manipulations.
377    * @return An HConnection instance.
378    * @deprecated This method will be changed from public to package protected.
379    */
380   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
381   @Deprecated
382   public HConnection getConnection() {
383     return this.connection;
384   }
385 
386   /**
387    * Gets the number of rows that a scanner will fetch at once.
388    * <p>
389    * The default value comes from {@code hbase.client.scanner.caching}.
390    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
391    */
392   @Deprecated
393   public int getScannerCaching() {
394     return scannerCaching;
395   }
396 
397   /**
398    * Sets the number of rows that a scanner will fetch at once.
399    * <p>
400    * This will override the value specified by
401    * {@code hbase.client.scanner.caching}.
402    * Increasing this value will reduce the amount of work needed each time
403    * {@code next()} is called on a scanner, at the expense of memory use
404    * (since more rows will need to be maintained in memory by the scanners).
405    * @param scannerCaching the number of rows a scanner will fetch at once.
406    * @deprecated Use {@link Scan#setCaching(int)}
407    */
408   @Deprecated
409   public void setScannerCaching(int scannerCaching) {
410     this.scannerCaching = scannerCaching;
411   }
412 
413   /**
414    * {@inheritDoc}
415    */
416   @Override
417   public HTableDescriptor getTableDescriptor() throws IOException {
418     return new UnmodifyableHTableDescriptor(
419       this.connection.getHTableDescriptor(this.tableName));
420   }
421 
422   /**
423    * Gets the starting row key for every region in the currently open table.
424    * <p>
425    * This is mainly useful for the MapReduce integration.
426    * @return Array of region starting row keys
427    * @throws IOException if a remote or network exception occurs
428    */
429   public byte [][] getStartKeys() throws IOException {
430     return getStartEndKeys().getFirst();
431   }
432 
433   /**
434    * Gets the ending row key for every region in the currently open table.
435    * <p>
436    * This is mainly useful for the MapReduce integration.
437    * @return Array of region ending row keys
438    * @throws IOException if a remote or network exception occurs
439    */
440   public byte[][] getEndKeys() throws IOException {
441     return getStartEndKeys().getSecond();
442   }
443 
444   /**
445    * Gets the starting and ending row keys for every region in the currently
446    * open table.
447    * <p>
448    * This is mainly useful for the MapReduce integration.
449    * @return Pair of arrays of region starting and ending row keys
450    * @throws IOException if a remote or network exception occurs
451    */
452   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
453     NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
454     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
455     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
456 
457     for (HRegionInfo region : regions.keySet()) {
458       startKeyList.add(region.getStartKey());
459       endKeyList.add(region.getEndKey());
460     }
461 
462     return new Pair<byte [][], byte [][]>(
463       startKeyList.toArray(new byte[startKeyList.size()][]),
464       endKeyList.toArray(new byte[endKeyList.size()][]));
465   }
466 
467   /**
468    * Gets all the regions and their address for this table.
469    * <p>
470    * This is mainly useful for the MapReduce integration.
471    * @return A map of HRegionInfo with it's server address
472    * @throws IOException if a remote or network exception occurs
473    */
474   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
475     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocation, singular, returns an HRegionLocation.
476     return MetaScanner.allTableRegions(getConfiguration(), getTableName(), false);
477   }
478 
479   /**
480    * Get the corresponding regions for an arbitrary range of keys.
481    * <p>
482    * @param startKey Starting row in range, inclusive
483    * @param endKey Ending row in range, exclusive
484    * @return A list of HRegionLocations corresponding to the regions that
485    * contain the specified range
486    * @throws IOException if a remote or network exception occurs
487    */
488   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
489     final byte [] endKey) throws IOException {
490     return getKeysAndRegionsInRange(startKey, endKey, false).getSecond();
491     }
492 
493   /**
494    * Get the corresponding start keys and regions for an arbitrary range of
495    * keys.
496    * <p>
497    * @param startKey Starting row in range, inclusive
498    * @param endKey Ending row in range
499    * @param includeEndKey true if endRow is inclusive, false if exclusive
500    * @return A pair of list of start keys and list of HRegionLocations that
501    *         contain the specified range
502    * @throws IOException if a remote or network exception occurs
503    */
504   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
505       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
506       throws IOException {
507     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
508     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
509       throw new IllegalArgumentException(
510         "Invalid range: " + Bytes.toStringBinary(startKey) +
511         " > " + Bytes.toStringBinary(endKey));
512     }
513     List<byte[]> keysInRange = new ArrayList<byte[]>();
514     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
515     byte[] currentKey = startKey;
516     do {
517       HRegionLocation regionLocation = getRegionLocation(currentKey, false);
518       keysInRange.add(currentKey);
519       regionsInRange.add(regionLocation);
520       currentKey = regionLocation.getRegionInfo().getEndKey();
521     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
522         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
523             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
524     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
525         regionsInRange);
526   }
527 
528   /**
529    * {@inheritDoc}
530    */
531    @Override
532    public Result getRowOrBefore(final byte[] row, final byte[] family)
533    throws IOException {
534      return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
535        public Result call() throws IOException {
536          return ProtobufUtil.getRowOrBefore(stub,
537            location.getRegionInfo().getRegionName(), row, family);
538        }
539      }.withRetries();
540    }
541 
542    /**
543     * {@inheritDoc}
544     */
545   @Override
546   public ResultScanner getScanner(final Scan scan) throws IOException {
547     if (scan.getCaching() <= 0) {
548       scan.setCaching(getScannerCaching());
549     }
550     return new ClientScanner(getConfiguration(), scan, getTableName(),
551         this.connection);
552   }
553 
554   /**
555    * {@inheritDoc}
556    */
557   @Override
558   public ResultScanner getScanner(byte [] family) throws IOException {
559     Scan scan = new Scan();
560     scan.addFamily(family);
561     return getScanner(scan);
562   }
563 
564   /**
565    * {@inheritDoc}
566    */
567   @Override
568   public ResultScanner getScanner(byte [] family, byte [] qualifier)
569   throws IOException {
570     Scan scan = new Scan();
571     scan.addColumn(family, qualifier);
572     return getScanner(scan);
573   }
574 
575   /**
576    * {@inheritDoc}
577    */
578   @Override
579   public Result get(final Get get) throws IOException {
580     return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
581           public Result call() throws IOException {
582             return ProtobufUtil.get(stub,
583               location.getRegionInfo().getRegionName(), get);
584           }
585         }.withRetries();
586   }
587 
588   /**
589    * {@inheritDoc}
590    */
591   @Override
592   public Result[] get(List<Get> gets) throws IOException {
593     try {
594       Object [] r1 = batch((List)gets);
595 
596       // translate.
597       Result [] results = new Result[r1.length];
598       int i=0;
599       for (Object o : r1) {
600         // batch ensures if there is a failure we get an exception instead
601         results[i++] = (Result) o;
602       }
603 
604       return results;
605     } catch (InterruptedException e) {
606       throw new IOException(e);
607     }
608   }
609 
610   @Override
611   public void batch(final List<?extends Row> actions, final Object[] results)
612       throws InterruptedException, IOException {
613     connection.processBatchCallback(actions, tableName, pool, results, null);
614   }
615 
616   @Override
617   public Object[] batch(final List<? extends Row> actions)
618      throws InterruptedException, IOException {
619     Object[] results = new Object[actions.size()];
620     connection.processBatchCallback(actions, tableName, pool, results, null);
621     return results;
622   }
623 
624   @Override
625   public <R> void batchCallback(
626     final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
627     throws IOException, InterruptedException {
628     connection.processBatchCallback(actions, tableName, pool, results, callback);
629   }
630 
631   @Override
632   public <R> Object[] batchCallback(
633     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
634       InterruptedException {
635     Object[] results = new Object[actions.size()];
636     connection.processBatchCallback(actions, tableName, pool, results, callback);
637     return results;
638   }
639 
640   /**
641    * {@inheritDoc}
642    */
643   @Override
644   public void delete(final Delete delete)
645   throws IOException {
646     new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
647           public Boolean call() throws IOException {
648             try {
649               MutateRequest request = RequestConverter.buildMutateRequest(
650                 location.getRegionInfo().getRegionName(), delete);
651               MutateResponse response = stub.mutate(null, request);
652               return Boolean.valueOf(response.getProcessed());
653             } catch (ServiceException se) {
654               throw ProtobufUtil.getRemoteException(se);
655             }
656           }
657         }.withRetries();
658   }
659 
660   /**
661    * {@inheritDoc}
662    */
663   @Override
664   public void delete(final List<Delete> deletes)
665   throws IOException {
666     Object[] results = new Object[deletes.size()];
667     try {
668       connection.processBatch((List) deletes, tableName, pool, results);
669     } catch (InterruptedException e) {
670       throw new IOException(e);
671     } finally {
672       // mutate list so that it is empty for complete success, or contains only failed records
673       // results are returned in the same order as the requests in list
674       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
675       for (int i = results.length - 1; i>=0; i--) {
676         // if result is not null, it succeeded
677         if (results[i] instanceof Result) {
678           deletes.remove(i);
679         }
680       }
681     }
682   }
683 
684   /**
685    * {@inheritDoc}
686    */
687   @Override
688   public void put(final Put put) throws IOException {
689     doPut(put);
690     if (autoFlush) {
691       flushCommits();
692     }
693   }
694 
695   /**
696    * {@inheritDoc}
697    */
698   @Override
699   public void put(final List<Put> puts) throws IOException {
700     for (Put put : puts) {
701       doPut(put);
702     }
703     if (autoFlush) {
704       flushCommits();
705     }
706   }
707 
708   private void doPut(Put put) throws IOException{
709     validatePut(put);
710     writeBuffer.add(put);
711     currentWriteBufferSize += put.heapSize();
712     if (currentWriteBufferSize > writeBufferSize) {
713       flushCommits();
714     }
715   }
716 
717   /**
718    * {@inheritDoc}
719    */
720   @Override
721   public void mutateRow(final RowMutations rm) throws IOException {
722     new ServerCallable<Void>(connection, tableName, rm.getRow(),
723         operationTimeout) {
724       public Void call() throws IOException {
725         try {
726           MultiRequest request = RequestConverter.buildMultiRequest(
727             location.getRegionInfo().getRegionName(), rm);
728           stub.multi(null, request);
729         } catch (ServiceException se) {
730           throw ProtobufUtil.getRemoteException(se);
731         }
732         return null;
733       }
734     }.withRetries();
735   }
736 
737   /**
738    * {@inheritDoc}
739    */
740   @Override
741   public Result append(final Append append) throws IOException {
742     if (append.numFamilies() == 0) {
743       throw new IOException(
744           "Invalid arguments to append, no columns specified");
745     }
746     return new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
747           public Result call() throws IOException {
748             try {
749               MutateRequest request = RequestConverter.buildMutateRequest(
750                 location.getRegionInfo().getRegionName(), append);
751               PayloadCarryingRpcController rpcController =
752                 new PayloadCarryingRpcController();
753               MutateResponse response = stub.mutate(rpcController, request);
754               if (!response.hasResult()) return null;
755               return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
756             } catch (ServiceException se) {
757               throw ProtobufUtil.getRemoteException(se);
758             }
759           }
760         }.withRetries();
761   }
762 
763   /**
764    * {@inheritDoc}
765    */
766   @Override
767   public Result increment(final Increment increment) throws IOException {
768     if (!increment.hasFamilies()) {
769       throw new IOException(
770           "Invalid arguments to increment, no columns specified");
771     }
772     return new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
773           public Result call() throws IOException {
774             try {
775               MutateRequest request = RequestConverter.buildMutateRequest(
776                 location.getRegionInfo().getRegionName(), increment);
777               PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController();
778               MutateResponse response = stub.mutate(rpcContoller, request);
779               return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner());
780             } catch (ServiceException se) {
781               throw ProtobufUtil.getRemoteException(se);
782             }
783           }
784         }.withRetries();
785   }
786 
787   /**
788    * {@inheritDoc}
789    */
790   @Override
791   public long incrementColumnValue(final byte [] row, final byte [] family,
792       final byte [] qualifier, final long amount)
793   throws IOException {
794     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
795   }
796 
797   /**
798    * {@inheritDoc}
799    */
800   @Override
801   public long incrementColumnValue(final byte [] row, final byte [] family,
802       final byte [] qualifier, final long amount, final Durability durability)
803   throws IOException {
804     NullPointerException npe = null;
805     if (row == null) {
806       npe = new NullPointerException("row is null");
807     } else if (family == null) {
808       npe = new NullPointerException("family is null");
809     } else if (qualifier == null) {
810       npe = new NullPointerException("qualifier is null");
811     }
812     if (npe != null) {
813       throw new IOException(
814           "Invalid arguments to incrementColumnValue", npe);
815     }
816     return new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
817           public Long call() throws IOException {
818             try {
819               MutateRequest request = RequestConverter.buildMutateRequest(
820                 location.getRegionInfo().getRegionName(), row, family,
821                 qualifier, amount, durability);
822               PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
823               MutateResponse response = stub.mutate(rpcController, request);
824               Result result =
825                 ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
826               return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
827             } catch (ServiceException se) {
828               throw ProtobufUtil.getRemoteException(se);
829             }
830           }
831         }.withRetries();
832   }
833 
834   /**
835    * {@inheritDoc}
836    */
837   @Override
838   public boolean checkAndPut(final byte [] row,
839       final byte [] family, final byte [] qualifier, final byte [] value,
840       final Put put)
841   throws IOException {
842     return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
843           public Boolean call() throws IOException {
844             try {
845               MutateRequest request = RequestConverter.buildMutateRequest(
846                 location.getRegionInfo().getRegionName(), row, family, qualifier,
847                 new BinaryComparator(value), CompareType.EQUAL, put);
848               MutateResponse response = stub.mutate(null, request);
849               return Boolean.valueOf(response.getProcessed());
850             } catch (ServiceException se) {
851               throw ProtobufUtil.getRemoteException(se);
852             }
853           }
854         }.withRetries();
855   }
856 
857 
858   /**
859    * {@inheritDoc}
860    */
861   @Override
862   public boolean checkAndDelete(final byte [] row,
863       final byte [] family, final byte [] qualifier, final byte [] value,
864       final Delete delete)
865   throws IOException {
866     return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
867           public Boolean call() throws IOException {
868             try {
869               MutateRequest request = RequestConverter.buildMutateRequest(
870                 location.getRegionInfo().getRegionName(), row, family, qualifier,
871                 new BinaryComparator(value), CompareType.EQUAL, delete);
872               MutateResponse response = stub.mutate(null, request);
873               return Boolean.valueOf(response.getProcessed());
874             } catch (ServiceException se) {
875               throw ProtobufUtil.getRemoteException(se);
876             }
877           }
878         }.withRetries();
879   }
880 
881   /**
882    * {@inheritDoc}
883    */
884   @Override
885   public boolean exists(final Get get) throws IOException {
886     return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
887           public Boolean call() throws IOException {
888             try {
889               GetRequest request = RequestConverter.buildGetRequest(
890                   location.getRegionInfo().getRegionName(), get, true);
891               GetResponse response = stub.get(null, request);
892               return response.getExists();
893             } catch (ServiceException se) {
894               throw ProtobufUtil.getRemoteException(se);
895             }
896           }
897         }.withRetries();
898   }
899 
900   /**
901    * Goal of this inner class is to keep track of the initial position of a get in a list before
902    * sorting it. This is used to send back results in the same orders we got the Gets before we sort
903    * them.
904    */
905   private static class SortedGet implements Comparable<SortedGet> {
906     protected int initialIndex = -1; // Used to store the get initial index in a list.
907     protected Get get; // Encapsulated Get instance.
908 
909     public SortedGet (Get get, int initialIndex) {
910       this.get = get;
911       this.initialIndex = initialIndex;
912     }
913 
914     public int getInitialIndex() {
915       return initialIndex;
916     }
917 
918     @Override
919     public int compareTo(SortedGet o) {
920       return get.compareTo(o.get);
921     }
922 
923     public Get getGet() {
924       return get;
925     }
926 
927     @Override
928     public int hashCode() {
929       return get.hashCode();
930     }
931 
932     @Override
933     public boolean equals(Object obj) {
934       if (obj instanceof SortedGet)
935         return get.equals(((SortedGet)obj).get);
936       else
937         return false;
938     }
939   }
940 
941   /**
942    * {@inheritDoc}
943    */
944   @Override
945   public Boolean[] exists(final List<Get> gets) throws IOException {
946     // Prepare the sorted list of gets. Take the list of gets received, and encapsulate them into
947     // a list of SortedGet instances. Simple list parsing, so complexity here is O(n)
948     // The list is later used to recreate the response order based on the order the Gets
949     // got received.
950     ArrayList<SortedGet> sortedGetsList = new ArrayList<HTable.SortedGet>();
951     for (int indexGet = 0; indexGet < gets.size(); indexGet++) {
952       sortedGetsList.add(new SortedGet (gets.get(indexGet), indexGet));
953     }
954 
955     // Sorting the list to get the Gets ordered based on the key.
956     Collections.sort(sortedGetsList); // O(n log n)
957 
958     // step 1: sort the requests by regions to send them bundled.
959     // Map key is startKey index. Map value is the list of Gets related to the region starting
960     // with the startKey.
961     Map<Integer, List<Get>> getsByRegion = new HashMap<Integer, List<Get>>();
962 
963     // Reference map to quickly find back in which region a get belongs.
964     Map<Get, Integer> getToRegionIndexMap = new HashMap<Get, Integer>();
965     Pair<byte[][], byte[][]> startEndKeys = getStartEndKeys();
966 
967     int regionIndex = 0;
968     for (final SortedGet get : sortedGetsList) {
969       // Progress on the regions until we find the one the current get resides in.
970       while ((regionIndex < startEndKeys.getSecond().length) && ((Bytes.compareTo(startEndKeys.getSecond()[regionIndex], get.getGet().getRow()) <= 0))) {
971         regionIndex++;
972       }
973       List<Get> regionGets = getsByRegion.get(regionIndex);
974       if (regionGets == null) {
975         regionGets = new ArrayList<Get>();
976         getsByRegion.put(regionIndex, regionGets);
977       }
978       regionGets.add(get.getGet());
979       getToRegionIndexMap.put(get.getGet(), regionIndex);
980     }
981 
982     // step 2: make the requests
983     Map<Integer, Future<List<Boolean>>> futures =
984         new HashMap<Integer, Future<List<Boolean>>>(sortedGetsList.size());
985     for (final Map.Entry<Integer, List<Get>> getsByRegionEntry : getsByRegion.entrySet()) {
986       Callable<List<Boolean>> callable = new Callable<List<Boolean>>() {
987         public List<Boolean> call() throws Exception {
988           return new ServerCallable<List<Boolean>>(connection, tableName, getsByRegionEntry.getValue()
989               .get(0).getRow(), operationTimeout) {
990             public List<Boolean> call() throws IOException {
991               try {
992                 MultiGetRequest requests = RequestConverter.buildMultiGetRequest(location
993                     .getRegionInfo().getRegionName(), getsByRegionEntry.getValue(), true, false);
994                 MultiGetResponse responses = stub.multiGet(null, requests);
995                 return responses.getExistsList();
996               } catch (ServiceException se) {
997                 throw ProtobufUtil.getRemoteException(se);
998               }
999             }
1000           }.withRetries();
1001         }
1002       };
1003       futures.put(getsByRegionEntry.getKey(), pool.submit(callable));
1004     }
1005 
1006     // step 3: collect the failures and successes
1007     Map<Integer, List<Boolean>> responses = new HashMap<Integer, List<Boolean>>();
1008     for (final Map.Entry<Integer, List<Get>> sortedGetEntry : getsByRegion.entrySet()) {
1009       try {
1010         Future<List<Boolean>> future = futures.get(sortedGetEntry.getKey());
1011         List<Boolean> resp = future.get();
1012 
1013         if (resp == null) {
1014           LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
1015         }
1016         responses.put(sortedGetEntry.getKey(), resp);
1017       } catch (ExecutionException e) {
1018         LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
1019       } catch (InterruptedException e) {
1020         LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
1021         Thread.currentThread().interrupt();
1022       }
1023     }
1024     Boolean[] results = new Boolean[sortedGetsList.size()];
1025 
1026     // step 4: build the response.
1027     Map<Integer, Integer> indexes = new HashMap<Integer, Integer>();
1028     for (int i = 0; i < sortedGetsList.size(); i++) {
1029       Integer regionInfoIndex = getToRegionIndexMap.get(sortedGetsList.get(i).getGet());
1030       Integer index = indexes.get(regionInfoIndex);
1031       if (index == null) {
1032         index = 0;
1033       }
1034       results[sortedGetsList.get(i).getInitialIndex()] = responses.get(regionInfoIndex).get(index);
1035       indexes.put(regionInfoIndex, index + 1);
1036     }
1037 
1038     return results;
1039   }
1040 
1041   /**
1042    * {@inheritDoc}
1043    */
1044   @Override
1045   public void flushCommits() throws IOException {
1046     if (writeBuffer.isEmpty()){
1047       // Early exit: we can be called on empty buffers.
1048       return;
1049     }
1050 
1051     Object[] results = new Object[writeBuffer.size()];
1052     boolean success = false;
1053     try {
1054       this.connection.processBatch(writeBuffer, tableName, pool, results);
1055       success = true;
1056     } catch (InterruptedException e) {
1057       throw new InterruptedIOException(e.getMessage());
1058     } finally {
1059       // mutate list so that it is empty for complete success, or contains
1060       // only failed records. Results are returned in the same order as the
1061       // requests in list. Walk the list backwards, so we can remove from list
1062       // without impacting the indexes of earlier members
1063       currentWriteBufferSize = 0;
1064       if (success || clearBufferOnFail) {
1065         writeBuffer.clear();
1066       } else {
1067         for (int i = results.length - 1; i >= 0; i--) {
1068           if (results[i] instanceof Result) {
1069             writeBuffer.remove(i);
1070           } else {
1071             currentWriteBufferSize += writeBuffer.get(i).heapSize();
1072           }
1073         }
1074       }
1075     }
1076   }
1077 
1078   /**
1079    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1080    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1081    *
1082    * @param list The collection of actions.
1083    * @param results An empty array, same size as list. If an exception is thrown,
1084    * you can test here for partial results, and to determine which actions
1085    * processed successfully.
1086    * @throws IOException if there are problems talking to META. Per-item
1087    * exceptions are stored in the results array.
1088    */
1089   public <R> void processBatchCallback(
1090     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1091     throws IOException, InterruptedException {
1092     connection.processBatchCallback(list, tableName, pool, results, callback);
1093   }
1094 
1095 
1096   /**
1097    * Parameterized batch processing, allowing varying return types for different
1098    * {@link Row} implementations.
1099    */
1100   public void processBatch(final List<? extends Row> list, final Object[] results)
1101     throws IOException, InterruptedException {
1102 
1103     this.processBatchCallback(list, results, null);
1104   }
1105 
1106 
1107   @Override
1108   public void close() throws IOException {
1109     if (this.closed) {
1110       return;
1111     }
1112     flushCommits();
1113     if (cleanupPoolOnClose) {
1114       this.pool.shutdown();
1115     }
1116     if (cleanupConnectionOnClose) {
1117       if (this.connection != null) {
1118         this.connection.close();
1119       }
1120     }
1121     this.closed = true;
1122   }
1123 
1124   // validate for well-formedness
1125   public void validatePut(final Put put) throws IllegalArgumentException{
1126     if (put.isEmpty()) {
1127       throw new IllegalArgumentException("No columns to insert");
1128     }
1129     if (maxKeyValueSize > 0) {
1130       for (List<? extends Cell> list : put.getFamilyMap().values()) {
1131         for (Cell cell : list) {
1132           // KeyValue v1 expectation.  Cast for now.
1133           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1134           if (kv.getLength() > maxKeyValueSize) {
1135             throw new IllegalArgumentException("KeyValue size too large");
1136           }
1137         }
1138       }
1139     }
1140   }
1141 
1142   /**
1143    * {@inheritDoc}
1144    */
1145   @Override
1146   public boolean isAutoFlush() {
1147     return autoFlush;
1148   }
1149 
1150   /**
1151    * See {@link #setAutoFlush(boolean, boolean)}
1152    *
1153    * @param autoFlush
1154    *          Whether or not to enable 'auto-flush'.
1155    */
1156   public void setAutoFlush(boolean autoFlush) {
1157     setAutoFlush(autoFlush, autoFlush);
1158   }
1159 
1160   /**
1161    * Turns 'auto-flush' on or off.
1162    * <p>
1163    * When enabled (default), {@link Put} operations don't get buffered/delayed
1164    * and are immediately executed. Failed operations are not retried. This is
1165    * slower but safer.
1166    * <p>
1167    * Turning off {@link #autoFlush} means that multiple {@link Put}s will be
1168    * accepted before any RPC is actually sent to do the write operations. If the
1169    * application dies before pending writes get flushed to HBase, data will be
1170    * lost.
1171    * <p>
1172    * When you turn {@link #autoFlush} off, you should also consider the
1173    * {@link #clearBufferOnFail} option. By default, asynchronous {@link Put}
1174    * requests will be retried on failure until successful. However, this can
1175    * pollute the writeBuffer and slow down batching performance. Additionally,
1176    * you may want to issue a number of Put requests and call
1177    * {@link #flushCommits()} as a barrier. In both use cases, consider setting
1178    * clearBufferOnFail to true to erase the buffer after {@link #flushCommits()}
1179    * has been called, regardless of success.
1180    *
1181    * @param autoFlush
1182    *          Whether or not to enable 'auto-flush'.
1183    * @param clearBufferOnFail
1184    *          Whether to keep Put failures in the writeBuffer
1185    * @see #flushCommits
1186    */
1187   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1188     this.autoFlush = autoFlush;
1189     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1190   }
1191 
1192   /**
1193    * Returns the maximum size in bytes of the write buffer for this HTable.
1194    * <p>
1195    * The default value comes from the configuration parameter
1196    * {@code hbase.client.write.buffer}.
1197    * @return The size of the write buffer in bytes.
1198    */
1199   public long getWriteBufferSize() {
1200     return writeBufferSize;
1201   }
1202 
1203   /**
1204    * Sets the size of the buffer in bytes.
1205    * <p>
1206    * If the new size is less than the current amount of data in the
1207    * write buffer, the buffer gets flushed.
1208    * @param writeBufferSize The new write buffer size, in bytes.
1209    * @throws IOException if a remote or network exception occurs.
1210    */
1211   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1212     this.writeBufferSize = writeBufferSize;
1213     if(currentWriteBufferSize > writeBufferSize) {
1214       flushCommits();
1215     }
1216   }
1217 
1218   /**
1219    * Returns the write buffer.
1220    * @return The current write buffer.
1221    */
1222   public ArrayList<Put> getWriteBuffer() {
1223     return writeBuffer;
1224   }
1225 
1226   /**
1227    * The pool is used for mutli requests for this HTable
1228    * @return the pool used for mutli
1229    */
1230   ExecutorService getPool() {
1231     return this.pool;
1232   }
1233 
1234   /**
1235    * Enable or disable region cache prefetch for the table. It will be
1236    * applied for the given table's all HTable instances who share the same
1237    * connection. By default, the cache prefetch is enabled.
1238    * @param tableName name of table to configure.
1239    * @param enable Set to true to enable region cache prefetch. Or set to
1240    * false to disable it.
1241    * @throws IOException
1242    */
1243   public static void setRegionCachePrefetch(final byte[] tableName,
1244       final boolean enable) throws IOException {
1245     HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration
1246         .create()) {
1247       @Override
1248       public Void connect(HConnection connection) throws IOException {
1249         connection.setRegionCachePrefetch(tableName, enable);
1250         return null;
1251       }
1252     });
1253   }
1254 
1255   /**
1256    * Enable or disable region cache prefetch for the table. It will be
1257    * applied for the given table's all HTable instances who share the same
1258    * connection. By default, the cache prefetch is enabled.
1259    * @param conf The Configuration object to use.
1260    * @param tableName name of table to configure.
1261    * @param enable Set to true to enable region cache prefetch. Or set to
1262    * false to disable it.
1263    * @throws IOException
1264    */
1265   public static void setRegionCachePrefetch(final Configuration conf,
1266       final byte[] tableName, final boolean enable) throws IOException {
1267     HConnectionManager.execute(new HConnectable<Void>(conf) {
1268       @Override
1269       public Void connect(HConnection connection) throws IOException {
1270         connection.setRegionCachePrefetch(tableName, enable);
1271         return null;
1272       }
1273     });
1274   }
1275 
1276   /**
1277    * Check whether region cache prefetch is enabled or not for the table.
1278    * @param conf The Configuration object to use.
1279    * @param tableName name of table to check
1280    * @return true if table's region cache prefecth is enabled. Otherwise
1281    * it is disabled.
1282    * @throws IOException
1283    */
1284   public static boolean getRegionCachePrefetch(final Configuration conf,
1285       final byte[] tableName) throws IOException {
1286     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1287       @Override
1288       public Boolean connect(HConnection connection) throws IOException {
1289         return connection.getRegionCachePrefetch(tableName);
1290       }
1291     });
1292   }
1293 
1294   /**
1295    * Check whether region cache prefetch is enabled or not for the table.
1296    * @param tableName name of table to check
1297    * @return true if table's region cache prefecth is enabled. Otherwise
1298    * it is disabled.
1299    * @throws IOException
1300    */
1301   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1302     return HConnectionManager.execute(new HConnectable<Boolean>(
1303         HBaseConfiguration.create()) {
1304       @Override
1305       public Boolean connect(HConnection connection) throws IOException {
1306         return connection.getRegionCachePrefetch(tableName);
1307       }
1308     });
1309  }
1310 
1311   /**
1312    * Explicitly clears the region cache to fetch the latest value from META.
1313    * This is a power user function: avoid unless you know the ramifications.
1314    */
1315   public void clearRegionCache() {
1316     this.connection.clearRegionCache();
1317   }
1318 
1319   /**
1320    * {@inheritDoc}
1321    */
1322   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1323     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1324   }
1325 
1326   /**
1327    * {@inheritDoc}
1328    */
1329   @Override
1330   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1331       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1332       throws ServiceException, Throwable {
1333     final Map<byte[],R> results =  Collections.synchronizedMap(
1334         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1335     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1336       public void update(byte[] region, byte[] row, R value) {
1337         results.put(region, value);
1338       }
1339     });
1340     return results;
1341   }
1342 
1343   /**
1344    * {@inheritDoc}
1345    */
1346   @Override
1347   public <T extends Service, R> void coprocessorService(final Class<T> service,
1348       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1349       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1350 
1351     // get regions covered by the row range
1352     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1353 
1354     Map<byte[],Future<R>> futures =
1355         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1356     for (final byte[] r : keys) {
1357       final RegionCoprocessorRpcChannel channel =
1358           new RegionCoprocessorRpcChannel(connection, tableName, r);
1359       Future<R> future = pool.submit(
1360           new Callable<R>() {
1361             public R call() throws Exception {
1362               T instance = ProtobufUtil.newServiceStub(service, channel);
1363               R result = callable.call(instance);
1364               byte[] region = channel.getLastRegion();
1365               if (callback != null) {
1366                 callback.update(region, r, result);
1367               }
1368               return result;
1369             }
1370           });
1371       futures.put(r, future);
1372     }
1373     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1374       try {
1375         e.getValue().get();
1376       } catch (ExecutionException ee) {
1377         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1378             + Bytes.toStringBinary(e.getKey()), ee);
1379         throw ee.getCause();
1380       } catch (InterruptedException ie) {
1381         Thread.currentThread().interrupt();
1382         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1383             + " for row " + Bytes.toStringBinary(e.getKey()))
1384             .initCause(ie);
1385       }
1386     }
1387   }
1388 
1389   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1390   throws IOException {
1391     if (start == null) {
1392       start = HConstants.EMPTY_START_ROW;
1393     }
1394     if (end == null) {
1395       end = HConstants.EMPTY_END_ROW;
1396     }
1397     return getKeysAndRegionsInRange(start, end, true).getFirst();
1398   }
1399 
1400   public void setOperationTimeout(int operationTimeout) {
1401     this.operationTimeout = operationTimeout;
1402   }
1403 
1404   public int getOperationTimeout() {
1405     return operationTimeout;
1406   }
1407 
1408 }