View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.TreeMap;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.DoNotRetryIOException;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionLocation;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValueUtil;
45  import org.apache.hadoop.hbase.TableName;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.classification.InterfaceStability;
48  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
49  import org.apache.hadoop.hbase.client.coprocessor.Batch;
50  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
51  import org.apache.hadoop.hbase.filter.BinaryComparator;
52  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
53  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
54  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
55  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
56  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
57  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
58  import org.apache.hadoop.hbase.protobuf.RequestConverter;
59  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
60  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
61  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
62  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
65  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.Pair;
68  import org.apache.hadoop.hbase.util.ReflectionUtils;
69  import org.apache.hadoop.hbase.util.Threads;
70  
71  import com.google.common.annotations.VisibleForTesting;
72  import com.google.protobuf.Descriptors;
73  import com.google.protobuf.Message;
74  import com.google.protobuf.Service;
75  import com.google.protobuf.ServiceException;
76  
77  /**
78   * An implementation of {@link Table}. Used to communicate with a single HBase table.
79   * Lightweight. Get as needed and just close when done.
80   * Instances of this class SHOULD NOT be constructed directly.
81   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
82   * class comment for an example of how.
83   *
84   * <p>This class is NOT thread safe for reads nor writes.
85   * In the case of writes (Put, Delete), the underlying write buffer can
86   * be corrupted if multiple threads contend over a single HTable instance.
87   * In the case of reads, some fields used by a Scan are shared among all threads.
88   *
89   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
90   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
91   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
92   * Interface Classification</a>
93   * There are no guarantees for backwards source / binary compatibility and methods or class can
94   * change or go away without deprecation.
95   *
96   * @see Table
97   * @see Admin
98   * @see Connection
99   * @see ConnectionFactory
100  */
101 @InterfaceAudience.Private
102 @InterfaceStability.Stable
103 public class HTable implements HTableInterface {
104   private static final Log LOG = LogFactory.getLog(HTable.class);
105   protected ClusterConnection connection;
106   private final TableName tableName;
107   private volatile Configuration configuration;
108   private ConnectionConfiguration connConfiguration;
109   protected BufferedMutatorImpl mutator;
110   private boolean autoFlush = true;
111   private boolean closed = false;
112   protected int scannerCaching;
113   protected long scannerMaxResultSize;
114   private ExecutorService pool;  // For Multi & Scan
115   private int operationTimeout; // global timeout for each blocking method with retrying rpc
116   private int rpcTimeout; // timeout for each rpc request
117   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
118   private final boolean cleanupConnectionOnClose; // close the connection in close()
119   private Consistency defaultConsistency = Consistency.STRONG;
120   private HRegionLocator locator;
121
122   /** The Async process for batch */
123   protected AsyncProcess multiAp;
124   private RpcRetryingCallerFactory rpcCallerFactory;
125   private RpcControllerFactory rpcControllerFactory;
126
127   // Marked Private @since 1.0
128   @InterfaceAudience.Private
129   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
130     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
131     if (maxThreads == 0) {
132       maxThreads = 1; // is there a better default?
133     }
134     int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
135     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
136
137     // Using the "direct handoff" approach, new threads will only be created
138     // if it is necessary and will grow unbounded. This could be bad but in HCM
139     // we only create as many Runnables as there are region servers. It means
140     // it also scales when new region servers are added.
141     ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
142       TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
143     pool.allowCoreThreadTimeOut(true);
144     return pool;
145   }
146
147   /**
148    * Creates an object to access a HBase table.
149    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
150    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
151    * @param tableName Name of the table.
152    * @param connection HConnection to be used.
153    * @param pool ExecutorService to be used.
154    * @throws IOException if a remote or network exception occurs
155    */
156   @InterfaceAudience.Private
157   protected HTable(TableName tableName, final ClusterConnection connection,
158       final ConnectionConfiguration tableConfig,
159       final RpcRetryingCallerFactory rpcCallerFactory,
160       final RpcControllerFactory rpcControllerFactory,
161       final ExecutorService pool) throws IOException {
162     if (connection == null || connection.isClosed()) {
163       throw new IllegalArgumentException("Connection is null or closed.");
164     }
165     this.tableName = tableName;
166     this.cleanupConnectionOnClose = false;
167     this.connection = connection;
168     this.configuration = connection.getConfiguration();
169     this.connConfiguration = tableConfig;
170     this.pool = pool;
171     if (pool == null) {
172       this.pool = getDefaultExecutor(this.configuration);
173       this.cleanupPoolOnClose = true;
174     } else {
175       this.cleanupPoolOnClose = false;
176     }
177
178     this.rpcCallerFactory = rpcCallerFactory;
179     this.rpcControllerFactory = rpcControllerFactory;
180
181     this.finishSetup();
182   }
183
184   /**
185    * For internal testing. Uses Connection provided in {@code params}.
186    * @throws IOException
187    */
188   @VisibleForTesting
189   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
190     connection = conn;
191     tableName = params.getTableName();
192     connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
193     cleanupPoolOnClose = false;
194     cleanupConnectionOnClose = false;
195     // used from tests, don't trust the connection is real
196     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
197   }
198
199   /**
200    * @return maxKeyValueSize from configuration.
201    */
202   public static int getMaxKeyValueSize(Configuration conf) {
203     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
204   }
205
206   /**
207    * setup this HTable's parameter based on the passed configuration
208    */
209   private void finishSetup() throws IOException {
210     if (connConfiguration == null) {
211       connConfiguration = new ConnectionConfiguration(configuration);
212     }
213
214     this.operationTimeout = tableName.isSystemTable() ?
215         connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
216     this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
217         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
218     this.scannerCaching = connConfiguration.getScannerCaching();
219     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
220     if (this.rpcCallerFactory == null) {
221       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
222     }
223     if (this.rpcControllerFactory == null) {
224       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
225     }
226
227     // puts need to track errors globally due to how the APIs currently work.
228     multiAp = this.connection.getAsyncProcess();
229     this.locator = new HRegionLocator(getName(), connection);
230   }
231
232   /**
233    * {@inheritDoc}
234    */
235   @Override
236   public Configuration getConfiguration() {
237     return configuration;
238   }
239
240   /**
241    * {@inheritDoc}
242    */
243   @Override
244   public byte [] getTableName() {
245     return this.tableName.getName();
246   }
247
248   @Override
249   public TableName getName() {
250     return tableName;
251   }
252
253   /**
254    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
255    * manipulations.
256    * @return An HConnection instance.
257    * @deprecated This method will be changed from public to package protected.
258    */
259   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
260   @Deprecated
261   @VisibleForTesting
262   public HConnection getConnection() {
263     return this.connection;
264   }
265
266   /**
267    * {@inheritDoc}
268    */
269   @Override
270   public HTableDescriptor getTableDescriptor() throws IOException {
271     HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
272       rpcControllerFactory, operationTimeout, rpcTimeout);
273     if (htd != null) {
274       return new UnmodifyableHTableDescriptor(htd);
275     }
276     return null;
277   }
278
279   /**
280    * Get the corresponding start keys and regions for an arbitrary range of
281    * keys.
282    * <p>
283    * @param startKey Starting row in range, inclusive
284    * @param endKey Ending row in range
285    * @param includeEndKey true if endRow is inclusive, false if exclusive
286    * @return A pair of list of start keys and list of HRegionLocations that
287    *         contain the specified range
288    * @throws IOException if a remote or network exception occurs
289    */
290   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
291       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
292       throws IOException {
293     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
294   }
295
296   /**
297    * Get the corresponding start keys and regions for an arbitrary range of
298    * keys.
299    * <p>
300    * @param startKey Starting row in range, inclusive
301    * @param endKey Ending row in range
302    * @param includeEndKey true if endRow is inclusive, false if exclusive
303    * @param reload true to reload information or false to use cached information
304    * @return A pair of list of start keys and list of HRegionLocations that
305    *         contain the specified range
306    * @throws IOException if a remote or network exception occurs
307    */
308   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
309       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
310       final boolean reload) throws IOException {
311     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
312     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
313       throw new IllegalArgumentException(
314         "Invalid range: " + Bytes.toStringBinary(startKey) +
315         " > " + Bytes.toStringBinary(endKey));
316     }
317     List<byte[]> keysInRange = new ArrayList<byte[]>();
318     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
319     byte[] currentKey = startKey;
320     do {
321       HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
322       keysInRange.add(currentKey);
323       regionsInRange.add(regionLocation);
324       currentKey = regionLocation.getRegionInfo().getEndKey();
325     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
326         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
327             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
328     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
329         regionsInRange);
330   }
331
332   /**
333    * The underlying {@link HTable} must not be closed.
334    * {@link HTableInterface#getScanner(Scan)} has other usage details.
335    */
336   @Override
337   public ResultScanner getScanner(final Scan scan) throws IOException {
338     if (scan.getBatch() > 0 && scan.isSmall()) {
339       throw new IllegalArgumentException("Small scan should not be used with batching");
340     }
341
342     if (scan.getCaching() <= 0) {
343       scan.setCaching(scannerCaching);
344     }
345     if (scan.getMaxResultSize() <= 0) {
346       scan.setMaxResultSize(scannerMaxResultSize);
347     }
348
349     Boolean async = scan.isAsyncPrefetch();
350     if (async == null) {
351       async = connConfiguration.isClientScannerAsyncPrefetch();
352     }
353
354     if (scan.isReversed()) {
355       if (scan.isSmall()) {
356         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
357             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
358             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
359       } else {
360         return new ReversedClientScanner(getConfiguration(), scan, getName(),
361             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
362             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
363       }
364     }
365
366     if (scan.isSmall()) {
367       return new ClientSmallScanner(getConfiguration(), scan, getName(),
368           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
369           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
370     } else {
371       if (async) {
372         return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
373             this.rpcCallerFactory, this.rpcControllerFactory,
374             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
375       } else {
376         return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
377             this.rpcCallerFactory, this.rpcControllerFactory,
378             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
379       }
380     }
381   }
382
383   /**
384    * The underlying {@link HTable} must not be closed.
385    * {@link HTableInterface#getScanner(byte[])} has other usage details.
386    */
387   @Override
388   public ResultScanner getScanner(byte [] family) throws IOException {
389     Scan scan = new Scan();
390     scan.addFamily(family);
391     return getScanner(scan);
392   }
393
394   /**
395    * The underlying {@link HTable} must not be closed.
396    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
397    */
398   @Override
399   public ResultScanner getScanner(byte [] family, byte [] qualifier)
400   throws IOException {
401     Scan scan = new Scan();
402     scan.addColumn(family, qualifier);
403     return getScanner(scan);
404   }
405
406   /**
407    * {@inheritDoc}
408    */
409   @Override
410   public Result get(final Get get) throws IOException {
411     return get(get, get.isCheckExistenceOnly());
412   }
413
414   private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
415     // if we are changing settings to the get, clone it.
416     if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
417       get = ReflectionUtils.newInstance(get.getClass(), get);
418       get.setCheckExistenceOnly(checkExistenceOnly);
419       if (get.getConsistency() == null){
420         get.setConsistency(defaultConsistency);
421       }
422     }
423
424     if (get.getConsistency() == Consistency.STRONG) {
425       // Good old call.
426       final Get getReq = get;
427       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
428           getName(), get.getRow()) {
429         @Override
430         public Result call(int callTimeout) throws IOException {
431           ClientProtos.GetRequest request =
432             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
433           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
434           controller.setPriority(tableName);
435           controller.setCallTimeout(callTimeout);
436           try {
437             ClientProtos.GetResponse response = getStub().get(controller, request);
438             if (response == null) return null;
439             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
440           } catch (ServiceException se) {
441             throw ProtobufUtil.getRemoteException(se);
442           }
443         }
444       };
445       return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
446           this.operationTimeout);
447     }
448
449     // Call that takes into account the replica
450     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
451         rpcControllerFactory, tableName, this.connection, get, pool,
452         connConfiguration.getRetriesNumber(),
453         operationTimeout,
454         connConfiguration.getPrimaryCallTimeoutMicroSecond());
455     return callable.call();
456   }
457
458
459   /**
460    * {@inheritDoc}
461    */
462   @Override
463   public Result[] get(List<Get> gets) throws IOException {
464     if (gets.size() == 1) {
465       return new Result[]{get(gets.get(0))};
466     }
467     try {
468       Object[] r1 = new Object[gets.size()];
469       batch((List) gets, r1);
470
471       // translate.
472       Result [] results = new Result[r1.length];
473       int i=0;
474       for (Object o : r1) {
475         // batch ensures if there is a failure we get an exception instead
476         results[i++] = (Result) o;
477       }
478
479       return results;
480     } catch (InterruptedException e) {
481       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
482     }
483   }
484
485   /**
486    * {@inheritDoc}
487    */
488   @Override
489   public void batch(final List<? extends Row> actions, final Object[] results)
490       throws InterruptedException, IOException {
491     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
492     ars.waitUntilDone();
493     if (ars.hasError()) {
494       throw ars.getErrors();
495     }
496   }
497
498   /**
499    * {@inheritDoc}
500    */
501   @Override
502   public <R> void batchCallback(
503       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
504       throws IOException, InterruptedException {
505     connection.processBatchCallback(actions, tableName, pool, results, callback);
506   }
507
508   /**
509    * {@inheritDoc}
510    */
511   @Override
512   public void delete(final Delete delete)
513   throws IOException {
514     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
515         tableName, delete.getRow()) {
516       @Override
517       public Boolean call(int callTimeout) throws IOException {
518         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
519         controller.setPriority(tableName);
520         controller.setCallTimeout(callTimeout);
521
522         try {
523           MutateRequest request = RequestConverter.buildMutateRequest(
524             getLocation().getRegionInfo().getRegionName(), delete);
525           MutateResponse response = getStub().mutate(controller, request);
526           return Boolean.valueOf(response.getProcessed());
527         } catch (ServiceException se) {
528           throw ProtobufUtil.getRemoteException(se);
529         }
530       }
531     };
532     rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
533         this.operationTimeout);
534   }
535
536   /**
537    * {@inheritDoc}
538    */
539   @Override
540   public void delete(final List<Delete> deletes)
541   throws IOException {
542     Object[] results = new Object[deletes.size()];
543     try {
544       batch(deletes, results);
545     } catch (InterruptedException e) {
546       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
547     } finally {
548       // mutate list so that it is empty for complete success, or contains only failed records
549       // results are returned in the same order as the requests in list walk the list backwards,
550       // so we can remove from list without impacting the indexes of earlier members
551       for (int i = results.length - 1; i>=0; i--) {
552         // if result is not null, it succeeded
553         if (results[i] instanceof Result) {
554           deletes.remove(i);
555         }
556       }
557     }
558   }
559
560   /**
561    * {@inheritDoc}
562    * @throws IOException
563    */
564   @Override
565   public void put(final Put put) throws IOException {
566     getBufferedMutator().mutate(put);
567     if (autoFlush) {
568       flushCommits();
569     }
570   }
571
572   /**
573    * {@inheritDoc}
574    * @throws IOException
575    */
576   @Override
577   public void put(final List<Put> puts) throws IOException {
578     getBufferedMutator().mutate(puts);
579     if (autoFlush) {
580       flushCommits();
581     }
582   }
583
584   /**
585    * {@inheritDoc}
586    */
587   @Override
588   public void mutateRow(final RowMutations rm) throws IOException {
589     final RetryingTimeTracker tracker = new RetryingTimeTracker();
590     PayloadCarryingServerCallable<MultiResponse> callable =
591       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
592           rpcControllerFactory) {
593         @Override
594         public MultiResponse call(int callTimeout) throws IOException {
595           tracker.start();
596           controller.setPriority(tableName);
597           int remainingTime = tracker.getRemainingTime(callTimeout);
598           if (remainingTime == 0) {
599             throw new DoNotRetryIOException("Timeout for mutate row");
600           }
601           controller.setCallTimeout(remainingTime);
602           try {
603             RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
604                 getLocation().getRegionInfo().getRegionName(), rm);
605             regionMutationBuilder.setAtomic(true);
606             MultiRequest request =
607                 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
608             ClientProtos.MultiResponse response = getStub().multi(controller, request);
609             ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
610             if (res.hasException()) {
611               Throwable ex = ProtobufUtil.toException(res.getException());
612               if (ex instanceof IOException) {
613                 throw (IOException) ex;
614               }
615               throw new IOException("Failed to mutate row: " +
616                   Bytes.toStringBinary(rm.getRow()), ex);
617             }
618             return ResponseConverter.getResults(request, response, controller.cellScanner());
619           } catch (ServiceException se) {
620             throw ProtobufUtil.getRemoteException(se);
621           }
622         }
623       };
624     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
625         null, null, callable, operationTimeout);
626     ars.waitUntilDone();
627     if (ars.hasError()) {
628       throw ars.getErrors();
629     }
630   }
631
632   /**
633    * {@inheritDoc}
634    */
635   @Override
636   public Result append(final Append append) throws IOException {
637     if (append.numFamilies() == 0) {
638       throw new IOException(
639           "Invalid arguments to append, no columns specified");
640     }
641
642     NonceGenerator ng = this.connection.getNonceGenerator();
643     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
644     RegionServerCallable<Result> callable =
645       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
646         @Override
647         public Result call(int callTimeout) throws IOException {
648           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
649           controller.setPriority(getTableName());
650           controller.setCallTimeout(callTimeout);
651           try {
652             MutateRequest request = RequestConverter.buildMutateRequest(
653               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
654             MutateResponse response = getStub().mutate(controller, request);
655             if (!response.hasResult()) return null;
656             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
657           } catch (ServiceException se) {
658             throw ProtobufUtil.getRemoteException(se);
659           }
660         }
661       };
662     return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
663         this.operationTimeout);
664   }
665
666   /**
667    * {@inheritDoc}
668    */
669   @Override
670   public Result increment(final Increment increment) throws IOException {
671     if (!increment.hasFamilies()) {
672       throw new IOException(
673           "Invalid arguments to increment, no columns specified");
674     }
675     NonceGenerator ng = this.connection.getNonceGenerator();
676     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
677     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
678         getName(), increment.getRow()) {
679       @Override
680       public Result call(int callTimeout) throws IOException {
681         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
682         controller.setPriority(getTableName());
683         controller.setCallTimeout(callTimeout);
684         try {
685           MutateRequest request = RequestConverter.buildMutateRequest(
686             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
687           MutateResponse response = getStub().mutate(controller, request);
688           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
689         } catch (ServiceException se) {
690           throw ProtobufUtil.getRemoteException(se);
691         }
692       }
693     };
694     return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
695         this.operationTimeout);
696   }
697
698   /**
699    * {@inheritDoc}
700    */
701   @Override
702   public long incrementColumnValue(final byte [] row, final byte [] family,
703       final byte [] qualifier, final long amount)
704   throws IOException {
705     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
706   }
707
708   /**
709    * {@inheritDoc}
710    */
711   @Override
712   public long incrementColumnValue(final byte [] row, final byte [] family,
713       final byte [] qualifier, final long amount, final Durability durability)
714   throws IOException {
715     NullPointerException npe = null;
716     if (row == null) {
717       npe = new NullPointerException("row is null");
718     } else if (family == null) {
719       npe = new NullPointerException("family is null");
720     } else if (qualifier == null) {
721       npe = new NullPointerException("qualifier is null");
722     }
723     if (npe != null) {
724       throw new IOException(
725           "Invalid arguments to incrementColumnValue", npe);
726     }
727
728     NonceGenerator ng = this.connection.getNonceGenerator();
729     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
730     RegionServerCallable<Long> callable =
731       new RegionServerCallable<Long>(connection, getName(), row) {
732         @Override
733         public Long call(int callTimeout) throws IOException {
734           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
735           controller.setPriority(getTableName());
736           controller.setCallTimeout(callTimeout);
737           try {
738             MutateRequest request = RequestConverter.buildIncrementRequest(
739               getLocation().getRegionInfo().getRegionName(), row, family,
740               qualifier, amount, durability, nonceGroup, nonce);
741             MutateResponse response = getStub().mutate(controller, request);
742             Result result =
743               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
744             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
745           } catch (ServiceException se) {
746             throw ProtobufUtil.getRemoteException(se);
747           }
748         }
749       };
750     return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
751         this.operationTimeout);
752   }
753
754   /**
755    * {@inheritDoc}
756    */
757   @Override
758   public boolean checkAndPut(final byte [] row,
759       final byte [] family, final byte [] qualifier, final byte [] value,
760       final Put put)
761   throws IOException {
762     RegionServerCallable<Boolean> callable =
763       new RegionServerCallable<Boolean>(connection, getName(), row) {
764         @Override
765         public Boolean call(int callTimeout) throws IOException {
766           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
767           controller.setPriority(tableName);
768           controller.setCallTimeout(callTimeout);
769           try {
770             MutateRequest request = RequestConverter.buildMutateRequest(
771                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
772                 new BinaryComparator(value), CompareType.EQUAL, put);
773             MutateResponse response = getStub().mutate(controller, request);
774             return Boolean.valueOf(response.getProcessed());
775           } catch (ServiceException se) {
776             throw ProtobufUtil.getRemoteException(se);
777           }
778         }
779       };
780     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
781         this.operationTimeout);
782   }
783
784   /**
785    * {@inheritDoc}
786    */
787   @Override
788   public boolean checkAndPut(final byte [] row, final byte [] family,
789       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
790       final Put put)
791   throws IOException {
792     RegionServerCallable<Boolean> callable =
793       new RegionServerCallable<Boolean>(connection, getName(), row) {
794         @Override
795         public Boolean call(int callTimeout) throws IOException {
796           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
797           controller.setPriority(tableName);
798           controller.setCallTimeout(callTimeout);
799           try {
800             CompareType compareType = CompareType.valueOf(compareOp.name());
801             MutateRequest request = RequestConverter.buildMutateRequest(
802               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
803                 new BinaryComparator(value), compareType, put);
804             MutateResponse response = getStub().mutate(controller, request);
805             return Boolean.valueOf(response.getProcessed());
806           } catch (ServiceException se) {
807             throw ProtobufUtil.getRemoteException(se);
808           }
809         }
810       };
811     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
812         this.operationTimeout);
813   }
814
815   /**
816    * {@inheritDoc}
817    */
818   @Override
819   public boolean checkAndDelete(final byte [] row,
820       final byte [] family, final byte [] qualifier, final byte [] value,
821       final Delete delete)
822   throws IOException {
823     RegionServerCallable<Boolean> callable =
824       new RegionServerCallable<Boolean>(connection, getName(), row) {
825         @Override
826         public Boolean call(int callTimeout) throws IOException {
827           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
828           controller.setPriority(tableName);
829           controller.setCallTimeout(callTimeout);
830           try {
831             MutateRequest request = RequestConverter.buildMutateRequest(
832               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
833                 new BinaryComparator(value), CompareType.EQUAL, delete);
834             MutateResponse response = getStub().mutate(controller, request);
835             return Boolean.valueOf(response.getProcessed());
836           } catch (ServiceException se) {
837             throw ProtobufUtil.getRemoteException(se);
838           }
839         }
840       };
841     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
842         this.operationTimeout);
843   }
844
845   /**
846    * {@inheritDoc}
847    */
848   @Override
849   public boolean checkAndDelete(final byte [] row, final byte [] family,
850       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
851       final Delete delete)
852   throws IOException {
853     RegionServerCallable<Boolean> callable =
854       new RegionServerCallable<Boolean>(connection, getName(), row) {
855         @Override
856         public Boolean call(int callTimeout) throws IOException {
857           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
858           controller.setPriority(tableName);
859           controller.setCallTimeout(callTimeout);
860           try {
861             CompareType compareType = CompareType.valueOf(compareOp.name());
862             MutateRequest request = RequestConverter.buildMutateRequest(
863               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
864                 new BinaryComparator(value), compareType, delete);
865             MutateResponse response = getStub().mutate(controller, request);
866             return Boolean.valueOf(response.getProcessed());
867           } catch (ServiceException se) {
868             throw ProtobufUtil.getRemoteException(se);
869           }
870         }
871       };
872     return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
873         this.operationTimeout);
874   }
875
876   /**
877    * {@inheritDoc}
878    */
879   @Override
880   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
881     final CompareOp compareOp, final byte [] value, final RowMutations rm)
882     throws IOException {
883     final RetryingTimeTracker tracker = new RetryingTimeTracker();
884     PayloadCarryingServerCallable<MultiResponse> callable =
885       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
886         rpcControllerFactory) {
887         @Override
888         public MultiResponse call(int callTimeout) throws IOException {
889           tracker.start();
890           controller.setPriority(tableName);
891           int remainingTime = tracker.getRemainingTime(callTimeout);
892           if (remainingTime == 0) {
893             throw new DoNotRetryIOException("Timeout for mutate row");
894           }
895           controller.setCallTimeout(remainingTime);
896           try {
897             CompareType compareType = CompareType.valueOf(compareOp.name());
898             MultiRequest request = RequestConverter.buildMutateRequest(
899               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
900               new BinaryComparator(value), compareType, rm);
901             ClientProtos.MultiResponse response = getStub().multi(controller, request);
902             ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
903             if (res.hasException()) {
904               Throwable ex = ProtobufUtil.toException(res.getException());
905               if(ex instanceof IOException) {
906                 throw (IOException)ex;
907               }
908               throw new IOException("Failed to checkAndMutate row: "+
909                                     Bytes.toStringBinary(rm.getRow()), ex);
910             }
911             return ResponseConverter.getResults(request, response, controller.cellScanner());
912           } catch (ServiceException se) {
913             throw ProtobufUtil.getRemoteException(se);
914           }
915         }
916       };
917     /**
918      *  Currently, we use one array to store 'processed' flag which is returned by server.
919      *  It is excessive to send such a large array, but that is required by the framework right now
920      * */
921     Object[] results = new Object[rm.getMutations().size()];
922     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
923       null, results, callable, operationTimeout);
924     ars.waitUntilDone();
925     if (ars.hasError()) {
926       throw ars.getErrors();
927     }
928
929     return ((Result)results[0]).getExists();
930   }
931
932   /**
933    * {@inheritDoc}
934    */
935   @Override
936   public boolean exists(final Get get) throws IOException {
937     Result r = get(get, true);
938     assert r.getExists() != null;
939     return r.getExists();
940   }
941
942   /**
943    * {@inheritDoc}
944    */
945   @Override
946   public boolean[] existsAll(final List<Get> gets) throws IOException {
947     if (gets.isEmpty()) return new boolean[]{};
948     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
949
950     ArrayList<Get> exists = new ArrayList<Get>(gets.size());
951     for (Get g: gets){
952       Get ge = new Get(g);
953       ge.setCheckExistenceOnly(true);
954       exists.add(ge);
955     }
956
957     Object[] r1= new Object[exists.size()];
958     try {
959       batch(exists, r1);
960     } catch (InterruptedException e) {
961       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
962     }
963
964     // translate.
965     boolean[] results = new boolean[r1.length];
966     int i = 0;
967     for (Object o : r1) {
968       // batch ensures if there is a failure we get an exception instead
969       results[i++] = ((Result)o).getExists();
970     }
971
972     return results;
973   }
974
975   /**
976    * {@inheritDoc}
977    * @throws IOException
978    */
979   @Override
980   public void flushCommits() throws IOException {
981     if (mutator == null) {
982       // nothing to flush if there's no mutator; don't bother creating one.
983       return;
984     }
985     getBufferedMutator().flush();
986   }
987
988   /**
989    * Process a mixed batch of Get, Put and Delete actions. All actions for a
990    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
991    *
992    * @param list The collection of actions.
993    * @param results An empty array, same size as list. If an exception is thrown,
994    * you can test here for partial results, and to determine which actions
995    * processed successfully.
996    * @throws IOException if there are problems talking to META. Per-item
997    * exceptions are stored in the results array.
998    */
999   public <R> void processBatchCallback(
1000     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1001     throws IOException, InterruptedException {
1002     this.batchCallback(list, results, callback);
1003   }
1004
1005
1006   /**
1007    * Parameterized batch processing, allowing varying return types for different
1008    * {@link Row} implementations.
1009    */
1010   public void processBatch(final List<? extends Row> list, final Object[] results)
1011     throws IOException, InterruptedException {
1012     this.batch(list, results);
1013   }
1014
1015
1016   @Override
1017   public void close() throws IOException {
1018     if (this.closed) {
1019       return;
1020     }
1021     flushCommits();
1022     if (cleanupPoolOnClose) {
1023       this.pool.shutdown();
1024       try {
1025         boolean terminated = false;
1026         do {
1027           // wait until the pool has terminated
1028           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1029         } while (!terminated);
1030       } catch (InterruptedException e) {
1031         this.pool.shutdownNow();
1032         LOG.warn("waitForTermination interrupted");
1033       }
1034     }
1035     if (cleanupConnectionOnClose) {
1036       if (this.connection != null) {
1037         this.connection.close();
1038       }
1039     }
1040     this.closed = true;
1041   }
1042
1043   // validate for well-formedness
1044   public void validatePut(final Put put) throws IllegalArgumentException {
1045     validatePut(put, connConfiguration.getMaxKeyValueSize());
1046   }
1047
1048   // validate for well-formedness
1049   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1050     if (put.isEmpty()) {
1051       throw new IllegalArgumentException("No columns to insert");
1052     }
1053     if (maxKeyValueSize > 0) {
1054       for (List<Cell> list : put.getFamilyCellMap().values()) {
1055         for (Cell cell : list) {
1056           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1057             throw new IllegalArgumentException("KeyValue size too large");
1058           }
1059         }
1060       }
1061     }
1062   }
1063
1064   /**
1065    * {@inheritDoc}
1066    */
1067   @Override
1068   public boolean isAutoFlush() {
1069     return autoFlush;
1070   }
1071
1072   /**
1073    * {@inheritDoc}
1074    */
1075   @Override
1076   public void setAutoFlushTo(boolean autoFlush) {
1077     this.autoFlush = autoFlush;
1078   }
1079
1080   /**
1081    * {@inheritDoc}
1082    */
1083   @Override
1084   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1085     this.autoFlush = autoFlush;
1086   }
1087
1088   /**
1089    * Returns the maximum size in bytes of the write buffer for this HTable.
1090    * <p>
1091    * The default value comes from the configuration parameter
1092    * {@code hbase.client.write.buffer}.
1093    * @return The size of the write buffer in bytes.
1094    */
1095   @Override
1096   public long getWriteBufferSize() {
1097     if (mutator == null) {
1098       return connConfiguration.getWriteBufferSize();
1099     } else {
1100       return mutator.getWriteBufferSize();
1101     }
1102   }
1103
1104   /**
1105    * Sets the size of the buffer in bytes.
1106    * <p>
1107    * If the new size is less than the current amount of data in the
1108    * write buffer, the buffer gets flushed.
1109    * @param writeBufferSize The new write buffer size, in bytes.
1110    * @throws IOException if a remote or network exception occurs.
1111    */
1112   @Override
1113   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1114     getBufferedMutator();
1115     mutator.setWriteBufferSize(writeBufferSize);
1116   }
1117
1118   /**
1119    * The pool is used for mutli requests for this HTable
1120    * @return the pool used for mutli
1121    */
1122   ExecutorService getPool() {
1123     return this.pool;
1124   }
1125
1126   /**
1127    * Explicitly clears the region cache to fetch the latest value from META.
1128    * This is a power user function: avoid unless you know the ramifications.
1129    */
1130   public void clearRegionCache() {
1131     this.connection.clearRegionCache();
1132   }
1133
1134   /**
1135    * {@inheritDoc}
1136    */
1137   @Override
1138   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1139     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1140   }
1141
1142   /**
1143    * {@inheritDoc}
1144    */
1145   @Override
1146   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1147       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1148       throws ServiceException, Throwable {
1149     final Map<byte[],R> results =  Collections.synchronizedMap(
1150         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1151     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1152       @Override
1153       public void update(byte[] region, byte[] row, R value) {
1154         if (region != null) {
1155           results.put(region, value);
1156         }
1157       }
1158     });
1159     return results;
1160   }
1161
1162   /**
1163    * {@inheritDoc}
1164    */
1165   @Override
1166   public <T extends Service, R> void coprocessorService(final Class<T> service,
1167       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1168       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1169
1170     // get regions covered by the row range
1171     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1172
1173     Map<byte[],Future<R>> futures =
1174         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1175     for (final byte[] r : keys) {
1176       final RegionCoprocessorRpcChannel channel =
1177           new RegionCoprocessorRpcChannel(connection, tableName, r);
1178       Future<R> future = pool.submit(
1179           new Callable<R>() {
1180             @Override
1181             public R call() throws Exception {
1182               T instance = ProtobufUtil.newServiceStub(service, channel);
1183               R result = callable.call(instance);
1184               byte[] region = channel.getLastRegion();
1185               if (callback != null) {
1186                 callback.update(region, r, result);
1187               }
1188               return result;
1189             }
1190           });
1191       futures.put(r, future);
1192     }
1193     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1194       try {
1195         e.getValue().get();
1196       } catch (ExecutionException ee) {
1197         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1198             + Bytes.toStringBinary(e.getKey()), ee);
1199         throw ee.getCause();
1200       } catch (InterruptedException ie) {
1201         throw new InterruptedIOException("Interrupted calling coprocessor service "
1202             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1203       }
1204     }
1205   }
1206
1207   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1208   throws IOException {
1209     if (start == null) {
1210       start = HConstants.EMPTY_START_ROW;
1211     }
1212     if (end == null) {
1213       end = HConstants.EMPTY_END_ROW;
1214     }
1215     return getKeysAndRegionsInRange(start, end, true).getFirst();
1216   }
1217
1218   @Override
1219   public void setOperationTimeout(int operationTimeout) {
1220     this.operationTimeout = operationTimeout;
1221   }
1222
1223   @Override
1224   public int getOperationTimeout() {
1225     return operationTimeout;
1226   }
1227
1228   @Override
1229   public int getRpcTimeout() {
1230     return rpcTimeout;
1231   }
1232
1233   @Override
1234   public void setRpcTimeout(int rpcTimeout) {
1235     this.rpcTimeout = rpcTimeout;
1236   }
1237
1238   @Override
1239   public String toString() {
1240     return tableName + ";" + connection;
1241   }
1242
1243   /**
1244    * {@inheritDoc}
1245    */
1246   @Override
1247   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1248       Descriptors.MethodDescriptor methodDescriptor, Message request,
1249       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1250     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1251         Bytes.BYTES_COMPARATOR));
1252     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1253         new Callback<R>() {
1254
1255           @Override
1256           public void update(byte[] region, byte[] row, R result) {
1257             if (region != null) {
1258               results.put(region, result);
1259             }
1260           }
1261         });
1262     return results;
1263   }
1264
1265   /**
1266    * {@inheritDoc}
1267    */
1268   @Override
1269   public <R extends Message> void batchCoprocessorService(
1270       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1271       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1272       throws ServiceException, Throwable {
1273
1274     if (startKey == null) {
1275       startKey = HConstants.EMPTY_START_ROW;
1276     }
1277     if (endKey == null) {
1278       endKey = HConstants.EMPTY_END_ROW;
1279     }
1280     // get regions covered by the row range
1281     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1282         getKeysAndRegionsInRange(startKey, endKey, true);
1283     List<byte[]> keys = keysAndRegions.getFirst();
1284     List<HRegionLocation> regions = keysAndRegions.getSecond();
1285
1286     // check if we have any calls to make
1287     if (keys.isEmpty()) {
1288       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1289           ", end=" + Bytes.toStringBinary(endKey));
1290       return;
1291     }
1292 
1293     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1294     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1295         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1296     for (int i = 0; i < keys.size(); i++) {
1297       final byte[] rowKey = keys.get(i);
1298       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1299       RegionCoprocessorServiceExec exec =
1300           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1301       execs.add(exec);
1302       execsByRow.put(rowKey, exec);
1303     }
1304
1305     // tracking for any possible deserialization errors on success callback
1306     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1307     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1308     final List<Row> callbackErrorActions = new ArrayList<Row>();
1309     final List<String> callbackErrorServers = new ArrayList<String>();
1310     Object[] results = new Object[execs.size()];
1311
1312     AsyncProcess asyncProcess =
1313         new AsyncProcess(connection, configuration, pool,
1314             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1315             true, RpcControllerFactory.instantiate(configuration));
1316
1317     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1318         new Callback<ClientProtos.CoprocessorServiceResult>() {
1319           @Override
1320           public void update(byte[] region, byte[] row,
1321                               ClientProtos.CoprocessorServiceResult serviceResult) {
1322             if (LOG.isTraceEnabled()) {
1323               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1324                   ": region=" + Bytes.toStringBinary(region) +
1325                   ", row=" + Bytes.toStringBinary(row) +
1326                   ", value=" + serviceResult.getValue().getValue());
1327             }
1328             try {
1329               Message.Builder builder = responsePrototype.newBuilderForType();
1330               ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1331               callback.update(region, row, (R) builder.build());
1332             } catch (IOException e) {
1333               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1334                   e);
1335               callbackErrorExceptions.add(e);
1336               callbackErrorActions.add(execsByRow.get(row));
1337               callbackErrorServers.add("null");
1338             }
1339           }
1340         }, results);
1341
1342     future.waitUntilDone();
1343
1344     if (future.hasError()) {
1345       throw future.getErrors();
1346     } else if (!callbackErrorExceptions.isEmpty()) {
1347       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1348           callbackErrorServers);
1349     }
1350   }
1351
1352   public RegionLocator getRegionLocator() {
1353     return this.locator;
1354   }
1355
1356   @VisibleForTesting
1357   BufferedMutator getBufferedMutator() throws IOException {
1358     if (mutator == null) {
1359       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1360           new BufferedMutatorParams(tableName)
1361               .pool(pool)
1362               .writeBufferSize(connConfiguration.getWriteBufferSize())
1363               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1364       );
1365     }
1366     return mutator;
1367   }
1368 }