View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.TreeMap;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionLocation;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.KeyValueUtil;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.TableNotFoundException;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.classification.InterfaceStability;
48  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
49  import org.apache.hadoop.hbase.client.coprocessor.Batch;
50  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
51  import org.apache.hadoop.hbase.filter.BinaryComparator;
52  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
53  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
54  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
55  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
56  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
57  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
58  import org.apache.hadoop.hbase.protobuf.RequestConverter;
59  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
60  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
61  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
62  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
64  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
65  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
66  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
67  import org.apache.hadoop.hbase.util.Bytes;
68  import org.apache.hadoop.hbase.util.Pair;
69  import org.apache.hadoop.hbase.util.ReflectionUtils;
70  import org.apache.hadoop.hbase.util.Threads;
71  
72  import com.google.common.annotations.VisibleForTesting;
73  import com.google.protobuf.Descriptors;
74  import com.google.protobuf.InvalidProtocolBufferException;
75  import com.google.protobuf.Message;
76  import com.google.protobuf.Service;
77  import com.google.protobuf.ServiceException;
78  
79  /**
80   * An implementation of {@link Table}. Used to communicate with a single HBase table.
81   * Lightweight. Get as needed and just close when done.
82   * Instances of this class SHOULD NOT be constructed directly.
83   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
84   * class comment for an example of how.
85   *
86   * <p>This class is NOT thread safe for reads nor writes.
87   * In the case of writes (Put, Delete), the underlying write buffer can
88   * be corrupted if multiple threads contend over a single HTable instance.
89   * In the case of reads, some fields used by a Scan are shared among all threads.
90   *
91   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
92   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
93   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
94   * Interface Classification</a>
95   * There are no guarantees for backwards source / binary compatibility and methods or class can
96   * change or go away without deprecation.
97   *
98   * @see Table
99   * @see Admin
100  * @see Connection
101  * @see ConnectionFactory
102  */
103 @InterfaceAudience.Private
104 @InterfaceStability.Stable
105 public class HTable implements HTableInterface {
106   private static final Log LOG = LogFactory.getLog(HTable.class);
107   protected ClusterConnection connection;
108   private final TableName tableName;
109   private volatile Configuration configuration;
110   private TableConfiguration tableConfiguration;
111   protected BufferedMutatorImpl mutator;
112   private boolean autoFlush = true;
113   private boolean closed = false;
114   protected int scannerCaching;
115   protected long scannerMaxResultSize;
116   private ExecutorService pool;  // For Multi & Scan
117   private int operationTimeout;
118   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
119   private final boolean cleanupConnectionOnClose; // close the connection in close()
120   private Consistency defaultConsistency = Consistency.STRONG;
121   private HRegionLocator locator;
122 
123   /** The Async process for batch */
124   protected AsyncProcess multiAp;
125   private RpcRetryingCallerFactory rpcCallerFactory;
126   private RpcControllerFactory rpcControllerFactory;
127 
128   // Marked Private @since 1.0
129   @InterfaceAudience.Private
130   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
131     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
132     if (maxThreads == 0) {
133       maxThreads = 1; // is there a better default?
134     }
135     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
136 
137     // Using the "direct handoff" approach, new threads will only be created
138     // if it is necessary and will grow unbounded. This could be bad but in HCM
139     // we only create as many Runnables as there are region servers. It means
140     // it also scales when new region servers are added.
141     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
142         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
143     pool.allowCoreThreadTimeOut(true);
144     return pool;
145   }
146 
147   /**
148    * Creates an object to access a HBase table.
149    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
150    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
151    * @param tableName Name of the table.
152    * @param connection HConnection to be used.
153    * @param pool ExecutorService to be used.
154    * @throws IOException if a remote or network exception occurs
155    */
156   @InterfaceAudience.Private
157   protected HTable(TableName tableName, final ClusterConnection connection,
158       final TableConfiguration tableConfig,
159       final RpcRetryingCallerFactory rpcCallerFactory,
160       final RpcControllerFactory rpcControllerFactory,
161       final ExecutorService pool) throws IOException {
162     if (connection == null || connection.isClosed()) {
163       throw new IllegalArgumentException("Connection is null or closed.");
164     }
165     this.tableName = tableName;
166     this.cleanupConnectionOnClose = false;
167     this.connection = connection;
168     this.configuration = connection.getConfiguration();
169     this.tableConfiguration = tableConfig;
170     this.pool = pool;
171     if (pool == null) {
172       this.pool = getDefaultExecutor(this.configuration);
173       this.cleanupPoolOnClose = true;
174     } else {
175       this.cleanupPoolOnClose = false;
176     }
177 
178     this.rpcCallerFactory = rpcCallerFactory;
179     this.rpcControllerFactory = rpcControllerFactory;
180 
181     this.finishSetup();
182   }
183 
184   /**
185    * For internal testing. Uses Connection provided in {@code params}.
186    * @throws IOException
187    */
188   @VisibleForTesting
189   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
190     connection = conn;
191     tableName = params.getTableName();
192     tableConfiguration = new TableConfiguration(connection.getConfiguration());
193     cleanupPoolOnClose = false;
194     cleanupConnectionOnClose = false;
195     // used from tests, don't trust the connection is real
196     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
197   }
198 
199   /**
200    * @return maxKeyValueSize from configuration.
201    */
202   public static int getMaxKeyValueSize(Configuration conf) {
203     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
204   }
205 
206   /**
207    * setup this HTable's parameter based on the passed configuration
208    */
209   private void finishSetup() throws IOException {
210     if (tableConfiguration == null) {
211       tableConfiguration = new TableConfiguration(configuration);
212     }
213 
214     this.operationTimeout = tableName.isSystemTable() ?
215         tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
216     this.scannerCaching = tableConfiguration.getScannerCaching();
217     this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize();
218     if (this.rpcCallerFactory == null) {
219       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
220     }
221     if (this.rpcControllerFactory == null) {
222       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
223     }
224 
225     // puts need to track errors globally due to how the APIs currently work.
226     multiAp = this.connection.getAsyncProcess();
227     this.locator = new HRegionLocator(getName(), connection);
228   }
229 
230   /**
231    * {@inheritDoc}
232    */
233   @Override
234   public Configuration getConfiguration() {
235     return configuration;
236   }
237 
238   /**
239    * {@inheritDoc}
240    */
241   @Override
242   public byte [] getTableName() {
243     return this.tableName.getName();
244   }
245 
246   @Override
247   public TableName getName() {
248     return tableName;
249   }
250 
251   /**
252    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
253    * manipulations.
254    * @return An HConnection instance.
255    * @deprecated This method will be changed from public to package protected.
256    */
257   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
258   @Deprecated
259   @VisibleForTesting
260   public HConnection getConnection() {
261     return this.connection;
262   }
263 
264   /**
265    * {@inheritDoc}
266    */
267   @Override
268   public HTableDescriptor getTableDescriptor() throws IOException {
269     // TODO: This is the same as HBaseAdmin.getTableDescriptor(). Only keep one.
270     if (tableName == null) return null;
271     if (tableName.equals(TableName.META_TABLE_NAME)) {
272       return HTableDescriptor.META_TABLEDESC;
273     }
274     HTableDescriptor htd = executeMasterCallable(
275       new MasterCallable<HTableDescriptor>(getConnection()) {
276       @Override
277       public HTableDescriptor call(int callTimeout) throws ServiceException {
278         GetTableDescriptorsResponse htds;
279         GetTableDescriptorsRequest req =
280             RequestConverter.buildGetTableDescriptorsRequest(tableName);
281         htds = master.getTableDescriptors(null, req);
282 
283         if (!htds.getTableSchemaList().isEmpty()) {
284           return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
285         }
286         return null;
287       }
288     });
289     if (htd != null) {
290       return new UnmodifyableHTableDescriptor(htd);
291     }
292     throw new TableNotFoundException(tableName.getNameAsString());
293   }
294 
295   private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException {
296     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
297     try {
298       return caller.callWithRetries(callable, operationTimeout);
299     } finally {
300       callable.close();
301     }
302   }
303 
304   /**
305    * Get the corresponding start keys and regions for an arbitrary range of
306    * keys.
307    * <p>
308    * @param startKey Starting row in range, inclusive
309    * @param endKey Ending row in range
310    * @param includeEndKey true if endRow is inclusive, false if exclusive
311    * @return A pair of list of start keys and list of HRegionLocations that
312    *         contain the specified range
313    * @throws IOException if a remote or network exception occurs
314    */
315   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
316       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
317       throws IOException {
318     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
319   }
320 
321   /**
322    * Get the corresponding start keys and regions for an arbitrary range of
323    * keys.
324    * <p>
325    * @param startKey Starting row in range, inclusive
326    * @param endKey Ending row in range
327    * @param includeEndKey true if endRow is inclusive, false if exclusive
328    * @param reload true to reload information or false to use cached information
329    * @return A pair of list of start keys and list of HRegionLocations that
330    *         contain the specified range
331    * @throws IOException if a remote or network exception occurs
332    */
333   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
334       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
335       final boolean reload) throws IOException {
336     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
337     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
338       throw new IllegalArgumentException(
339         "Invalid range: " + Bytes.toStringBinary(startKey) +
340         " > " + Bytes.toStringBinary(endKey));
341     }
342     List<byte[]> keysInRange = new ArrayList<byte[]>();
343     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
344     byte[] currentKey = startKey;
345     do {
346       HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
347       keysInRange.add(currentKey);
348       regionsInRange.add(regionLocation);
349       currentKey = regionLocation.getRegionInfo().getEndKey();
350     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
351         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
352             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
353     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
354         regionsInRange);
355   }
356 
357   /**
358    * The underlying {@link HTable} must not be closed.
359    * {@link HTableInterface#getScanner(Scan)} has other usage details.
360    */
361   @Override
362   public ResultScanner getScanner(final Scan scan) throws IOException {
363     if (scan.getBatch() > 0 && scan.isSmall()) {
364       throw new IllegalArgumentException("Small scan should not be used with batching");
365     }
366 
367     if (scan.getCaching() <= 0) {
368       scan.setCaching(scannerCaching);
369     }
370     if (scan.getMaxResultSize() <= 0) {
371       scan.setMaxResultSize(scannerMaxResultSize);
372     }
373 
374     Boolean async = scan.isAsyncPrefetch();
375     if (async == null) {
376       async = tableConfiguration.isClientScannerAsyncPrefetch();
377     }
378 
379     if (scan.isReversed()) {
380       if (scan.isSmall()) {
381         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
382             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
383             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
384       } else {
385         return new ReversedClientScanner(getConfiguration(), scan, getName(),
386             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
387             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
388       }
389     }
390 
391     if (scan.isSmall()) {
392       return new ClientSmallScanner(getConfiguration(), scan, getName(),
393           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
394           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
395     } else {
396       if (async) {
397         return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
398             this.rpcCallerFactory, this.rpcControllerFactory,
399             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
400       } else {
401         return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
402             this.rpcCallerFactory, this.rpcControllerFactory,
403             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
404       }
405     }
406   }
407 
408   /**
409    * The underlying {@link HTable} must not be closed.
410    * {@link HTableInterface#getScanner(byte[])} has other usage details.
411    */
412   @Override
413   public ResultScanner getScanner(byte [] family) throws IOException {
414     Scan scan = new Scan();
415     scan.addFamily(family);
416     return getScanner(scan);
417   }
418 
419   /**
420    * The underlying {@link HTable} must not be closed.
421    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
422    */
423   @Override
424   public ResultScanner getScanner(byte [] family, byte [] qualifier)
425   throws IOException {
426     Scan scan = new Scan();
427     scan.addColumn(family, qualifier);
428     return getScanner(scan);
429   }
430 
431   /**
432    * {@inheritDoc}
433    */
434   @Override
435   public Result get(final Get get) throws IOException {
436     return get(get, get.isCheckExistenceOnly());
437   }
438 
439   private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
440     // if we are changing settings to the get, clone it.
441     if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
442       get = ReflectionUtils.newInstance(get.getClass(), get);
443       get.setCheckExistenceOnly(checkExistenceOnly);
444       if (get.getConsistency() == null){
445         get.setConsistency(defaultConsistency);
446       }
447     }
448 
449     if (get.getConsistency() == Consistency.STRONG) {
450       // Good old call.
451       final Get getReq = get;
452       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
453           getName(), get.getRow()) {
454         @Override
455         public Result call(int callTimeout) throws IOException {
456           ClientProtos.GetRequest request =
457             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
458           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
459           controller.setPriority(tableName);
460           controller.setCallTimeout(callTimeout);
461           try {
462             ClientProtos.GetResponse response = getStub().get(controller, request);
463             if (response == null) return null;
464             return ProtobufUtil.toResult(response.getResult());
465           } catch (ServiceException se) {
466             throw ProtobufUtil.getRemoteException(se);
467           }
468         }
469       };
470       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
471     }
472 
473     // Call that takes into account the replica
474     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
475       rpcControllerFactory, tableName, this.connection, get, pool,
476       tableConfiguration.getRetriesNumber(),
477       operationTimeout,
478       tableConfiguration.getPrimaryCallTimeoutMicroSecond());
479     return callable.call();
480   }
481 
482 
483   /**
484    * {@inheritDoc}
485    */
486   @Override
487   public Result[] get(List<Get> gets) throws IOException {
488     if (gets.size() == 1) {
489       return new Result[]{get(gets.get(0))};
490     }
491     try {
492       Object[] r1 = new Object[gets.size()];
493       batch((List) gets, r1);
494 
495       // translate.
496       Result [] results = new Result[r1.length];
497       int i=0;
498       for (Object o : r1) {
499         // batch ensures if there is a failure we get an exception instead
500         results[i++] = (Result) o;
501       }
502 
503       return results;
504     } catch (InterruptedException e) {
505       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
506     }
507   }
508 
509   /**
510    * {@inheritDoc}
511    */
512   @Override
513   public void batch(final List<? extends Row> actions, final Object[] results)
514       throws InterruptedException, IOException {
515     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
516     ars.waitUntilDone();
517     if (ars.hasError()) {
518       throw ars.getErrors();
519     }
520   }
521 
522   /**
523    * {@inheritDoc}
524    */
525   @Override
526   public <R> void batchCallback(
527       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
528       throws IOException, InterruptedException {
529     connection.processBatchCallback(actions, tableName, pool, results, callback);
530   }
531 
532   /**
533    * {@inheritDoc}
534    */
535   @Override
536   public void delete(final Delete delete)
537   throws IOException {
538     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
539         tableName, delete.getRow()) {
540       @Override
541       public Boolean call(int callTimeout) throws IOException {
542         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
543         controller.setPriority(tableName);
544         controller.setCallTimeout(callTimeout);
545 
546         try {
547           MutateRequest request = RequestConverter.buildMutateRequest(
548             getLocation().getRegionInfo().getRegionName(), delete);
549           MutateResponse response = getStub().mutate(controller, request);
550           return Boolean.valueOf(response.getProcessed());
551         } catch (ServiceException se) {
552           throw ProtobufUtil.getRemoteException(se);
553         }
554       }
555     };
556     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
557   }
558 
559   /**
560    * {@inheritDoc}
561    */
562   @Override
563   public void delete(final List<Delete> deletes)
564   throws IOException {
565     Object[] results = new Object[deletes.size()];
566     try {
567       batch(deletes, results);
568     } catch (InterruptedException e) {
569       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
570     } finally {
571       // mutate list so that it is empty for complete success, or contains only failed records
572       // results are returned in the same order as the requests in list walk the list backwards,
573       // so we can remove from list without impacting the indexes of earlier members
574       for (int i = results.length - 1; i>=0; i--) {
575         // if result is not null, it succeeded
576         if (results[i] instanceof Result) {
577           deletes.remove(i);
578         }
579       }
580     }
581   }
582 
583   /**
584    * {@inheritDoc}
585    * @throws IOException
586    */
587   @Override
588   public void put(final Put put) throws IOException {
589     getBufferedMutator().mutate(put);
590     if (autoFlush) {
591       flushCommits();
592     }
593   }
594 
595   /**
596    * {@inheritDoc}
597    * @throws IOException
598    */
599   @Override
600   public void put(final List<Put> puts) throws IOException {
601     getBufferedMutator().mutate(puts);
602     if (autoFlush) {
603       flushCommits();
604     }
605   }
606 
607   /**
608    * {@inheritDoc}
609    */
610   @Override
611   public void mutateRow(final RowMutations rm) throws IOException {
612     RegionServerCallable<Void> callable =
613         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
614       @Override
615       public Void call(int callTimeout) throws IOException {
616         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
617         controller.setPriority(tableName);
618         controller.setCallTimeout(callTimeout);
619         try {
620           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
621             getLocation().getRegionInfo().getRegionName(), rm);
622           regionMutationBuilder.setAtomic(true);
623           MultiRequest request =
624             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
625           ClientProtos.MultiResponse response = getStub().multi(controller, request);
626           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
627           if (res.hasException()) {
628             Throwable ex = ProtobufUtil.toException(res.getException());
629             if(ex instanceof IOException) {
630               throw (IOException)ex;
631             }
632             throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
633           }
634         } catch (ServiceException se) {
635           throw ProtobufUtil.getRemoteException(se);
636         }
637         return null;
638       }
639     };
640     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
641   }
642 
643   /**
644    * {@inheritDoc}
645    */
646   @Override
647   public Result append(final Append append) throws IOException {
648     if (append.numFamilies() == 0) {
649       throw new IOException(
650           "Invalid arguments to append, no columns specified");
651     }
652 
653     NonceGenerator ng = this.connection.getNonceGenerator();
654     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
655     RegionServerCallable<Result> callable =
656       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
657         @Override
658         public Result call(int callTimeout) throws IOException {
659           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
660           controller.setPriority(getTableName());
661           controller.setCallTimeout(callTimeout);
662           try {
663             MutateRequest request = RequestConverter.buildMutateRequest(
664               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
665             MutateResponse response = getStub().mutate(controller, request);
666             if (!response.hasResult()) return null;
667             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
668           } catch (ServiceException se) {
669             throw ProtobufUtil.getRemoteException(se);
670           }
671         }
672       };
673     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
674   }
675 
676   /**
677    * {@inheritDoc}
678    */
679   @Override
680   public Result increment(final Increment increment) throws IOException {
681     if (!increment.hasFamilies()) {
682       throw new IOException(
683           "Invalid arguments to increment, no columns specified");
684     }
685     NonceGenerator ng = this.connection.getNonceGenerator();
686     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
687     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
688         getName(), increment.getRow()) {
689       @Override
690       public Result call(int callTimeout) throws IOException {
691         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
692         controller.setPriority(getTableName());
693         controller.setCallTimeout(callTimeout);
694         try {
695           MutateRequest request = RequestConverter.buildMutateRequest(
696             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
697           MutateResponse response = getStub().mutate(controller, request);
698           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
699         } catch (ServiceException se) {
700           throw ProtobufUtil.getRemoteException(se);
701         }
702       }
703     };
704     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
705   }
706 
707   /**
708    * {@inheritDoc}
709    */
710   @Override
711   public long incrementColumnValue(final byte [] row, final byte [] family,
712       final byte [] qualifier, final long amount)
713   throws IOException {
714     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
715   }
716 
717   /**
718    * {@inheritDoc}
719    */
720   @Override
721   public long incrementColumnValue(final byte [] row, final byte [] family,
722       final byte [] qualifier, final long amount, final Durability durability)
723   throws IOException {
724     NullPointerException npe = null;
725     if (row == null) {
726       npe = new NullPointerException("row is null");
727     } else if (family == null) {
728       npe = new NullPointerException("family is null");
729     } else if (qualifier == null) {
730       npe = new NullPointerException("qualifier is null");
731     }
732     if (npe != null) {
733       throw new IOException(
734           "Invalid arguments to incrementColumnValue", npe);
735     }
736 
737     NonceGenerator ng = this.connection.getNonceGenerator();
738     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
739     RegionServerCallable<Long> callable =
740       new RegionServerCallable<Long>(connection, getName(), row) {
741         @Override
742         public Long call(int callTimeout) throws IOException {
743           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
744           controller.setPriority(getTableName());
745           controller.setCallTimeout(callTimeout);
746           try {
747             MutateRequest request = RequestConverter.buildIncrementRequest(
748               getLocation().getRegionInfo().getRegionName(), row, family,
749               qualifier, amount, durability, nonceGroup, nonce);
750             MutateResponse response = getStub().mutate(controller, request);
751             Result result =
752               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
753             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
754           } catch (ServiceException se) {
755             throw ProtobufUtil.getRemoteException(se);
756           }
757         }
758       };
759     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
760   }
761 
762   /**
763    * {@inheritDoc}
764    */
765   @Override
766   public boolean checkAndPut(final byte [] row,
767       final byte [] family, final byte [] qualifier, final byte [] value,
768       final Put put)
769   throws IOException {
770     RegionServerCallable<Boolean> callable =
771       new RegionServerCallable<Boolean>(connection, getName(), row) {
772         @Override
773         public Boolean call(int callTimeout) throws IOException {
774           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
775           controller.setPriority(tableName);
776           controller.setCallTimeout(callTimeout);
777           try {
778             MutateRequest request = RequestConverter.buildMutateRequest(
779                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
780                 new BinaryComparator(value), CompareType.EQUAL, put);
781             MutateResponse response = getStub().mutate(controller, request);
782             return Boolean.valueOf(response.getProcessed());
783           } catch (ServiceException se) {
784             throw ProtobufUtil.getRemoteException(se);
785           }
786         }
787       };
788     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
789   }
790 
791   /**
792    * {@inheritDoc}
793    */
794   @Override
795   public boolean checkAndPut(final byte [] row, final byte [] family,
796       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
797       final Put put)
798   throws IOException {
799     RegionServerCallable<Boolean> callable =
800       new RegionServerCallable<Boolean>(connection, getName(), row) {
801         @Override
802         public Boolean call(int callTimeout) throws IOException {
803           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
804           controller.setPriority(tableName);
805           controller.setCallTimeout(callTimeout);
806           try {
807             CompareType compareType = CompareType.valueOf(compareOp.name());
808             MutateRequest request = RequestConverter.buildMutateRequest(
809               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
810                 new BinaryComparator(value), compareType, put);
811             MutateResponse response = getStub().mutate(controller, request);
812             return Boolean.valueOf(response.getProcessed());
813           } catch (ServiceException se) {
814             throw ProtobufUtil.getRemoteException(se);
815           }
816         }
817       };
818     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
819   }
820 
821   /**
822    * {@inheritDoc}
823    */
824   @Override
825   public boolean checkAndDelete(final byte [] row,
826       final byte [] family, final byte [] qualifier, final byte [] value,
827       final Delete delete)
828   throws IOException {
829     RegionServerCallable<Boolean> callable =
830       new RegionServerCallable<Boolean>(connection, getName(), row) {
831         @Override
832         public Boolean call(int callTimeout) throws IOException {
833           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
834           controller.setPriority(tableName);
835           controller.setCallTimeout(callTimeout);
836           try {
837             MutateRequest request = RequestConverter.buildMutateRequest(
838               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
839                 new BinaryComparator(value), CompareType.EQUAL, delete);
840             MutateResponse response = getStub().mutate(controller, request);
841             return Boolean.valueOf(response.getProcessed());
842           } catch (ServiceException se) {
843             throw ProtobufUtil.getRemoteException(se);
844           }
845         }
846       };
847     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
848   }
849 
850   /**
851    * {@inheritDoc}
852    */
853   @Override
854   public boolean checkAndDelete(final byte [] row, final byte [] family,
855       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
856       final Delete delete)
857   throws IOException {
858     RegionServerCallable<Boolean> callable =
859       new RegionServerCallable<Boolean>(connection, getName(), row) {
860         @Override
861         public Boolean call(int callTimeout) throws IOException {
862           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
863           controller.setPriority(tableName);
864           controller.setCallTimeout(callTimeout);
865           try {
866             CompareType compareType = CompareType.valueOf(compareOp.name());
867             MutateRequest request = RequestConverter.buildMutateRequest(
868               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
869                 new BinaryComparator(value), compareType, delete);
870             MutateResponse response = getStub().mutate(controller, request);
871             return Boolean.valueOf(response.getProcessed());
872           } catch (ServiceException se) {
873             throw ProtobufUtil.getRemoteException(se);
874           }
875         }
876       };
877     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
878   }
879 
880   /**
881    * {@inheritDoc}
882    */
883   @Override
884   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
885       final CompareOp compareOp, final byte [] value, final RowMutations rm)
886   throws IOException {
887     RegionServerCallable<Boolean> callable =
888         new RegionServerCallable<Boolean>(connection, getName(), row) {
889           @Override
890           public Boolean call(int callTimeout) throws IOException {
891             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
892             controller.setPriority(tableName);
893             controller.setCallTimeout(callTimeout);
894             try {
895               CompareType compareType = CompareType.valueOf(compareOp.name());
896               MultiRequest request = RequestConverter.buildMutateRequest(
897                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
898                   new BinaryComparator(value), compareType, rm);
899               ClientProtos.MultiResponse response = getStub().multi(controller, request);
900               ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
901               if (res.hasException()) {
902                 Throwable ex = ProtobufUtil.toException(res.getException());
903                 if(ex instanceof IOException) {
904                   throw (IOException)ex;
905                 }
906                 throw new IOException("Failed to checkAndMutate row: "+
907                     Bytes.toStringBinary(rm.getRow()), ex);
908               }
909               return Boolean.valueOf(response.getProcessed());
910             } catch (ServiceException se) {
911               throw ProtobufUtil.getRemoteException(se);
912             }
913           }
914         };
915     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
916   }
917 
918   /**
919    * {@inheritDoc}
920    */
921   @Override
922   public boolean exists(final Get get) throws IOException {
923     Result r = get(get, true);
924     assert r.getExists() != null;
925     return r.getExists();
926   }
927 
928   /**
929    * {@inheritDoc}
930    */
931   @Override
932   public boolean[] existsAll(final List<Get> gets) throws IOException {
933     if (gets.isEmpty()) return new boolean[]{};
934     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
935 
936     ArrayList<Get> exists = new ArrayList<Get>(gets.size());
937     for (Get g: gets){
938       Get ge = new Get(g);
939       ge.setCheckExistenceOnly(true);
940       exists.add(ge);
941     }
942 
943     Object[] r1= new Object[exists.size()];
944     try {
945       batch(exists, r1);
946     } catch (InterruptedException e) {
947       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
948     }
949 
950     // translate.
951     boolean[] results = new boolean[r1.length];
952     int i = 0;
953     for (Object o : r1) {
954       // batch ensures if there is a failure we get an exception instead
955       results[i++] = ((Result)o).getExists();
956     }
957 
958     return results;
959   }
960 
961   /**
962    * {@inheritDoc}
963    * @throws IOException
964    */
965   @Override
966   public void flushCommits() throws IOException {
967     if (mutator == null) {
968       // nothing to flush if there's no mutator; don't bother creating one.
969       return;
970     }
971     getBufferedMutator().flush();
972   }
973 
974   /**
975    * Process a mixed batch of Get, Put and Delete actions. All actions for a
976    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
977    *
978    * @param list The collection of actions.
979    * @param results An empty array, same size as list. If an exception is thrown,
980    * you can test here for partial results, and to determine which actions
981    * processed successfully.
982    * @throws IOException if there are problems talking to META. Per-item
983    * exceptions are stored in the results array.
984    */
985   public <R> void processBatchCallback(
986     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
987     throws IOException, InterruptedException {
988     this.batchCallback(list, results, callback);
989   }
990 
991 
992   /**
993    * Parameterized batch processing, allowing varying return types for different
994    * {@link Row} implementations.
995    */
996   public void processBatch(final List<? extends Row> list, final Object[] results)
997     throws IOException, InterruptedException {
998     this.batch(list, results);
999   }
1000 
1001 
1002   @Override
1003   public void close() throws IOException {
1004     if (this.closed) {
1005       return;
1006     }
1007     flushCommits();
1008     if (cleanupPoolOnClose) {
1009       this.pool.shutdown();
1010       try {
1011         boolean terminated = false;
1012         do {
1013           // wait until the pool has terminated
1014           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1015         } while (!terminated);
1016       } catch (InterruptedException e) {
1017         this.pool.shutdownNow();
1018         LOG.warn("waitForTermination interrupted");
1019       }
1020     }
1021     if (cleanupConnectionOnClose) {
1022       if (this.connection != null) {
1023         this.connection.close();
1024       }
1025     }
1026     this.closed = true;
1027   }
1028 
1029   // validate for well-formedness
1030   public void validatePut(final Put put) throws IllegalArgumentException {
1031     validatePut(put, tableConfiguration.getMaxKeyValueSize());
1032   }
1033 
1034   // validate for well-formedness
1035   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1036     if (put.isEmpty()) {
1037       throw new IllegalArgumentException("No columns to insert");
1038     }
1039     if (maxKeyValueSize > 0) {
1040       for (List<Cell> list : put.getFamilyCellMap().values()) {
1041         for (Cell cell : list) {
1042           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1043             throw new IllegalArgumentException("KeyValue size too large");
1044           }
1045         }
1046       }
1047     }
1048   }
1049 
1050   /**
1051    * {@inheritDoc}
1052    */
1053   @Override
1054   public boolean isAutoFlush() {
1055     return autoFlush;
1056   }
1057 
1058   /**
1059    * {@inheritDoc}
1060    */
1061   @Override
1062   public void setAutoFlushTo(boolean autoFlush) {
1063     this.autoFlush = autoFlush;
1064   }
1065 
1066   /**
1067    * {@inheritDoc}
1068    */
1069   @Override
1070   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1071     this.autoFlush = autoFlush;
1072   }
1073 
1074   /**
1075    * Returns the maximum size in bytes of the write buffer for this HTable.
1076    * <p>
1077    * The default value comes from the configuration parameter
1078    * {@code hbase.client.write.buffer}.
1079    * @return The size of the write buffer in bytes.
1080    */
1081   @Override
1082   public long getWriteBufferSize() {
1083     if (mutator == null) {
1084       return tableConfiguration.getWriteBufferSize();
1085     } else {
1086       return mutator.getWriteBufferSize();
1087     }
1088   }
1089 
1090   /**
1091    * Sets the size of the buffer in bytes.
1092    * <p>
1093    * If the new size is less than the current amount of data in the
1094    * write buffer, the buffer gets flushed.
1095    * @param writeBufferSize The new write buffer size, in bytes.
1096    * @throws IOException if a remote or network exception occurs.
1097    */
1098   @Override
1099   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1100     getBufferedMutator();
1101     mutator.setWriteBufferSize(writeBufferSize);
1102   }
1103 
1104   /**
1105    * The pool is used for mutli requests for this HTable
1106    * @return the pool used for mutli
1107    */
1108   ExecutorService getPool() {
1109     return this.pool;
1110   }
1111 
1112   /**
1113    * Explicitly clears the region cache to fetch the latest value from META.
1114    * This is a power user function: avoid unless you know the ramifications.
1115    */
1116   public void clearRegionCache() {
1117     this.connection.clearRegionCache();
1118   }
1119 
1120   /**
1121    * {@inheritDoc}
1122    */
1123   @Override
1124   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1125     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1126   }
1127 
1128   /**
1129    * {@inheritDoc}
1130    */
1131   @Override
1132   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1133       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1134       throws ServiceException, Throwable {
1135     final Map<byte[],R> results =  Collections.synchronizedMap(
1136         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1137     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1138       @Override
1139       public void update(byte[] region, byte[] row, R value) {
1140         if (region != null) {
1141           results.put(region, value);
1142         }
1143       }
1144     });
1145     return results;
1146   }
1147 
1148   /**
1149    * {@inheritDoc}
1150    */
1151   @Override
1152   public <T extends Service, R> void coprocessorService(final Class<T> service,
1153       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1154       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1155 
1156     // get regions covered by the row range
1157     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1158 
1159     Map<byte[],Future<R>> futures =
1160         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1161     for (final byte[] r : keys) {
1162       final RegionCoprocessorRpcChannel channel =
1163           new RegionCoprocessorRpcChannel(connection, tableName, r);
1164       Future<R> future = pool.submit(
1165           new Callable<R>() {
1166             @Override
1167             public R call() throws Exception {
1168               T instance = ProtobufUtil.newServiceStub(service, channel);
1169               R result = callable.call(instance);
1170               byte[] region = channel.getLastRegion();
1171               if (callback != null) {
1172                 callback.update(region, r, result);
1173               }
1174               return result;
1175             }
1176           });
1177       futures.put(r, future);
1178     }
1179     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1180       try {
1181         e.getValue().get();
1182       } catch (ExecutionException ee) {
1183         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1184             + Bytes.toStringBinary(e.getKey()), ee);
1185         throw ee.getCause();
1186       } catch (InterruptedException ie) {
1187         throw new InterruptedIOException("Interrupted calling coprocessor service "
1188             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1189       }
1190     }
1191   }
1192 
1193   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1194   throws IOException {
1195     if (start == null) {
1196       start = HConstants.EMPTY_START_ROW;
1197     }
1198     if (end == null) {
1199       end = HConstants.EMPTY_END_ROW;
1200     }
1201     return getKeysAndRegionsInRange(start, end, true).getFirst();
1202   }
1203 
1204   public void setOperationTimeout(int operationTimeout) {
1205     this.operationTimeout = operationTimeout;
1206   }
1207 
1208   public int getOperationTimeout() {
1209     return operationTimeout;
1210   }
1211 
1212   @Override
1213   public String toString() {
1214     return tableName + ";" + connection;
1215   }
1216 
1217   /**
1218    * {@inheritDoc}
1219    */
1220   @Override
1221   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1222       Descriptors.MethodDescriptor methodDescriptor, Message request,
1223       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1224     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1225         Bytes.BYTES_COMPARATOR));
1226     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1227         new Callback<R>() {
1228 
1229           @Override
1230           public void update(byte[] region, byte[] row, R result) {
1231             if (region != null) {
1232               results.put(region, result);
1233             }
1234           }
1235         });
1236     return results;
1237   }
1238 
1239   /**
1240    * {@inheritDoc}
1241    */
1242   @Override
1243   public <R extends Message> void batchCoprocessorService(
1244       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1245       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1246       throws ServiceException, Throwable {
1247 
1248     if (startKey == null) {
1249       startKey = HConstants.EMPTY_START_ROW;
1250     }
1251     if (endKey == null) {
1252       endKey = HConstants.EMPTY_END_ROW;
1253     }
1254     // get regions covered by the row range
1255     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1256         getKeysAndRegionsInRange(startKey, endKey, true);
1257     List<byte[]> keys = keysAndRegions.getFirst();
1258     List<HRegionLocation> regions = keysAndRegions.getSecond();
1259 
1260     // check if we have any calls to make
1261     if (keys.isEmpty()) {
1262       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1263           ", end=" + Bytes.toStringBinary(endKey));
1264       return;
1265     }
1266 
1267     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1268     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1269         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1270     for (int i = 0; i < keys.size(); i++) {
1271       final byte[] rowKey = keys.get(i);
1272       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1273       RegionCoprocessorServiceExec exec =
1274           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1275       execs.add(exec);
1276       execsByRow.put(rowKey, exec);
1277     }
1278 
1279     // tracking for any possible deserialization errors on success callback
1280     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1281     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1282     final List<Row> callbackErrorActions = new ArrayList<Row>();
1283     final List<String> callbackErrorServers = new ArrayList<String>();
1284     Object[] results = new Object[execs.size()];
1285 
1286     AsyncProcess asyncProcess =
1287         new AsyncProcess(connection, configuration, pool,
1288             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1289             true, RpcControllerFactory.instantiate(configuration));
1290 
1291     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1292         new Callback<ClientProtos.CoprocessorServiceResult>() {
1293           @Override
1294           public void update(byte[] region, byte[] row,
1295                               ClientProtos.CoprocessorServiceResult serviceResult) {
1296             if (LOG.isTraceEnabled()) {
1297               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1298                   ": region=" + Bytes.toStringBinary(region) +
1299                   ", row=" + Bytes.toStringBinary(row) +
1300                   ", value=" + serviceResult.getValue().getValue());
1301             }
1302             try {
1303               callback.update(region, row,
1304                   (R) responsePrototype.newBuilderForType().mergeFrom(
1305                       serviceResult.getValue().getValue()).build());
1306             } catch (InvalidProtocolBufferException e) {
1307               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1308                   e);
1309               callbackErrorExceptions.add(e);
1310               callbackErrorActions.add(execsByRow.get(row));
1311               callbackErrorServers.add("null");
1312             }
1313           }
1314         }, results);
1315 
1316     future.waitUntilDone();
1317 
1318     if (future.hasError()) {
1319       throw future.getErrors();
1320     } else if (!callbackErrorExceptions.isEmpty()) {
1321       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1322           callbackErrorServers);
1323     }
1324   }
1325 
1326   public RegionLocator getRegionLocator() {
1327     return this.locator;
1328   }
1329 
1330   @VisibleForTesting
1331   BufferedMutator getBufferedMutator() throws IOException {
1332     if (mutator == null) {
1333       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1334           new BufferedMutatorParams(tableName)
1335               .pool(pool)
1336               .writeBufferSize(tableConfiguration.getWriteBufferSize())
1337               .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize())
1338       );
1339     }
1340     return mutator;
1341   }
1342 }