View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.protobuf.Descriptors;
23  import com.google.protobuf.Message;
24  import com.google.protobuf.Service;
25  import com.google.protobuf.ServiceException;
26
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.TreeMap;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.SynchronousQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionLocation;
49  import org.apache.hadoop.hbase.HTableDescriptor;
50  import org.apache.hadoop.hbase.KeyValueUtil;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.classification.InterfaceStability;
54  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch;
56  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
57  import org.apache.hadoop.hbase.filter.BinaryComparator;
58  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
59  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
60  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
61  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
62  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
63  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
64  import org.apache.hadoop.hbase.protobuf.RequestConverter;
65  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
71  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
72  import org.apache.hadoop.hbase.util.Bytes;
73  import org.apache.hadoop.hbase.util.Pair;
74  import org.apache.hadoop.hbase.util.ReflectionUtils;
75  import org.apache.hadoop.hbase.util.Threads;
76  
77  /**
78   * An implementation of {@link Table}. Used to communicate with a single HBase table.
79   * Lightweight. Get as needed and just close when done.
80   * Instances of this class SHOULD NOT be constructed directly.
81   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
82   * class comment for an example of how.
83   *
84   * <p>This class is NOT thread safe for reads nor writes.
85   * In the case of writes (Put, Delete), the underlying write buffer can
86   * be corrupted if multiple threads contend over a single HTable instance.
87   * In the case of reads, some fields used by a Scan are shared among all threads.
88   *
89   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
90   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
91   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
92   * Interface Classification</a>
93   * There are no guarantees for backwards source / binary compatibility and methods or class can
94   * change or go away without deprecation.
95   *
96   * @see Table
97   * @see Admin
98   * @see Connection
99   * @see ConnectionFactory
100  */
101 @InterfaceAudience.Private
102 @InterfaceStability.Stable
103 public class HTable implements Table {
104   private static final Log LOG = LogFactory.getLog(HTable.class);
105   protected ClusterConnection connection;
106   private final TableName tableName;
107   private volatile Configuration configuration;
108   private ConnectionConfiguration connConfiguration;
109   protected BufferedMutatorImpl mutator;
110   private boolean closed = false;
111   protected int scannerCaching;
112   protected long scannerMaxResultSize;
113   private ExecutorService pool;  // For Multi & Scan
114   private int operationTimeout; // global timeout for each blocking method with retrying rpc
115   private int rpcTimeout; // timeout for each rpc request
116   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
117   private final boolean cleanupConnectionOnClose; // close the connection in close()
118   private Consistency defaultConsistency = Consistency.STRONG;
119   private HRegionLocator locator;
120
121   /** The Async process for batch */
122   protected AsyncProcess multiAp;
123   private RpcRetryingCallerFactory rpcCallerFactory;
124   private RpcControllerFactory rpcControllerFactory;
125
126   // Marked Private @since 1.0
127   @InterfaceAudience.Private
128   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
129     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
130     if (maxThreads == 0) {
131       maxThreads = 1; // is there a better default?
132     }
133     int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
134     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
135
136     // Using the "direct handoff" approach, new threads will only be created
137     // if it is necessary and will grow unbounded. This could be bad but in HCM
138     // we only create as many Runnables as there are region servers. It means
139     // it also scales when new region servers are added.
140     ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
141       TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
142     pool.allowCoreThreadTimeOut(true);
143     return pool;
144   }
145
146   /**
147    * Creates an object to access a HBase table.
148    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
149    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
150    * @param tableName Name of the table.
151    * @param connection Connection to be used.
152    * @param pool ExecutorService to be used.
153    * @throws IOException if a remote or network exception occurs
154    */
155   @InterfaceAudience.Private
156   protected HTable(TableName tableName, final ClusterConnection connection,
157       final ConnectionConfiguration tableConfig,
158       final RpcRetryingCallerFactory rpcCallerFactory,
159       final RpcControllerFactory rpcControllerFactory,
160       final ExecutorService pool) throws IOException {
161     if (connection == null || connection.isClosed()) {
162       throw new IllegalArgumentException("Connection is null or closed.");
163     }
164     this.tableName = tableName;
165     this.cleanupConnectionOnClose = false;
166     this.connection = connection;
167     this.configuration = connection.getConfiguration();
168     this.connConfiguration = tableConfig;
169     this.pool = pool;
170     if (pool == null) {
171       this.pool = getDefaultExecutor(this.configuration);
172       this.cleanupPoolOnClose = true;
173     } else {
174       this.cleanupPoolOnClose = false;
175     }
176
177     this.rpcCallerFactory = rpcCallerFactory;
178     this.rpcControllerFactory = rpcControllerFactory;
179
180     this.finishSetup();
181   }
182
183   /**
184    * For internal testing. Uses Connection provided in {@code params}.
185    * @throws IOException
186    */
187   @VisibleForTesting
188   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
189     connection = conn;
190     tableName = params.getTableName();
191     connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
192     cleanupPoolOnClose = false;
193     cleanupConnectionOnClose = false;
194     // used from tests, don't trust the connection is real
195     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
196   }
197
198   /**
199    * @return maxKeyValueSize from configuration.
200    */
201   public static int getMaxKeyValueSize(Configuration conf) {
202     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
203   }
204
205   /**
206    * setup this HTable's parameter based on the passed configuration
207    */
208   private void finishSetup() throws IOException {
209     if (connConfiguration == null) {
210       connConfiguration = new ConnectionConfiguration(configuration);
211     }
212
213     this.operationTimeout = tableName.isSystemTable() ?
214         connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
215     this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
216         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
217     this.scannerCaching = connConfiguration.getScannerCaching();
218     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
219     if (this.rpcCallerFactory == null) {
220       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
221     }
222     if (this.rpcControllerFactory == null) {
223       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
224     }
225
226     // puts need to track errors globally due to how the APIs currently work.
227     multiAp = this.connection.getAsyncProcess();
228     this.locator = new HRegionLocator(getName(), connection);
229   }
230
231   /**
232    * {@inheritDoc}
233    */
234   @Override
235   public Configuration getConfiguration() {
236     return configuration;
237   }
238
239   @Override
240   public TableName getName() {
241     return tableName;
242   }
243
244   /**
245    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
246    * manipulations.
247    * @return A Connection instance.
248    */
249   @VisibleForTesting
250   protected Connection getConnection() {
251     return this.connection;
252   }
253
254   /**
255    * {@inheritDoc}
256    */
257   @Override
258   public HTableDescriptor getTableDescriptor() throws IOException {
259     HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
260       rpcControllerFactory, operationTimeout, rpcTimeout);
261     if (htd != null) {
262       return new UnmodifyableHTableDescriptor(htd);
263     }
264     return null;
265   }
266
267   /**
268    * Get the corresponding start keys and regions for an arbitrary range of
269    * keys.
270    * <p>
271    * @param startKey Starting row in range, inclusive
272    * @param endKey Ending row in range
273    * @param includeEndKey true if endRow is inclusive, false if exclusive
274    * @return A pair of list of start keys and list of HRegionLocations that
275    *         contain the specified range
276    * @throws IOException if a remote or network exception occurs
277    */
278   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
279       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
280       throws IOException {
281     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
282   }
283
284   /**
285    * Get the corresponding start keys and regions for an arbitrary range of
286    * keys.
287    * <p>
288    * @param startKey Starting row in range, inclusive
289    * @param endKey Ending row in range
290    * @param includeEndKey true if endRow is inclusive, false if exclusive
291    * @param reload true to reload information or false to use cached information
292    * @return A pair of list of start keys and list of HRegionLocations that
293    *         contain the specified range
294    * @throws IOException if a remote or network exception occurs
295    */
296   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
297       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
298       final boolean reload) throws IOException {
299     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
300     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
301       throw new IllegalArgumentException(
302         "Invalid range: " + Bytes.toStringBinary(startKey) +
303         " > " + Bytes.toStringBinary(endKey));
304     }
305     List<byte[]> keysInRange = new ArrayList<byte[]>();
306     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
307     byte[] currentKey = startKey;
308     do {
309       HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
310       keysInRange.add(currentKey);
311       regionsInRange.add(regionLocation);
312       currentKey = regionLocation.getRegionInfo().getEndKey();
313     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
314         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
315             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
316     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
317         regionsInRange);
318   }
319
320   /**
321    * The underlying {@link HTable} must not be closed.
322    * {@link Table#getScanner(Scan)} has other usage details.
323    */
324   @Override
325   public ResultScanner getScanner(final Scan scan) throws IOException {
326     if (scan.getBatch() > 0 && scan.isSmall()) {
327       throw new IllegalArgumentException("Small scan should not be used with batching");
328     }
329
330     if (scan.getCaching() <= 0) {
331       scan.setCaching(scannerCaching);
332     }
333     if (scan.getMaxResultSize() <= 0) {
334       scan.setMaxResultSize(scannerMaxResultSize);
335     }
336
337     Boolean async = scan.isAsyncPrefetch();
338     if (async == null) {
339       async = connConfiguration.isClientScannerAsyncPrefetch();
340     }
341
342     if (scan.isReversed()) {
343       if (scan.isSmall()) {
344         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
345             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
346             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
347       } else {
348         return new ReversedClientScanner(getConfiguration(), scan, getName(),
349             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
350             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
351       }
352     }
353
354     if (scan.isSmall()) {
355       return new ClientSmallScanner(getConfiguration(), scan, getName(),
356           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
357           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
358     } else {
359       if (async) {
360         return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
361             this.rpcCallerFactory, this.rpcControllerFactory,
362             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
363       } else {
364         return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
365             this.rpcCallerFactory, this.rpcControllerFactory,
366             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
367       }
368     }
369   }
370
371   /**
372    * The underlying {@link HTable} must not be closed.
373    * {@link Table#getScanner(byte[])} has other usage details.
374    */
375   @Override
376   public ResultScanner getScanner(byte [] family) throws IOException {
377     Scan scan = new Scan();
378     scan.addFamily(family);
379     return getScanner(scan);
380   }
381
382   /**
383    * The underlying {@link HTable} must not be closed.
384    * {@link Table#getScanner(byte[], byte[])} has other usage details.
385    */
386   @Override
387   public ResultScanner getScanner(byte [] family, byte [] qualifier)
388   throws IOException {
389     Scan scan = new Scan();
390     scan.addColumn(family, qualifier);
391     return getScanner(scan);
392   }
393
394   /**
395    * {@inheritDoc}
396    */
397   @Override
398   public Result get(final Get get) throws IOException {
399     return get(get, get.isCheckExistenceOnly());
400   }
401
402   private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
403     // if we are changing settings to the get, clone it.
404     if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
405       get = ReflectionUtils.newInstance(get.getClass(), get);
406       get.setCheckExistenceOnly(checkExistenceOnly);
407       if (get.getConsistency() == null){
408         get.setConsistency(defaultConsistency);
409       }
410     }
411
412     if (get.getConsistency() == Consistency.STRONG) {
413       // Good old call.
414       final Get getReq = get;
415       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
416           getName(), get.getRow()) {
417         @Override
418         public Result call(int callTimeout) throws IOException {
419           ClientProtos.GetRequest request =
420             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
421           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
422           controller.setPriority(tableName);
423           controller.setCallTimeout(callTimeout);
424           try {
425             ClientProtos.GetResponse response = getStub().get(controller, request);
426             if (response == null) return null;
427             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
428           } catch (ServiceException se) {
429             throw ProtobufUtil.getRemoteException(se);
430           }
431         }
432       };
433       return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
434           this.operationTimeout);
435     }
436
437     // Call that takes into account the replica
438     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
439         rpcControllerFactory, tableName, this.connection, get, pool,
440         connConfiguration.getRetriesNumber(),
441         operationTimeout,
442         connConfiguration.getPrimaryCallTimeoutMicroSecond());
443     return callable.call(operationTimeout);
444   }
445
446
447   /**
448    * {@inheritDoc}
449    */
450   @Override
451   public Result[] get(List<Get> gets) throws IOException {
452     if (gets.size() == 1) {
453       return new Result[]{get(gets.get(0))};
454     }
455     try {
456       Object[] r1 = new Object[gets.size()];
457       batch((List) gets, r1);
458
459       // translate.
460       Result [] results = new Result[r1.length];
461       int i=0;
462       for (Object o : r1) {
463         // batch ensures if there is a failure we get an exception instead
464         results[i++] = (Result) o;
465       }
466
467       return results;
468     } catch (InterruptedException e) {
469       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
470     }
471   }
472
473   /**
474    * {@inheritDoc}
475    */
476   @Override
477   public void batch(final List<? extends Row> actions, final Object[] results)
478       throws InterruptedException, IOException {
479     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
480     ars.waitUntilDone();
481     if (ars.hasError()) {
482       throw ars.getErrors();
483     }
484   }
485
486   /**
487    * {@inheritDoc}
488    */
489   @Override
490   public <R> void batchCallback(
491     final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
492     throws IOException, InterruptedException {
493     doBatchWithCallback(actions, results, callback, connection, pool, tableName);
494   }
495
496   public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
497     Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
498     throws InterruptedIOException, RetriesExhaustedWithDetailsException {
499     AsyncRequestFuture ars = connection.getAsyncProcess().submitAll(
500       pool, tableName, actions, callback, results);
501     ars.waitUntilDone();
502     if (ars.hasError()) {
503       throw ars.getErrors();
504     }
505   }
506
507   /**
508    * {@inheritDoc}
509    */
510   @Override
511   public void delete(final Delete delete)
512   throws IOException {
513     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
514         tableName, delete.getRow()) {
515       @Override
516       public Boolean call(int callTimeout) throws IOException {
517         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
518         controller.setPriority(tableName);
519         controller.setCallTimeout(callTimeout);
520
521         try {
522           MutateRequest request = RequestConverter.buildMutateRequest(
523             getLocation().getRegionInfo().getRegionName(), delete);
524           MutateResponse response = getStub().mutate(controller, request);
525           return Boolean.valueOf(response.getProcessed());
526         } catch (ServiceException se) {
527           throw ProtobufUtil.getRemoteException(se);
528         }
529       }
530     };
531     rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
532         this.operationTimeout);
533   }
534
535   /**
536    * {@inheritDoc}
537    */
538   @Override
539   public void delete(final List<Delete> deletes)
540   throws IOException {
541     Object[] results = new Object[deletes.size()];
542     try {
543       batch(deletes, results);
544     } catch (InterruptedException e) {
545       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
546     } finally {
547       // mutate list so that it is empty for complete success, or contains only failed records
548       // results are returned in the same order as the requests in list walk the list backwards,
549       // so we can remove from list without impacting the indexes of earlier members
550       for (int i = results.length - 1; i>=0; i--) {
551         // if result is not null, it succeeded
552         if (results[i] instanceof Result) {
553           deletes.remove(i);
554         }
555       }
556     }
557   }
558
559   /**
560    * {@inheritDoc}
561    * @throws IOException
562    */
563   @Override
564   public void put(final Put put) throws IOException {
565     getBufferedMutator().mutate(put);
566     flushCommits();
567   }
568
569   /**
570    * {@inheritDoc}
571    * @throws IOException
572    */
573   @Override
574   public void put(final List<Put> puts) throws IOException {
575     getBufferedMutator().mutate(puts);
576     flushCommits();
577   }
578
579   /**
580    * {@inheritDoc}
581    */
582   @Override
583   public void mutateRow(final RowMutations rm) throws IOException {
584     final RetryingTimeTracker tracker = new RetryingTimeTracker();
585     PayloadCarryingServerCallable<MultiResponse> callable =
586       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
587           rpcControllerFactory) {
588         @Override
589         public MultiResponse call(int callTimeout) throws IOException {
590           tracker.start();
591           controller.setPriority(tableName);
592           int remainingTime = tracker.getRemainingTime(callTimeout);
593           if (remainingTime == 0) {
594             throw new DoNotRetryIOException("Timeout for mutate row");
595           }
596           controller.setCallTimeout(remainingTime);
597           try {
598             RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
599                 getLocation().getRegionInfo().getRegionName(), rm);
600             regionMutationBuilder.setAtomic(true);
601             MultiRequest request =
602                 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
603             ClientProtos.MultiResponse response = getStub().multi(controller, request);
604             ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
605             if (res.hasException()) {
606               Throwable ex = ProtobufUtil.toException(res.getException());
607               if (ex instanceof IOException) {
608                 throw (IOException) ex;
609               }
610               throw new IOException("Failed to mutate row: " +
611                   Bytes.toStringBinary(rm.getRow()), ex);
612             }
613             return ResponseConverter.getResults(request, response, controller.cellScanner());
614           } catch (ServiceException se) {
615             throw ProtobufUtil.getRemoteException(se);
616           }
617         }
618       };
619     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
620         null, null, callable, operationTimeout);
621     ars.waitUntilDone();
622     if (ars.hasError()) {
623       throw ars.getErrors();
624     }
625   }
626
627   /**
628    * {@inheritDoc}
629    */
630   @Override
631   public Result append(final Append append) throws IOException {
632     if (append.numFamilies() == 0) {
633       throw new IOException(
634           "Invalid arguments to append, no columns specified");
635     }
636
637     NonceGenerator ng = this.connection.getNonceGenerator();
638     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
639     RegionServerCallable<Result> callable =
640       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
641         @Override
642         public Result call(int callTimeout) throws IOException {
643           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
644           controller.setPriority(getTableName());
645           controller.setCallTimeout(callTimeout);
646           try {
647             MutateRequest request = RequestConverter.buildMutateRequest(
648               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
649             MutateResponse response = getStub().mutate(controller, request);
650             if (!response.hasResult()) return null;
651             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
652           } catch (ServiceException se) {
653             throw ProtobufUtil.getRemoteException(se);
654           }
655         }
656       };
657     return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
658         this.operationTimeout);
659   }
660
661   /**
662    * {@inheritDoc}
663    */
664   @Override
665   public Result increment(final Increment increment) throws IOException {
666     if (!increment.hasFamilies()) {
667       throw new IOException(
668           "Invalid arguments to increment, no columns specified");
669     }
670     NonceGenerator ng = this.connection.getNonceGenerator();
671     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
672     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
673         getName(), increment.getRow()) {
674       @Override
675       public Result call(int callTimeout) throws IOException {
676         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
677         controller.setPriority(getTableName());
678         controller.setCallTimeout(callTimeout);
679         try {
680           MutateRequest request = RequestConverter.buildMutateRequest(
681             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
682           MutateResponse response = getStub().mutate(controller, request);
683           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
684         } catch (ServiceException se) {
685           throw ProtobufUtil.getRemoteException(se);
686         }
687       }
688     };
689     return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
690         this.operationTimeout);
691   }
692
693   /**
694    * {@inheritDoc}
695    */
696   @Override
697   public long incrementColumnValue(final byte [] row, final byte [] family,
698       final byte [] qualifier, final long amount)
699   throws IOException {
700     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
701   }
702
703   /**
704    * {@inheritDoc}
705    */
706   @Override
707   public long incrementColumnValue(final byte [] row, final byte [] family,
708       final byte [] qualifier, final long amount, final Durability durability)
709   throws IOException {
710     NullPointerException npe = null;
711     if (row == null) {
712       npe = new NullPointerException("row is null");
713     } else if (family == null) {
714       npe = new NullPointerException("family is null");
715     } else if (qualifier == null) {
716       npe = new NullPointerException("qualifier is null");
717     }
718     if (npe != null) {
719       throw new IOException(
720           "Invalid arguments to incrementColumnValue", npe);
721     }
722
723     NonceGenerator ng = this.connection.getNonceGenerator();
724     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
725     RegionServerCallable<Long> callable =
726       new RegionServerCallable<Long>(connection, getName(), row) {
727         @Override
728         public Long call(int callTimeout) throws IOException {
729           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
730           controller.setPriority(getTableName());
731           controller.setCallTimeout(callTimeout);
732           try {
733             MutateRequest request = RequestConverter.buildIncrementRequest(
734               getLocation().getRegionInfo().getRegionName(), row, family,
735               qualifier, amount, durability, nonceGroup, nonce);
736             MutateResponse response = getStub().mutate(controller, request);
737             Result result =
738               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
739             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
740           } catch (ServiceException se) {
741             throw ProtobufUtil.getRemoteException(se);
742           }
743         }
744       };
745     return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
746         this.operationTimeout);
747   }
748
749   /**
750    * {@inheritDoc}
751    */
752   @Override
753   public boolean checkAndPut(final byte [] row,
754       final byte [] family, final byte [] qualifier, final byte [] value,
755       final Put put)
756   throws IOException {
757     RegionServerCallable<Boolean> callable =
758       new RegionServerCallable<Boolean>(connection, getName(), row) {
759         @Override
760         public Boolean call(int callTimeout) throws IOException {
761           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
762           controller.setPriority(tableName);
763           controller.setCallTimeout(callTimeout);
764           try {
765             MutateRequest request = RequestConverter.buildMutateRequest(
766                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
767                 new BinaryComparator(value), CompareType.EQUAL, put);
768             MutateResponse response = getStub().mutate(controller, request);
769             return Boolean.valueOf(response.getProcessed());
770           } catch (ServiceException se) {
771             throw ProtobufUtil.getRemoteException(se);
772           }
773         }
774       };
775     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
776         this.operationTimeout);
777   }
778
779   /**
780    * {@inheritDoc}
781    */
782   @Override
783   public boolean checkAndPut(final byte [] row, final byte [] family,
784       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
785       final Put put)
786   throws IOException {
787     RegionServerCallable<Boolean> callable =
788       new RegionServerCallable<Boolean>(connection, getName(), row) {
789         @Override
790         public Boolean call(int callTimeout) throws IOException {
791           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
792           controller.setPriority(tableName);
793           controller.setCallTimeout(callTimeout);
794           try {
795             CompareType compareType = CompareType.valueOf(compareOp.name());
796             MutateRequest request = RequestConverter.buildMutateRequest(
797               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
798                 new BinaryComparator(value), compareType, put);
799             MutateResponse response = getStub().mutate(controller, request);
800             return Boolean.valueOf(response.getProcessed());
801           } catch (ServiceException se) {
802             throw ProtobufUtil.getRemoteException(se);
803           }
804         }
805       };
806     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
807         this.operationTimeout);
808   }
809
810   /**
811    * {@inheritDoc}
812    */
813   @Override
814   public boolean checkAndDelete(final byte [] row,
815       final byte [] family, final byte [] qualifier, final byte [] value,
816       final Delete delete)
817   throws IOException {
818     RegionServerCallable<Boolean> callable =
819       new RegionServerCallable<Boolean>(connection, getName(), row) {
820         @Override
821         public Boolean call(int callTimeout) throws IOException {
822           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
823           controller.setPriority(tableName);
824           controller.setCallTimeout(callTimeout);
825           try {
826             MutateRequest request = RequestConverter.buildMutateRequest(
827               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
828                 new BinaryComparator(value), CompareType.EQUAL, delete);
829             MutateResponse response = getStub().mutate(controller, request);
830             return Boolean.valueOf(response.getProcessed());
831           } catch (ServiceException se) {
832             throw ProtobufUtil.getRemoteException(se);
833           }
834         }
835       };
836     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
837         this.operationTimeout);
838   }
839
840   /**
841    * {@inheritDoc}
842    */
843   @Override
844   public boolean checkAndDelete(final byte [] row, final byte [] family,
845       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
846       final Delete delete)
847   throws IOException {
848     RegionServerCallable<Boolean> callable =
849       new RegionServerCallable<Boolean>(connection, getName(), row) {
850         @Override
851         public Boolean call(int callTimeout) throws IOException {
852           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
853           controller.setPriority(tableName);
854           controller.setCallTimeout(callTimeout);
855           try {
856             CompareType compareType = CompareType.valueOf(compareOp.name());
857             MutateRequest request = RequestConverter.buildMutateRequest(
858               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
859                 new BinaryComparator(value), compareType, delete);
860             MutateResponse response = getStub().mutate(controller, request);
861             return Boolean.valueOf(response.getProcessed());
862           } catch (ServiceException se) {
863             throw ProtobufUtil.getRemoteException(se);
864           }
865         }
866       };
867     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
868         this.operationTimeout);
869   }
870
871   /**
872    * {@inheritDoc}
873    */
874   @Override
875   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
876     final CompareOp compareOp, final byte [] value, final RowMutations rm)
877     throws IOException {
878     final RetryingTimeTracker tracker = new RetryingTimeTracker();
879     PayloadCarryingServerCallable<MultiResponse> callable =
880       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
881         rpcControllerFactory) {
882         @Override
883         public MultiResponse call(int callTimeout) throws IOException {
884           tracker.start();
885           controller.setPriority(tableName);
886           int remainingTime = tracker.getRemainingTime(callTimeout);
887           if (remainingTime == 0) {
888             throw new DoNotRetryIOException("Timeout for mutate row");
889           }
890           controller.setCallTimeout(remainingTime);
891           try {
892             CompareType compareType = CompareType.valueOf(compareOp.name());
893             MultiRequest request = RequestConverter.buildMutateRequest(
894               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
895               new BinaryComparator(value), compareType, rm);
896             ClientProtos.MultiResponse response = getStub().multi(controller, request);
897             ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
898             if (res.hasException()) {
899               Throwable ex = ProtobufUtil.toException(res.getException());
900               if(ex instanceof IOException) {
901                 throw (IOException)ex;
902               }
903               throw new IOException("Failed to checkAndMutate row: "+
904                                     Bytes.toStringBinary(rm.getRow()), ex);
905             }
906             return ResponseConverter.getResults(request, response, controller.cellScanner());
907           } catch (ServiceException se) {
908             throw ProtobufUtil.getRemoteException(se);
909           }
910         }
911       };
912     /**
913      *  Currently, we use one array to store 'processed' flag which is returned by server.
914      *  It is excessive to send such a large array, but that is required by the framework right now
915      * */
916     Object[] results = new Object[rm.getMutations().size()];
917     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
918       null, results, callable, operationTimeout);
919     ars.waitUntilDone();
920     if (ars.hasError()) {
921       throw ars.getErrors();
922     }
923
924     return ((Result)results[0]).getExists();
925   }
926
927   /**
928    * {@inheritDoc}
929    */
930   @Override
931   public boolean exists(final Get get) throws IOException {
932     Result r = get(get, true);
933     assert r.getExists() != null;
934     return r.getExists();
935   }
936
937   /**
938    * {@inheritDoc}
939    */
940   @Override
941   public boolean[] existsAll(final List<Get> gets) throws IOException {
942     if (gets.isEmpty()) return new boolean[]{};
943     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
944
945     ArrayList<Get> exists = new ArrayList<Get>(gets.size());
946     for (Get g: gets){
947       Get ge = new Get(g);
948       ge.setCheckExistenceOnly(true);
949       exists.add(ge);
950     }
951
952     Object[] r1= new Object[exists.size()];
953     try {
954       batch(exists, r1);
955     } catch (InterruptedException e) {
956       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
957     }
958
959     // translate.
960     boolean[] results = new boolean[r1.length];
961     int i = 0;
962     for (Object o : r1) {
963       // batch ensures if there is a failure we get an exception instead
964       results[i++] = ((Result)o).getExists();
965     }
966
967     return results;
968   }
969
970   /**
971    * {@inheritDoc}
972    * @throws IOException
973    */
974   void flushCommits() throws IOException {
975     if (mutator == null) {
976       // nothing to flush if there's no mutator; don't bother creating one.
977       return;
978     }
979     getBufferedMutator().flush();
980   }
981
982   /**
983    * Process a mixed batch of Get, Put and Delete actions. All actions for a
984    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
985    *
986    * @param list The collection of actions.
987    * @param results An empty array, same size as list. If an exception is thrown,
988    *   you can test here for partial results, and to determine which actions
989    *   processed successfully.
990    * @throws IOException if there are problems talking to META. Per-item
991    *   exceptions are stored in the results array.
992    */
993   public <R> void processBatchCallback(
994     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
995     throws IOException, InterruptedException {
996     this.batchCallback(list, results, callback);
997   }
998
999
1000   /**
1001    * Parameterized batch processing, allowing varying return types for different
1002    * {@link Row} implementations.
1003    */
1004   public void processBatch(final List<? extends Row> list, final Object[] results)
1005     throws IOException, InterruptedException {
1006     this.batch(list, results);
1007   }
1008
1009
1010   @Override
1011   public void close() throws IOException {
1012     if (this.closed) {
1013       return;
1014     }
1015     flushCommits();
1016     if (cleanupPoolOnClose) {
1017       this.pool.shutdown();
1018       try {
1019         boolean terminated = false;
1020         do {
1021           // wait until the pool has terminated
1022           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1023         } while (!terminated);
1024       } catch (InterruptedException e) {
1025         this.pool.shutdownNow();
1026         LOG.warn("waitForTermination interrupted");
1027       }
1028     }
1029     if (cleanupConnectionOnClose) {
1030       if (this.connection != null) {
1031         this.connection.close();
1032       }
1033     }
1034     this.closed = true;
1035   }
1036
1037   // validate for well-formedness
1038   public void validatePut(final Put put) throws IllegalArgumentException {
1039     validatePut(put, connConfiguration.getMaxKeyValueSize());
1040   }
1041
1042   // validate for well-formedness
1043   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1044     if (put.isEmpty()) {
1045       throw new IllegalArgumentException("No columns to insert");
1046     }
1047     if (maxKeyValueSize > 0) {
1048       for (List<Cell> list : put.getFamilyCellMap().values()) {
1049         for (Cell cell : list) {
1050           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1051             throw new IllegalArgumentException("KeyValue size too large");
1052           }
1053         }
1054       }
1055     }
1056   }
1057
1058   /**
1059    * Returns the maximum size in bytes of the write buffer for this HTable.
1060    * <p>
1061    * The default value comes from the configuration parameter
1062    * {@code hbase.client.write.buffer}.
1063    * @return The size of the write buffer in bytes.
1064    */
1065   @Override
1066   public long getWriteBufferSize() {
1067     if (mutator == null) {
1068       return connConfiguration.getWriteBufferSize();
1069     } else {
1070       return mutator.getWriteBufferSize();
1071     }
1072   }
1073
1074   /**
1075    * Sets the size of the buffer in bytes.
1076    * <p>
1077    * If the new size is less than the current amount of data in the
1078    * write buffer, the buffer gets flushed.
1079    * @param writeBufferSize The new write buffer size, in bytes.
1080    * @throws IOException if a remote or network exception occurs.
1081    */
1082   @Override
1083   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1084     getBufferedMutator();
1085     mutator.setWriteBufferSize(writeBufferSize);
1086   }
1087
1088   /**
1089    * The pool is used for mutli requests for this HTable
1090    * @return the pool used for mutli
1091    */
1092   ExecutorService getPool() {
1093     return this.pool;
1094   }
1095
1096   /**
1097    * Explicitly clears the region cache to fetch the latest value from META.
1098    * This is a power user function: avoid unless you know the ramifications.
1099    */
1100   public void clearRegionCache() {
1101     this.connection.clearRegionCache();
1102   }
1103
1104   /**
1105    * {@inheritDoc}
1106    */
1107   @Override
1108   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1109     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1110   }
1111
1112   /**
1113    * {@inheritDoc}
1114    */
1115   @Override
1116   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1117       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1118       throws ServiceException, Throwable {
1119     final Map<byte[],R> results =  Collections.synchronizedMap(
1120         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1121     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1122       @Override
1123       public void update(byte[] region, byte[] row, R value) {
1124         if (region != null) {
1125           results.put(region, value);
1126         }
1127       }
1128     });
1129     return results;
1130   }
1131
1132   /**
1133    * {@inheritDoc}
1134    */
1135   @Override
1136   public <T extends Service, R> void coprocessorService(final Class<T> service,
1137       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1138       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1139
1140     // get regions covered by the row range
1141     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1142
1143     Map<byte[],Future<R>> futures =
1144         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1145     for (final byte[] r : keys) {
1146       final RegionCoprocessorRpcChannel channel =
1147           new RegionCoprocessorRpcChannel(connection, tableName, r);
1148       Future<R> future = pool.submit(
1149           new Callable<R>() {
1150             @Override
1151             public R call() throws Exception {
1152               T instance = ProtobufUtil.newServiceStub(service, channel);
1153               R result = callable.call(instance);
1154               byte[] region = channel.getLastRegion();
1155               if (callback != null) {
1156                 callback.update(region, r, result);
1157               }
1158               return result;
1159             }
1160           });
1161       futures.put(r, future);
1162     }
1163     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1164       try {
1165         e.getValue().get();
1166       } catch (ExecutionException ee) {
1167         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1168             + Bytes.toStringBinary(e.getKey()), ee);
1169         throw ee.getCause();
1170       } catch (InterruptedException ie) {
1171         throw new InterruptedIOException("Interrupted calling coprocessor service "
1172             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1173       }
1174     }
1175   }
1176
1177   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1178   throws IOException {
1179     if (start == null) {
1180       start = HConstants.EMPTY_START_ROW;
1181     }
1182     if (end == null) {
1183       end = HConstants.EMPTY_END_ROW;
1184     }
1185     return getKeysAndRegionsInRange(start, end, true).getFirst();
1186   }
1187
1188   @Override
1189   public void setOperationTimeout(int operationTimeout) {
1190     this.operationTimeout = operationTimeout;
1191   }
1192
1193   @Override
1194   public int getOperationTimeout() {
1195     return operationTimeout;
1196   }
1197
1198   @Override
1199   public int getRpcTimeout() {
1200     return rpcTimeout;
1201   }
1202
1203   @Override
1204   public void setRpcTimeout(int rpcTimeout) {
1205     this.rpcTimeout = rpcTimeout;
1206   }
1207
1208   @Override
1209   public String toString() {
1210     return tableName + ";" + connection;
1211   }
1212
1213   /**
1214    * {@inheritDoc}
1215    */
1216   @Override
1217   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1218       Descriptors.MethodDescriptor methodDescriptor, Message request,
1219       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1220     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1221         Bytes.BYTES_COMPARATOR));
1222     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1223         new Callback<R>() {
1224
1225           @Override
1226           public void update(byte[] region, byte[] row, R result) {
1227             if (region != null) {
1228               results.put(region, result);
1229             }
1230           }
1231         });
1232     return results;
1233   }
1234
1235   /**
1236    * {@inheritDoc}
1237    */
1238   @Override
1239   public <R extends Message> void batchCoprocessorService(
1240       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1241       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1242       throws ServiceException, Throwable {
1243
1244     if (startKey == null) {
1245       startKey = HConstants.EMPTY_START_ROW;
1246     }
1247     if (endKey == null) {
1248       endKey = HConstants.EMPTY_END_ROW;
1249     }
1250     // get regions covered by the row range
1251     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1252         getKeysAndRegionsInRange(startKey, endKey, true);
1253     List<byte[]> keys = keysAndRegions.getFirst();
1254     List<HRegionLocation> regions = keysAndRegions.getSecond();
1255
1256     // check if we have any calls to make
1257     if (keys.isEmpty()) {
1258       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1259           ", end=" + Bytes.toStringBinary(endKey));
1260       return;
1261     }
1262
1263     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1264     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1265         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1266     for (int i = 0; i < keys.size(); i++) {
1267       final byte[] rowKey = keys.get(i);
1268       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1269       RegionCoprocessorServiceExec exec =
1270           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1271       execs.add(exec);
1272       execsByRow.put(rowKey, exec);
1273     }
1274
1275     // tracking for any possible deserialization errors on success callback
1276     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1277     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1278     final List<Row> callbackErrorActions = new ArrayList<Row>();
1279     final List<String> callbackErrorServers = new ArrayList<String>();
1280     Object[] results = new Object[execs.size()];
1281
1282     AsyncProcess asyncProcess =
1283         new AsyncProcess(connection, configuration, pool,
1284             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1285             true, RpcControllerFactory.instantiate(configuration));
1286
1287     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1288         new Callback<ClientProtos.CoprocessorServiceResult>() {
1289           @Override
1290           public void update(byte[] region, byte[] row,
1291                               ClientProtos.CoprocessorServiceResult serviceResult) {
1292             if (LOG.isTraceEnabled()) {
1293               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1294                   ": region=" + Bytes.toStringBinary(region) +
1295                   ", row=" + Bytes.toStringBinary(row) +
1296                   ", value=" + serviceResult.getValue().getValue());
1297             }
1298             try {
1299               Message.Builder builder = responsePrototype.newBuilderForType();
1300               ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1301               callback.update(region, row, (R) builder.build());
1302             } catch (IOException e) {
1303               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1304                   e);
1305               callbackErrorExceptions.add(e);
1306               callbackErrorActions.add(execsByRow.get(row));
1307               callbackErrorServers.add("null");
1308             }
1309           }
1310         }, results);
1311
1312     future.waitUntilDone();
1313
1314     if (future.hasError()) {
1315       throw future.getErrors();
1316     } else if (!callbackErrorExceptions.isEmpty()) {
1317       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1318           callbackErrorServers);
1319     }
1320   }
1321
1322   public RegionLocator getRegionLocator() {
1323     return this.locator;
1324   }
1325
1326   @VisibleForTesting
1327   BufferedMutator getBufferedMutator() throws IOException {
1328     if (mutator == null) {
1329       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1330           new BufferedMutatorParams(tableName)
1331               .pool(pool)
1332               .writeBufferSize(connConfiguration.getWriteBufferSize())
1333               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1334       );
1335     }
1336     return mutator;
1337   }
1338 }