View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.TreeMap;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionLocation;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.KeyValueUtil;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.classification.InterfaceStability;
47  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
48  import org.apache.hadoop.hbase.client.coprocessor.Batch;
49  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
50  import org.apache.hadoop.hbase.filter.BinaryComparator;
51  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
52  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
53  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
54  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
55  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
56  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
57  import org.apache.hadoop.hbase.protobuf.RequestConverter;
58  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
59  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
60  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
61  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
62  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
63  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
64  import org.apache.hadoop.hbase.util.Bytes;
65  import org.apache.hadoop.hbase.util.Pair;
66  import org.apache.hadoop.hbase.util.ReflectionUtils;
67  import org.apache.hadoop.hbase.util.Threads;
68  
69  import com.google.common.annotations.VisibleForTesting;
70  import com.google.protobuf.Descriptors;
71  import com.google.protobuf.Message;
72  import com.google.protobuf.Service;
73  import com.google.protobuf.ServiceException;
74  
75  /**
76   * An implementation of {@link Table}. Used to communicate with a single HBase table.
77   * Lightweight. Get as needed and just close when done.
78   * Instances of this class SHOULD NOT be constructed directly.
79   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
80   * class comment for an example of how.
81   *
82   * <p>This class is NOT thread safe for reads nor writes.
83   * In the case of writes (Put, Delete), the underlying write buffer can
84   * be corrupted if multiple threads contend over a single HTable instance.
85   * In the case of reads, some fields used by a Scan are shared among all threads.
86   *
87   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
88   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
89   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
90   * Interface Classification</a>
91   * There are no guarantees for backwards source / binary compatibility and methods or class can
92   * change or go away without deprecation.
93   *
94   * @see Table
95   * @see Admin
96   * @see Connection
97   * @see ConnectionFactory
98   */
99  @InterfaceAudience.Private
100 @InterfaceStability.Stable
101 public class HTable implements HTableInterface {
102   private static final Log LOG = LogFactory.getLog(HTable.class);
103   protected ClusterConnection connection; // cluster connection handed to every callable this table builds
104   private final TableName tableName; // table this instance talks to
105   private volatile Configuration configuration; // taken from connection.getConfiguration() in the main constructor
106   private TableConfiguration tableConfiguration; // client settings (timeouts, scanner defaults) read in finishSetup()
107   protected BufferedMutatorImpl mutator; // set directly by the test-only constructor; NOTE(review): otherwise presumably created by getBufferedMutator(), not visible here
108   private boolean autoFlush = true; // when true, put() calls flushCommits() right after buffering
109   private boolean closed = false; // NOTE(review): presumably flipped by close(), which is outside this chunk
110   protected int scannerCaching; // applied in getScanner() when the Scan's caching is <= 0
111   protected long scannerMaxResultSize; // applied in getScanner() when the Scan's max result size is <= 0
112   private ExecutorService pool;  // For Multi & Scan
113   private int operationTimeout; // timeout passed to callWithRetries(); meta timeout is used for system tables
114   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
115   private final boolean cleanupConnectionOnClose; // close the connection in close()
116   private Consistency defaultConsistency = Consistency.STRONG; // given to Gets whose consistency is null
117   private HRegionLocator locator; // created in finishSetup() for this table and connection
118 
119   /** The Async process for batch */
120   protected AsyncProcess multiAp; // obtained from connection.getAsyncProcess(); batch() submits through it
121   private RpcRetryingCallerFactory rpcCallerFactory; // source of retrying callers; defaulted in finishSetup() when null
122   private RpcControllerFactory rpcControllerFactory; // source of RPC controllers; defaulted in finishSetup() when null
123 
124   // Marked Private @since 1.0
125   @InterfaceAudience.Private
126   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
127     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
128     if (maxThreads == 0) {
129       maxThreads = 1; // is there a better default?
130     }
131     int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
132     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
133 
134     // Using the "direct handoff" approach, new threads will only be created
135     // if it is necessary and will grow unbounded. This could be bad but in HCM
136     // we only create as many Runnables as there are region servers. It means
137     // it also scales when new region servers are added.
138     ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
139       TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
140     pool.allowCoreThreadTimeOut(true);
141     return pool;
142   }
143 
144   /**
145    * Creates an object to access a HBase table.
146    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
147    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
148    * @param tableName Name of the table.
149    * @param connection HConnection to be used.
150    * @param pool ExecutorService to be used.
151    * @throws IOException if a remote or network exception occurs
152    */
153   @InterfaceAudience.Private
154   protected HTable(TableName tableName, final ClusterConnection connection,
155       final TableConfiguration tableConfig,
156       final RpcRetryingCallerFactory rpcCallerFactory,
157       final RpcControllerFactory rpcControllerFactory,
158       final ExecutorService pool) throws IOException {
159     if (connection == null || connection.isClosed()) {
160       throw new IllegalArgumentException("Connection is null or closed.");
161     }
162     this.tableName = tableName;
163     this.cleanupConnectionOnClose = false;
164     this.connection = connection;
165     this.configuration = connection.getConfiguration();
166     this.tableConfiguration = tableConfig;
167     this.pool = pool;
168     if (pool == null) {
169       this.pool = getDefaultExecutor(this.configuration);
170       this.cleanupPoolOnClose = true;
171     } else {
172       this.cleanupPoolOnClose = false;
173     }
174 
175     this.rpcCallerFactory = rpcCallerFactory;
176     this.rpcControllerFactory = rpcControllerFactory;
177 
178     this.finishSetup();
179   }
180 
181   /**
182    * For internal testing. Uses Connection provided in {@code params}.
183    * @throws IOException
184    */
185   @VisibleForTesting
186   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
187     connection = conn;
188     tableName = params.getTableName();
189     tableConfiguration = new TableConfiguration(connection.getConfiguration());
190     cleanupPoolOnClose = false;
191     cleanupConnectionOnClose = false;
192     // used from tests, don't trust the connection is real
193     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
194   }
195 
196   /**
197    * @return maxKeyValueSize from configuration.
198    */
199   public static int getMaxKeyValueSize(Configuration conf) {
200     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
201   }
202 
203   /**
204    * setup this HTable's parameter based on the passed configuration
205    */
206   private void finishSetup() throws IOException {
207     if (tableConfiguration == null) {
208       tableConfiguration = new TableConfiguration(configuration);
209     }
210 
211     this.operationTimeout = tableName.isSystemTable() ?
212         tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
213     this.scannerCaching = tableConfiguration.getScannerCaching();
214     this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize();
215     if (this.rpcCallerFactory == null) {
216       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
217     }
218     if (this.rpcControllerFactory == null) {
219       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
220     }
221 
222     // puts need to track errors globally due to how the APIs currently work.
223     multiAp = this.connection.getAsyncProcess();
224     this.locator = new HRegionLocator(getName(), connection);
225   }
226 
227   /**
228    * {@inheritDoc}
229    */
230   @Override
231   public Configuration getConfiguration() {
232     return configuration;
233   }
234 
235   /**
236    * {@inheritDoc}
237    */
238   @Override
239   public byte [] getTableName() {
240     return this.tableName.getName();
241   }
242 
243   @Override
244   public TableName getName() {
245     return tableName;
246   }
247 
248   /**
249    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
250    * manipulations.
251    * @return An HConnection instance.
252    * @deprecated This method will be changed from public to package protected.
253    */
254   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
255   @Deprecated
256   @VisibleForTesting
257   public HConnection getConnection() {
258     return this.connection;
259   }
260 
261   /**
262    * {@inheritDoc}
263    */
264   @Override
265   public HTableDescriptor getTableDescriptor() throws IOException {
266     HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, operationTimeout);
267     if (htd != null) {
268       return new UnmodifyableHTableDescriptor(htd);
269     }
270     return null;
271   }
272 
273   private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException {
274     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
275     try {
276       return caller.callWithRetries(callable, operationTimeout);
277     } finally {
278       callable.close();
279     }
280   }
281 
282   /**
283    * Get the corresponding start keys and regions for an arbitrary range of
284    * keys.
285    * <p>
286    * @param startKey Starting row in range, inclusive
287    * @param endKey Ending row in range
288    * @param includeEndKey true if endRow is inclusive, false if exclusive
289    * @return A pair of list of start keys and list of HRegionLocations that
290    *         contain the specified range
291    * @throws IOException if a remote or network exception occurs
292    */
293   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
294       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
295       throws IOException {
296     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
297   }
298 
299   /**
300    * Get the corresponding start keys and regions for an arbitrary range of
301    * keys.
302    * <p>
303    * @param startKey Starting row in range, inclusive
304    * @param endKey Ending row in range
305    * @param includeEndKey true if endRow is inclusive, false if exclusive
306    * @param reload true to reload information or false to use cached information
307    * @return A pair of list of start keys and list of HRegionLocations that
308    *         contain the specified range
309    * @throws IOException if a remote or network exception occurs
310    */
311   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
312       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
313       final boolean reload) throws IOException {
314     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
315     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
316       throw new IllegalArgumentException(
317         "Invalid range: " + Bytes.toStringBinary(startKey) +
318         " > " + Bytes.toStringBinary(endKey));
319     }
320     List<byte[]> keysInRange = new ArrayList<byte[]>();
321     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
322     byte[] currentKey = startKey;
323     do {
324       HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
325       keysInRange.add(currentKey);
326       regionsInRange.add(regionLocation);
327       currentKey = regionLocation.getRegionInfo().getEndKey();
328     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
329         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
330             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
331     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
332         regionsInRange);
333   }
334 
335   /**
336    * The underlying {@link HTable} must not be closed.
337    * {@link HTableInterface#getScanner(Scan)} has other usage details.
338    */
339   @Override
340   public ResultScanner getScanner(final Scan scan) throws IOException {
341     if (scan.getBatch() > 0 && scan.isSmall()) {
342       throw new IllegalArgumentException("Small scan should not be used with batching");
343     }
344 
345     if (scan.getCaching() <= 0) {
346       scan.setCaching(scannerCaching);
347     }
348     if (scan.getMaxResultSize() <= 0) {
349       scan.setMaxResultSize(scannerMaxResultSize);
350     }
351 
352     Boolean async = scan.isAsyncPrefetch();
353     if (async == null) {
354       async = tableConfiguration.isClientScannerAsyncPrefetch();
355     }
356 
357     if (scan.isReversed()) {
358       if (scan.isSmall()) {
359         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
360             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
361             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
362       } else {
363         return new ReversedClientScanner(getConfiguration(), scan, getName(),
364             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
365             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
366       }
367     }
368 
369     if (scan.isSmall()) {
370       return new ClientSmallScanner(getConfiguration(), scan, getName(),
371           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
372           pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
373     } else {
374       if (async) {
375         return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
376             this.rpcCallerFactory, this.rpcControllerFactory,
377             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
378       } else {
379         return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
380             this.rpcCallerFactory, this.rpcControllerFactory,
381             pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
382       }
383     }
384   }
385 
386   /**
387    * The underlying {@link HTable} must not be closed.
388    * {@link HTableInterface#getScanner(byte[])} has other usage details.
389    */
390   @Override
391   public ResultScanner getScanner(byte [] family) throws IOException {
392     Scan scan = new Scan();
393     scan.addFamily(family);
394     return getScanner(scan);
395   }
396 
397   /**
398    * The underlying {@link HTable} must not be closed.
399    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
400    */
401   @Override
402   public ResultScanner getScanner(byte [] family, byte [] qualifier)
403   throws IOException {
404     Scan scan = new Scan();
405     scan.addColumn(family, qualifier);
406     return getScanner(scan);
407   }
408 
409   /**
410    * {@inheritDoc}
411    */
412   @Override
413   public Result get(final Get get) throws IOException {
414     return get(get, get.isCheckExistenceOnly());
415   }
416 
417   private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
418     // if we are changing settings to the get, clone it.
419     if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
420       get = ReflectionUtils.newInstance(get.getClass(), get);
421       get.setCheckExistenceOnly(checkExistenceOnly);
422       if (get.getConsistency() == null){
423         get.setConsistency(defaultConsistency);
424       }
425     }
426 
427     if (get.getConsistency() == Consistency.STRONG) {
428       // Good old call.
429       final Get getReq = get;
430       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
431           getName(), get.getRow()) {
432         @Override
433         public Result call(int callTimeout) throws IOException {
434           ClientProtos.GetRequest request =
435             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
436           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
437           controller.setPriority(tableName);
438           controller.setCallTimeout(callTimeout);
439           try {
440             ClientProtos.GetResponse response = getStub().get(controller, request);
441             if (response == null) return null;
442             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
443           } catch (ServiceException se) {
444             throw ProtobufUtil.getRemoteException(se);
445           }
446         }
447       };
448       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
449     }
450 
451     // Call that takes into account the replica
452     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
453       rpcControllerFactory, tableName, this.connection, get, pool,
454       tableConfiguration.getRetriesNumber(),
455       operationTimeout,
456       tableConfiguration.getPrimaryCallTimeoutMicroSecond());
457     return callable.call();
458   }
459 
460 
461   /**
462    * {@inheritDoc}
463    */
464   @Override
465   public Result[] get(List<Get> gets) throws IOException {
466     if (gets.size() == 1) {
467       return new Result[]{get(gets.get(0))};
468     }
469     try {
470       Object[] r1 = new Object[gets.size()];
471       batch((List) gets, r1);
472 
473       // translate.
474       Result [] results = new Result[r1.length];
475       int i=0;
476       for (Object o : r1) {
477         // batch ensures if there is a failure we get an exception instead
478         results[i++] = (Result) o;
479       }
480 
481       return results;
482     } catch (InterruptedException e) {
483       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
484     }
485   }
486 
487   /**
488    * {@inheritDoc}
489    */
490   @Override
491   public void batch(final List<? extends Row> actions, final Object[] results)
492       throws InterruptedException, IOException {
493     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
494     ars.waitUntilDone();
495     if (ars.hasError()) {
496       throw ars.getErrors();
497     }
498   }
499 
500   /**
501    * {@inheritDoc}
502    */
503   @Override
504   public <R> void batchCallback(
505       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
506       throws IOException, InterruptedException {
507     connection.processBatchCallback(actions, tableName, pool, results, callback);
508   }
509 
510   /**
511    * {@inheritDoc}
512    */
513   @Override
514   public void delete(final Delete delete)
515   throws IOException {
516     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
517         tableName, delete.getRow()) {
518       @Override
519       public Boolean call(int callTimeout) throws IOException {
520         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
521         controller.setPriority(tableName);
522         controller.setCallTimeout(callTimeout);
523 
524         try {
525           MutateRequest request = RequestConverter.buildMutateRequest(
526             getLocation().getRegionInfo().getRegionName(), delete);
527           MutateResponse response = getStub().mutate(controller, request);
528           return Boolean.valueOf(response.getProcessed());
529         } catch (ServiceException se) {
530           throw ProtobufUtil.getRemoteException(se);
531         }
532       }
533     };
534     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
535   }
536 
537   /**
538    * {@inheritDoc}
539    */
540   @Override
541   public void delete(final List<Delete> deletes)
542   throws IOException {
543     Object[] results = new Object[deletes.size()];
544     try {
545       batch(deletes, results);
546     } catch (InterruptedException e) {
547       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
548     } finally {
549       // mutate list so that it is empty for complete success, or contains only failed records
550       // results are returned in the same order as the requests in list walk the list backwards,
551       // so we can remove from list without impacting the indexes of earlier members
552       for (int i = results.length - 1; i>=0; i--) {
553         // if result is not null, it succeeded
554         if (results[i] instanceof Result) {
555           deletes.remove(i);
556         }
557       }
558     }
559   }
560 
561   /**
562    * {@inheritDoc}
563    * @throws IOException
564    */
565   @Override
566   public void put(final Put put) throws IOException {
567     getBufferedMutator().mutate(put);
568     if (autoFlush) {
569       flushCommits();
570     }
571   }
572 
573   /**
574    * {@inheritDoc}
575    * @throws IOException
576    */
577   @Override
578   public void put(final List<Put> puts) throws IOException {
579     getBufferedMutator().mutate(puts);
580     if (autoFlush) {
581       flushCommits();
582     }
583   }
584 
585   /**
586    * {@inheritDoc}
587    */
588   @Override
589   public void mutateRow(final RowMutations rm) throws IOException {
590     RegionServerCallable<Void> callable =
591         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
592       @Override
593       public Void call(int callTimeout) throws IOException {
594         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
595         controller.setPriority(tableName);
596         controller.setCallTimeout(callTimeout);
597         try {
598           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
599             getLocation().getRegionInfo().getRegionName(), rm);
600           regionMutationBuilder.setAtomic(true);
601           MultiRequest request =
602             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
603           ClientProtos.MultiResponse response = getStub().multi(controller, request);
604           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
605           if (res.hasException()) {
606             Throwable ex = ProtobufUtil.toException(res.getException());
607             if(ex instanceof IOException) {
608               throw (IOException)ex;
609             }
610             throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
611           }
612         } catch (ServiceException se) {
613           throw ProtobufUtil.getRemoteException(se);
614         }
615         return null;
616       }
617     };
618     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
619   }
620 
621   /**
622    * {@inheritDoc}
623    */
624   @Override
625   public Result append(final Append append) throws IOException {
626     if (append.numFamilies() == 0) {
627       throw new IOException(
628           "Invalid arguments to append, no columns specified");
629     }
630 
631     NonceGenerator ng = this.connection.getNonceGenerator();
632     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
633     RegionServerCallable<Result> callable =
634       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
635         @Override
636         public Result call(int callTimeout) throws IOException {
637           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
638           controller.setPriority(getTableName());
639           controller.setCallTimeout(callTimeout);
640           try {
641             MutateRequest request = RequestConverter.buildMutateRequest(
642               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
643             MutateResponse response = getStub().mutate(controller, request);
644             if (!response.hasResult()) return null;
645             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
646           } catch (ServiceException se) {
647             throw ProtobufUtil.getRemoteException(se);
648           }
649         }
650       };
651     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
652   }
653 
654   /**
655    * {@inheritDoc}
656    */
657   @Override
658   public Result increment(final Increment increment) throws IOException {
659     if (!increment.hasFamilies()) {
660       throw new IOException(
661           "Invalid arguments to increment, no columns specified");
662     }
663     NonceGenerator ng = this.connection.getNonceGenerator();
664     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
665     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
666         getName(), increment.getRow()) {
667       @Override
668       public Result call(int callTimeout) throws IOException {
669         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
670         controller.setPriority(getTableName());
671         controller.setCallTimeout(callTimeout);
672         try {
673           MutateRequest request = RequestConverter.buildMutateRequest(
674             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
675           MutateResponse response = getStub().mutate(controller, request);
676           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
677         } catch (ServiceException se) {
678           throw ProtobufUtil.getRemoteException(se);
679         }
680       }
681     };
682     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
683   }
684 
685   /**
686    * {@inheritDoc}
687    */
688   @Override
689   public long incrementColumnValue(final byte [] row, final byte [] family,
690       final byte [] qualifier, final long amount)
691   throws IOException {
692     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
693   }
694 
695   /**
696    * {@inheritDoc}
697    */
698   @Override
699   public long incrementColumnValue(final byte [] row, final byte [] family,
700       final byte [] qualifier, final long amount, final Durability durability)
701   throws IOException {
702     NullPointerException npe = null;
703     if (row == null) {
704       npe = new NullPointerException("row is null");
705     } else if (family == null) {
706       npe = new NullPointerException("family is null");
707     } else if (qualifier == null) {
708       npe = new NullPointerException("qualifier is null");
709     }
710     if (npe != null) {
711       throw new IOException(
712           "Invalid arguments to incrementColumnValue", npe);
713     }
714 
715     NonceGenerator ng = this.connection.getNonceGenerator();
716     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
717     RegionServerCallable<Long> callable =
718       new RegionServerCallable<Long>(connection, getName(), row) {
719         @Override
720         public Long call(int callTimeout) throws IOException {
721           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
722           controller.setPriority(getTableName());
723           controller.setCallTimeout(callTimeout);
724           try {
725             MutateRequest request = RequestConverter.buildIncrementRequest(
726               getLocation().getRegionInfo().getRegionName(), row, family,
727               qualifier, amount, durability, nonceGroup, nonce);
728             MutateResponse response = getStub().mutate(controller, request);
729             Result result =
730               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
731             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
732           } catch (ServiceException se) {
733             throw ProtobufUtil.getRemoteException(se);
734           }
735         }
736       };
737     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
738   }
739 
740   /**
741    * {@inheritDoc}
742    */
743   @Override
744   public boolean checkAndPut(final byte [] row,
745       final byte [] family, final byte [] qualifier, final byte [] value,
746       final Put put)
747   throws IOException {
748     RegionServerCallable<Boolean> callable =
749       new RegionServerCallable<Boolean>(connection, getName(), row) {
750         @Override
751         public Boolean call(int callTimeout) throws IOException {
752           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
753           controller.setPriority(tableName);
754           controller.setCallTimeout(callTimeout);
755           try {
756             MutateRequest request = RequestConverter.buildMutateRequest(
757                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
758                 new BinaryComparator(value), CompareType.EQUAL, put);
759             MutateResponse response = getStub().mutate(controller, request);
760             return Boolean.valueOf(response.getProcessed());
761           } catch (ServiceException se) {
762             throw ProtobufUtil.getRemoteException(se);
763           }
764         }
765       };
766     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
767   }
768 
769   /**
770    * {@inheritDoc}
771    */
772   @Override
773   public boolean checkAndPut(final byte [] row, final byte [] family,
774       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
775       final Put put)
776   throws IOException {
777     RegionServerCallable<Boolean> callable =
778       new RegionServerCallable<Boolean>(connection, getName(), row) {
779         @Override
780         public Boolean call(int callTimeout) throws IOException {
781           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
782           controller.setPriority(tableName);
783           controller.setCallTimeout(callTimeout);
784           try {
785             CompareType compareType = CompareType.valueOf(compareOp.name());
786             MutateRequest request = RequestConverter.buildMutateRequest(
787               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
788                 new BinaryComparator(value), compareType, put);
789             MutateResponse response = getStub().mutate(controller, request);
790             return Boolean.valueOf(response.getProcessed());
791           } catch (ServiceException se) {
792             throw ProtobufUtil.getRemoteException(se);
793           }
794         }
795       };
796     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
797   }
798 
799   /**
800    * {@inheritDoc}
801    */
802   @Override
803   public boolean checkAndDelete(final byte [] row,
804       final byte [] family, final byte [] qualifier, final byte [] value,
805       final Delete delete)
806   throws IOException {
807     RegionServerCallable<Boolean> callable =
808       new RegionServerCallable<Boolean>(connection, getName(), row) {
809         @Override
810         public Boolean call(int callTimeout) throws IOException {
811           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
812           controller.setPriority(tableName);
813           controller.setCallTimeout(callTimeout);
814           try {
815             MutateRequest request = RequestConverter.buildMutateRequest(
816               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
817                 new BinaryComparator(value), CompareType.EQUAL, delete);
818             MutateResponse response = getStub().mutate(controller, request);
819             return Boolean.valueOf(response.getProcessed());
820           } catch (ServiceException se) {
821             throw ProtobufUtil.getRemoteException(se);
822           }
823         }
824       };
825     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
826   }
827 
828   /**
829    * {@inheritDoc}
830    */
831   @Override
832   public boolean checkAndDelete(final byte [] row, final byte [] family,
833       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
834       final Delete delete)
835   throws IOException {
836     RegionServerCallable<Boolean> callable =
837       new RegionServerCallable<Boolean>(connection, getName(), row) {
838         @Override
839         public Boolean call(int callTimeout) throws IOException {
840           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
841           controller.setPriority(tableName);
842           controller.setCallTimeout(callTimeout);
843           try {
844             CompareType compareType = CompareType.valueOf(compareOp.name());
845             MutateRequest request = RequestConverter.buildMutateRequest(
846               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
847                 new BinaryComparator(value), compareType, delete);
848             MutateResponse response = getStub().mutate(controller, request);
849             return Boolean.valueOf(response.getProcessed());
850           } catch (ServiceException se) {
851             throw ProtobufUtil.getRemoteException(se);
852           }
853         }
854       };
855     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
856   }
857 
858   /**
859    * {@inheritDoc}
860    */
861   @Override
862   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
863       final CompareOp compareOp, final byte [] value, final RowMutations rm)
864   throws IOException {
865     RegionServerCallable<Boolean> callable =
866         new RegionServerCallable<Boolean>(connection, getName(), row) {
867           @Override
868           public Boolean call(int callTimeout) throws IOException {
869             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
870             controller.setPriority(tableName);
871             controller.setCallTimeout(callTimeout);
872             try {
873               CompareType compareType = CompareType.valueOf(compareOp.name());
874               MultiRequest request = RequestConverter.buildMutateRequest(
875                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
876                   new BinaryComparator(value), compareType, rm);
877               ClientProtos.MultiResponse response = getStub().multi(controller, request);
878               ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
879               if (res.hasException()) {
880                 Throwable ex = ProtobufUtil.toException(res.getException());
881                 if(ex instanceof IOException) {
882                   throw (IOException)ex;
883                 }
884                 throw new IOException("Failed to checkAndMutate row: "+
885                     Bytes.toStringBinary(rm.getRow()), ex);
886               }
887               return Boolean.valueOf(response.getProcessed());
888             } catch (ServiceException se) {
889               throw ProtobufUtil.getRemoteException(se);
890             }
891           }
892         };
893     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
894   }
895 
896   /**
897    * {@inheritDoc}
898    */
899   @Override
900   public boolean exists(final Get get) throws IOException {
901     Result r = get(get, true);
902     assert r.getExists() != null;
903     return r.getExists();
904   }
905 
906   /**
907    * {@inheritDoc}
908    */
909   @Override
910   public boolean[] existsAll(final List<Get> gets) throws IOException {
911     if (gets.isEmpty()) return new boolean[]{};
912     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
913 
914     ArrayList<Get> exists = new ArrayList<Get>(gets.size());
915     for (Get g: gets){
916       Get ge = new Get(g);
917       ge.setCheckExistenceOnly(true);
918       exists.add(ge);
919     }
920 
921     Object[] r1= new Object[exists.size()];
922     try {
923       batch(exists, r1);
924     } catch (InterruptedException e) {
925       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
926     }
927 
928     // translate.
929     boolean[] results = new boolean[r1.length];
930     int i = 0;
931     for (Object o : r1) {
932       // batch ensures if there is a failure we get an exception instead
933       results[i++] = ((Result)o).getExists();
934     }
935 
936     return results;
937   }
938 
939   /**
940    * {@inheritDoc}
941    * @throws IOException
942    */
943   @Override
944   public void flushCommits() throws IOException {
945     if (mutator == null) {
946       // nothing to flush if there's no mutator; don't bother creating one.
947       return;
948     }
949     getBufferedMutator().flush();
950   }
951 
952   /**
953    * Process a mixed batch of Get, Put and Delete actions. All actions for a
954    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
955    *
956    * @param list The collection of actions.
957    * @param results An empty array, same size as list. If an exception is thrown,
958    * you can test here for partial results, and to determine which actions
959    * processed successfully.
960    * @throws IOException if there are problems talking to META. Per-item
961    * exceptions are stored in the results array.
962    */
963   public <R> void processBatchCallback(
964     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
965     throws IOException, InterruptedException {
966     this.batchCallback(list, results, callback);
967   }
968 
969 
970   /**
971    * Parameterized batch processing, allowing varying return types for different
972    * {@link Row} implementations.
973    */
974   public void processBatch(final List<? extends Row> list, final Object[] results)
975     throws IOException, InterruptedException {
976     this.batch(list, results);
977   }
978 
979 
980   @Override
981   public void close() throws IOException {
982     if (this.closed) {
983       return;
984     }
985     flushCommits();
986     if (cleanupPoolOnClose) {
987       this.pool.shutdown();
988       try {
989         boolean terminated = false;
990         do {
991           // wait until the pool has terminated
992           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
993         } while (!terminated);
994       } catch (InterruptedException e) {
995         this.pool.shutdownNow();
996         LOG.warn("waitForTermination interrupted");
997       }
998     }
999     if (cleanupConnectionOnClose) {
1000       if (this.connection != null) {
1001         this.connection.close();
1002       }
1003     }
1004     this.closed = true;
1005   }
1006 
1007   // validate for well-formedness
1008   public void validatePut(final Put put) throws IllegalArgumentException {
1009     validatePut(put, tableConfiguration.getMaxKeyValueSize());
1010   }
1011 
1012   // validate for well-formedness
1013   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1014     if (put.isEmpty()) {
1015       throw new IllegalArgumentException("No columns to insert");
1016     }
1017     if (maxKeyValueSize > 0) {
1018       for (List<Cell> list : put.getFamilyCellMap().values()) {
1019         for (Cell cell : list) {
1020           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1021             throw new IllegalArgumentException("KeyValue size too large");
1022           }
1023         }
1024       }
1025     }
1026   }
1027 
1028   /**
1029    * {@inheritDoc}
1030    */
1031   @Override
1032   public boolean isAutoFlush() {
1033     return autoFlush;
1034   }
1035 
1036   /**
1037    * {@inheritDoc}
1038    */
1039   @Override
1040   public void setAutoFlushTo(boolean autoFlush) {
1041     this.autoFlush = autoFlush;
1042   }
1043 
1044   /**
1045    * {@inheritDoc}
1046    */
1047   @Override
1048   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1049     this.autoFlush = autoFlush;
1050   }
1051 
1052   /**
1053    * Returns the maximum size in bytes of the write buffer for this HTable.
1054    * <p>
1055    * The default value comes from the configuration parameter
1056    * {@code hbase.client.write.buffer}.
1057    * @return The size of the write buffer in bytes.
1058    */
1059   @Override
1060   public long getWriteBufferSize() {
1061     if (mutator == null) {
1062       return tableConfiguration.getWriteBufferSize();
1063     } else {
1064       return mutator.getWriteBufferSize();
1065     }
1066   }
1067 
1068   /**
1069    * Sets the size of the buffer in bytes.
1070    * <p>
1071    * If the new size is less than the current amount of data in the
1072    * write buffer, the buffer gets flushed.
1073    * @param writeBufferSize The new write buffer size, in bytes.
1074    * @throws IOException if a remote or network exception occurs.
1075    */
1076   @Override
1077   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1078     getBufferedMutator();
1079     mutator.setWriteBufferSize(writeBufferSize);
1080   }
1081 
1082   /**
1083    * The pool is used for mutli requests for this HTable
1084    * @return the pool used for mutli
1085    */
1086   ExecutorService getPool() {
1087     return this.pool;
1088   }
1089 
1090   /**
1091    * Explicitly clears the region cache to fetch the latest value from META.
1092    * This is a power user function: avoid unless you know the ramifications.
1093    */
1094   public void clearRegionCache() {
1095     this.connection.clearRegionCache();
1096   }
1097 
1098   /**
1099    * {@inheritDoc}
1100    */
1101   @Override
1102   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1103     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1104   }
1105 
1106   /**
1107    * {@inheritDoc}
1108    */
1109   @Override
1110   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1111       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1112       throws ServiceException, Throwable {
1113     final Map<byte[],R> results =  Collections.synchronizedMap(
1114         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1115     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1116       @Override
1117       public void update(byte[] region, byte[] row, R value) {
1118         if (region != null) {
1119           results.put(region, value);
1120         }
1121       }
1122     });
1123     return results;
1124   }
1125 
1126   /**
1127    * {@inheritDoc}
1128    */
1129   @Override
1130   public <T extends Service, R> void coprocessorService(final Class<T> service,
1131       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1132       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1133 
1134     // get regions covered by the row range
1135     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1136 
1137     Map<byte[],Future<R>> futures =
1138         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1139     for (final byte[] r : keys) {
1140       final RegionCoprocessorRpcChannel channel =
1141           new RegionCoprocessorRpcChannel(connection, tableName, r);
1142       Future<R> future = pool.submit(
1143           new Callable<R>() {
1144             @Override
1145             public R call() throws Exception {
1146               T instance = ProtobufUtil.newServiceStub(service, channel);
1147               R result = callable.call(instance);
1148               byte[] region = channel.getLastRegion();
1149               if (callback != null) {
1150                 callback.update(region, r, result);
1151               }
1152               return result;
1153             }
1154           });
1155       futures.put(r, future);
1156     }
1157     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1158       try {
1159         e.getValue().get();
1160       } catch (ExecutionException ee) {
1161         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1162             + Bytes.toStringBinary(e.getKey()), ee);
1163         throw ee.getCause();
1164       } catch (InterruptedException ie) {
1165         throw new InterruptedIOException("Interrupted calling coprocessor service "
1166             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1167       }
1168     }
1169   }
1170 
1171   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1172   throws IOException {
1173     if (start == null) {
1174       start = HConstants.EMPTY_START_ROW;
1175     }
1176     if (end == null) {
1177       end = HConstants.EMPTY_END_ROW;
1178     }
1179     return getKeysAndRegionsInRange(start, end, true).getFirst();
1180   }
1181 
1182   public void setOperationTimeout(int operationTimeout) {
1183     this.operationTimeout = operationTimeout;
1184   }
1185 
1186   public int getOperationTimeout() {
1187     return operationTimeout;
1188   }
1189 
1190   @Override
1191   public String toString() {
1192     return tableName + ";" + connection;
1193   }
1194 
1195   /**
1196    * {@inheritDoc}
1197    */
1198   @Override
1199   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1200       Descriptors.MethodDescriptor methodDescriptor, Message request,
1201       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1202     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1203         Bytes.BYTES_COMPARATOR));
1204     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1205         new Callback<R>() {
1206 
1207           @Override
1208           public void update(byte[] region, byte[] row, R result) {
1209             if (region != null) {
1210               results.put(region, result);
1211             }
1212           }
1213         });
1214     return results;
1215   }
1216 
1217   /**
1218    * {@inheritDoc}
1219    */
1220   @Override
1221   public <R extends Message> void batchCoprocessorService(
1222       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1223       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1224       throws ServiceException, Throwable {
1225 
1226     if (startKey == null) {
1227       startKey = HConstants.EMPTY_START_ROW;
1228     }
1229     if (endKey == null) {
1230       endKey = HConstants.EMPTY_END_ROW;
1231     }
1232     // get regions covered by the row range
1233     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1234         getKeysAndRegionsInRange(startKey, endKey, true);
1235     List<byte[]> keys = keysAndRegions.getFirst();
1236     List<HRegionLocation> regions = keysAndRegions.getSecond();
1237 
1238     // check if we have any calls to make
1239     if (keys.isEmpty()) {
1240       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1241           ", end=" + Bytes.toStringBinary(endKey));
1242       return;
1243     }
1244 
1245     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1246     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1247         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1248     for (int i = 0; i < keys.size(); i++) {
1249       final byte[] rowKey = keys.get(i);
1250       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1251       RegionCoprocessorServiceExec exec =
1252           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1253       execs.add(exec);
1254       execsByRow.put(rowKey, exec);
1255     }
1256 
1257     // tracking for any possible deserialization errors on success callback
1258     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1259     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1260     final List<Row> callbackErrorActions = new ArrayList<Row>();
1261     final List<String> callbackErrorServers = new ArrayList<String>();
1262     Object[] results = new Object[execs.size()];
1263 
1264     AsyncProcess asyncProcess =
1265         new AsyncProcess(connection, configuration, pool,
1266             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1267             true, RpcControllerFactory.instantiate(configuration));
1268 
1269     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1270         new Callback<ClientProtos.CoprocessorServiceResult>() {
1271           @Override
1272           public void update(byte[] region, byte[] row,
1273                               ClientProtos.CoprocessorServiceResult serviceResult) {
1274             if (LOG.isTraceEnabled()) {
1275               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1276                   ": region=" + Bytes.toStringBinary(region) +
1277                   ", row=" + Bytes.toStringBinary(row) +
1278                   ", value=" + serviceResult.getValue().getValue());
1279             }
1280             try {
1281               Message.Builder builder = responsePrototype.newBuilderForType();
1282               ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1283               callback.update(region, row, (R) builder.build());
1284             } catch (IOException e) {
1285               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1286                   e);
1287               callbackErrorExceptions.add(e);
1288               callbackErrorActions.add(execsByRow.get(row));
1289               callbackErrorServers.add("null");
1290             }
1291           }
1292         }, results);
1293 
1294     future.waitUntilDone();
1295 
1296     if (future.hasError()) {
1297       throw future.getErrors();
1298     } else if (!callbackErrorExceptions.isEmpty()) {
1299       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1300           callbackErrorServers);
1301     }
1302   }
1303 
1304   public RegionLocator getRegionLocator() {
1305     return this.locator;
1306   }
1307 
1308   @VisibleForTesting
1309   BufferedMutator getBufferedMutator() throws IOException {
1310     if (mutator == null) {
1311       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1312           new BufferedMutatorParams(tableName)
1313               .pool(pool)
1314               .writeBufferSize(tableConfiguration.getWriteBufferSize())
1315               .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize())
1316       );
1317     }
1318     return mutator;
1319   }
1320 }