View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.TreeMap;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionLocation;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.KeyValueUtil;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.classification.InterfaceStability;
47  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
48  import org.apache.hadoop.hbase.client.coprocessor.Batch;
49  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
50  import org.apache.hadoop.hbase.filter.BinaryComparator;
51  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
52  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
53  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
54  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
55  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
56  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
57  import org.apache.hadoop.hbase.protobuf.RequestConverter;
58  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
59  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
60  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
61  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
62  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
64  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.apache.hadoop.hbase.util.Pair;
67  import org.apache.hadoop.hbase.util.ReflectionUtils;
68  import org.apache.hadoop.hbase.util.Threads;
69
70  import com.google.common.annotations.VisibleForTesting;
71  // DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
72  // Internally, we use shaded protobuf. These below are part of our public API.
73  import com.google.protobuf.Descriptors;
74  import com.google.protobuf.Message;
75  import com.google.protobuf.Service;
76  import com.google.protobuf.ServiceException;
77  // SEE ABOVE NOTE!
78
79  /**
80   * An implementation of {@link Table}. Used to communicate with a single HBase table.
81   * Lightweight. Get as needed and just close when done.
82   * Instances of this class SHOULD NOT be constructed directly.
83   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
84   * class comment for an example of how.
85   *
86   * <p>This class is NOT thread safe for reads nor writes.
87   * In the case of writes (Put, Delete), the underlying write buffer can
88   * be corrupted if multiple threads contend over a single HTable instance.
89   * In the case of reads, some fields used by a Scan are shared among all threads.
90   *
91   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
92   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
93   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
94   * Interface Classification</a>
95   * There are no guarantees for backwards source / binary compatibility and methods or class can
96   * change or go away without deprecation.
97   *
98   * @see Table
99   * @see Admin
100  * @see Connection
101  * @see ConnectionFactory
102  */
103 @InterfaceAudience.Private
104 @InterfaceStability.Stable
105 public class HTable implements Table {
106   private static final Log LOG = LogFactory.getLog(HTable.class);
107   protected ClusterConnection connection;
108   private final TableName tableName;
109   private volatile Configuration configuration;
110   private ConnectionConfiguration connConfiguration;
111   protected BufferedMutatorImpl mutator;
112   private boolean closed = false;
113   protected int scannerCaching;
114   protected long scannerMaxResultSize;
115   private ExecutorService pool;  // For Multi & Scan
116   private int operationTimeout; // global timeout for each blocking method with retrying rpc
117   private int readRpcTimeout; // timeout for each read rpc request
118   private int writeRpcTimeout; // timeout for each write rpc request
119   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
120   private final boolean cleanupConnectionOnClose; // close the connection in close()
121   private Consistency defaultConsistency = Consistency.STRONG;
122   private HRegionLocator locator;
123
124   /** The Async process for batch */
125   protected AsyncProcess multiAp;
126   private RpcRetryingCallerFactory rpcCallerFactory;
127   private RpcControllerFactory rpcControllerFactory;
128
129   // Marked Private @since 1.0
130   @InterfaceAudience.Private
131   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
132     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
133     if (maxThreads == 0) {
134       maxThreads = 1; // is there a better default?
135     }
136     int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
137     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
138
139     // Using the "direct handoff" approach, new threads will only be created
140     // if it is necessary and will grow unbounded. This could be bad but in HCM
141     // we only create as many Runnables as there are region servers. It means
142     // it also scales when new region servers are added.
143     ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
144       TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
145     pool.allowCoreThreadTimeOut(true);
146     return pool;
147   }
148
149   /**
150    * Creates an object to access a HBase table.
151    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
152    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
153    * @param tableName Name of the table.
154    * @param connection Connection to be used.
155    * @param pool ExecutorService to be used.
156    * @throws IOException if a remote or network exception occurs
157    */
158   @InterfaceAudience.Private
159   protected HTable(TableName tableName, final ClusterConnection connection,
160       final ConnectionConfiguration tableConfig,
161       final RpcRetryingCallerFactory rpcCallerFactory,
162       final RpcControllerFactory rpcControllerFactory,
163       final ExecutorService pool) throws IOException {
164     if (connection == null || connection.isClosed()) {
165       throw new IllegalArgumentException("Connection is null or closed.");
166     }
167     this.tableName = tableName;
168     this.cleanupConnectionOnClose = false;
169     this.connection = connection;
170     this.configuration = connection.getConfiguration();
171     this.connConfiguration = tableConfig;
172     this.pool = pool;
173     if (pool == null) {
174       this.pool = getDefaultExecutor(this.configuration);
175       this.cleanupPoolOnClose = true;
176     } else {
177       this.cleanupPoolOnClose = false;
178     }
179
180     this.rpcCallerFactory = rpcCallerFactory;
181     this.rpcControllerFactory = rpcControllerFactory;
182
183     this.finishSetup();
184   }
185
/**
 * For internal testing. Uses the supplied connection as-is (no null/closed check) and takes
 * the table name from {@code params}. Neither the pool nor the connection is flagged for
 * cleanup on close.
 * @param conn connection to use
 * @param params supplies the table name and is handed to the {@link BufferedMutatorImpl}
 * @throws IOException declared by the signature
 */
@VisibleForTesting
protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
  this.connection = conn;
  this.tableName = params.getTableName();
  this.cleanupPoolOnClose = false;
  this.cleanupConnectionOnClose = false;
  this.connConfiguration = new ConnectionConfiguration(conn.getConfiguration());
  // used from tests, don't trust the connection is real
  this.mutator = new BufferedMutatorImpl(conn, null, null, params);
}
200
201   /**
202    * @return maxKeyValueSize from configuration.
203    */
204   public static int getMaxKeyValueSize(Configuration conf) {
205     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
206   }
207
208   /**
209    * setup this HTable's parameter based on the passed configuration
210    */
211   private void finishSetup() throws IOException {
212     if (connConfiguration == null) {
213       connConfiguration = new ConnectionConfiguration(configuration);
214     }
215
216     this.operationTimeout = tableName.isSystemTable() ?
217         connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
218     this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
219         configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
220             HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
221     this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
222         configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
223             HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
224     this.scannerCaching = connConfiguration.getScannerCaching();
225     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
226     if (this.rpcCallerFactory == null) {
227       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
228     }
229     if (this.rpcControllerFactory == null) {
230       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
231     }
232
233     // puts need to track errors globally due to how the APIs currently work.
234     multiAp = this.connection.getAsyncProcess();
235     this.locator = new HRegionLocator(getName(), connection);
236   }
237
238   /**
239    * {@inheritDoc}
240    */
241   @Override
242   public Configuration getConfiguration() {
243     return configuration;
244   }
245
246   @Override
247   public TableName getName() {
248     return tableName;
249   }
250
251   /**
252    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
253    * manipulations.
254    * @return A Connection instance.
255    */
256   @VisibleForTesting
257   protected Connection getConnection() {
258     return this.connection;
259   }
260
261   /**
262    * {@inheritDoc}
263    */
264   @Override
265   public HTableDescriptor getTableDescriptor() throws IOException {
266     HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
267       rpcControllerFactory, operationTimeout, readRpcTimeout);
268     if (htd != null) {
269       return new UnmodifyableHTableDescriptor(htd);
270     }
271     return null;
272   }
273
274   /**
275    * Get the corresponding start keys and regions for an arbitrary range of
276    * keys.
277    * <p>
278    * @param startKey Starting row in range, inclusive
279    * @param endKey Ending row in range
280    * @param includeEndKey true if endRow is inclusive, false if exclusive
281    * @return A pair of list of start keys and list of HRegionLocations that
282    *         contain the specified range
283    * @throws IOException if a remote or network exception occurs
284    */
285   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
286       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
287       throws IOException {
288     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
289   }
290
291   /**
292    * Get the corresponding start keys and regions for an arbitrary range of
293    * keys.
294    * <p>
295    * @param startKey Starting row in range, inclusive
296    * @param endKey Ending row in range
297    * @param includeEndKey true if endRow is inclusive, false if exclusive
298    * @param reload true to reload information or false to use cached information
299    * @return A pair of list of start keys and list of HRegionLocations that
300    *         contain the specified range
301    * @throws IOException if a remote or network exception occurs
302    */
303   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
304       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
305       final boolean reload) throws IOException {
306     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
307     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
308       throw new IllegalArgumentException(
309         "Invalid range: " + Bytes.toStringBinary(startKey) +
310         " > " + Bytes.toStringBinary(endKey));
311     }
312     List<byte[]> keysInRange = new ArrayList<byte[]>();
313     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
314     byte[] currentKey = startKey;
315     do {
316       HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
317       keysInRange.add(currentKey);
318       regionsInRange.add(regionLocation);
319       currentKey = regionLocation.getRegionInfo().getEndKey();
320     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
321         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
322             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
323     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
324         regionsInRange);
325   }
326
327   /**
328    * The underlying {@link HTable} must not be closed.
329    * {@link Table#getScanner(Scan)} has other usage details.
330    */
331   @Override
332   public ResultScanner getScanner(final Scan scan) throws IOException {
333     if (scan.getBatch() > 0 && scan.isSmall()) {
334       throw new IllegalArgumentException("Small scan should not be used with batching");
335     }
336
337     if (scan.getCaching() <= 0) {
338       scan.setCaching(scannerCaching);
339     }
340     if (scan.getMaxResultSize() <= 0) {
341       scan.setMaxResultSize(scannerMaxResultSize);
342     }
343
344     Boolean async = scan.isAsyncPrefetch();
345     if (async == null) {
346       async = connConfiguration.isClientScannerAsyncPrefetch();
347     }
348
349     if (scan.isReversed()) {
350       if (scan.isSmall()) {
351         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
352             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
353             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
354       } else {
355         return new ReversedClientScanner(getConfiguration(), scan, getName(),
356             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
357             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
358       }
359     }
360
361     if (scan.isSmall()) {
362       return new ClientSmallScanner(getConfiguration(), scan, getName(),
363           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
364           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
365     } else {
366       if (async) {
367         return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
368             this.rpcCallerFactory, this.rpcControllerFactory,
369             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
370       } else {
371         return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
372             this.rpcCallerFactory, this.rpcControllerFactory,
373             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
374       }
375     }
376   }
377
378   /**
379    * The underlying {@link HTable} must not be closed.
380    * {@link Table#getScanner(byte[])} has other usage details.
381    */
382   @Override
383   public ResultScanner getScanner(byte [] family) throws IOException {
384     Scan scan = new Scan();
385     scan.addFamily(family);
386     return getScanner(scan);
387   }
388
389   /**
390    * The underlying {@link HTable} must not be closed.
391    * {@link Table#getScanner(byte[], byte[])} has other usage details.
392    */
393   @Override
394   public ResultScanner getScanner(byte [] family, byte [] qualifier)
395   throws IOException {
396     Scan scan = new Scan();
397     scan.addColumn(family, qualifier);
398     return getScanner(scan);
399   }
400
401   /**
402    * {@inheritDoc}
403    */
404   @Override
405   public Result get(final Get get) throws IOException {
406     return get(get, get.isCheckExistenceOnly());
407   }
408
409   private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
410     // if we are changing settings to the get, clone it.
411     if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
412       get = ReflectionUtils.newInstance(get.getClass(), get);
413       get.setCheckExistenceOnly(checkExistenceOnly);
414       if (get.getConsistency() == null){
415         get.setConsistency(defaultConsistency);
416       }
417     }
418
419     if (get.getConsistency() == Consistency.STRONG) {
420       // Good old call.
421       final Get configuredGet = get;
422       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
423           this.rpcControllerFactory, getName(), get.getRow()) {
424         @Override
425         protected Result rpcCall() throws Exception {
426           ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
427               getLocation().getRegionInfo().getRegionName(), configuredGet);
428           ClientProtos.GetResponse response = getStub().get(getRpcController(), request);
429           if (response == null) return null;
430           return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
431         }
432       };
433       return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
434           this.operationTimeout);
435     }
436
437     // Call that takes into account the replica
438     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
439         rpcControllerFactory, tableName, this.connection, get, pool,
440         connConfiguration.getRetriesNumber(),
441         operationTimeout,
442         connConfiguration.getPrimaryCallTimeoutMicroSecond());
443     return callable.call(operationTimeout);
444   }
445
446   /**
447    * {@inheritDoc}
448    */
449   @Override
450   public Result[] get(List<Get> gets) throws IOException {
451     if (gets.size() == 1) {
452       return new Result[]{get(gets.get(0))};
453     }
454     try {
455       Object[] r1 = new Object[gets.size()];
456       batch((List<? extends Row>)gets, r1);
457       // Translate.
458       Result [] results = new Result[r1.length];
459       int i = 0;
460       for (Object obj: r1) {
461         // Batch ensures if there is a failure we get an exception instead
462         results[i++] = (Result)obj;
463       }
464       return results;
465     } catch (InterruptedException e) {
466       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
467     }
468   }
469
470   /**
471    * {@inheritDoc}
472    */
473   @Override
474   public void batch(final List<? extends Row> actions, final Object[] results)
475       throws InterruptedException, IOException {
476     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
477     ars.waitUntilDone();
478     if (ars.hasError()) {
479       throw ars.getErrors();
480     }
481   }
482
483   /**
484    * {@inheritDoc}
485    */
486   @Override
487   public <R> void batchCallback(
488     final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
489     throws IOException, InterruptedException {
490     doBatchWithCallback(actions, results, callback, connection, pool, tableName);
491   }
492
493   public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
494     Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
495     throws InterruptedIOException, RetriesExhaustedWithDetailsException {
496     AsyncRequestFuture ars = connection.getAsyncProcess().submitAll(
497       pool, tableName, actions, callback, results);
498     ars.waitUntilDone();
499     if (ars.hasError()) {
500       throw ars.getErrors();
501     }
502   }
503
504   /**
505    * {@inheritDoc}
506    */
507   @Override
508   public void delete(final Delete delete)
509   throws IOException {
510     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
511         this.rpcControllerFactory, getName(), delete.getRow()) {
512       @Override
513       protected Boolean rpcCall() throws Exception {
514         MutateRequest request = RequestConverter.buildMutateRequest(
515           getLocation().getRegionInfo().getRegionName(), delete);
516         MutateResponse response = getStub().mutate(getRpcController(), request);
517         return Boolean.valueOf(response.getProcessed());
518       }
519     };
520     rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
521         this.operationTimeout);
522   }
523
524   /**
525    * {@inheritDoc}
526    */
527   @Override
528   public void delete(final List<Delete> deletes)
529   throws IOException {
530     Object[] results = new Object[deletes.size()];
531     try {
532       batch(deletes, results);
533     } catch (InterruptedException e) {
534       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
535     } finally {
536       // mutate list so that it is empty for complete success, or contains only failed records
537       // results are returned in the same order as the requests in list walk the list backwards,
538       // so we can remove from list without impacting the indexes of earlier members
539       for (int i = results.length - 1; i>=0; i--) {
540         // if result is not null, it succeeded
541         if (results[i] instanceof Result) {
542           deletes.remove(i);
543         }
544       }
545     }
546   }
547
548   /**
549    * {@inheritDoc}
550    * @throws IOException
551    */
552   @Override
553   public void put(final Put put) throws IOException {
554     getBufferedMutator().mutate(put);
555     flushCommits();
556   }
557
558   /**
559    * {@inheritDoc}
560    * @throws IOException
561    */
562   @Override
563   public void put(final List<Put> puts) throws IOException {
564     getBufferedMutator().mutate(puts);
565     flushCommits();
566   }
567
568   /**
569    * {@inheritDoc}
570    */
571   @Override
572   public void mutateRow(final RowMutations rm) throws IOException {
573     CancellableRegionServerCallable<MultiResponse> callable =
574       new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
575           rpcControllerFactory) {
576       @Override
577       protected MultiResponse rpcCall() throws Exception {
578         RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
579             getLocation().getRegionInfo().getRegionName(), rm);
580         regionMutationBuilder.setAtomic(true);
581         MultiRequest request =
582             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
583         ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
584         ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
585         if (res.hasException()) {
586           Throwable ex = ProtobufUtil.toException(res.getException());
587           if (ex instanceof IOException) {
588             throw (IOException) ex;
589           }
590           throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
591         }
592         return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
593       }
594     };
595     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
596         null, null, callable, operationTimeout);
597     ars.waitUntilDone();
598     if (ars.hasError()) {
599       throw ars.getErrors();
600     }
601   }
602
603   private static void checkHasFamilies(final Mutation mutation) throws IOException {
604     if (mutation.numFamilies() == 0) {
605       throw new IOException("Invalid arguments to " + mutation + ", zero columns specified");
606     }
607   }
608
609   /**
610    * {@inheritDoc}
611    */
612   @Override
613   public Result append(final Append append) throws IOException {
614     checkHasFamilies(append);
615     NoncedRegionServerCallable<Result> callable =
616         new NoncedRegionServerCallable<Result>(this.connection,
617         this.rpcControllerFactory, getName(), append.getRow()) {
618       @Override
619       protected Result call(PayloadCarryingRpcController controller) throws Exception {
620         MutateRequest request = RequestConverter.buildMutateRequest(
621           getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
622         MutateResponse response = getStub().mutate(controller, request);
623         if (!response.hasResult()) return null;
624         return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
625       }
626     };
627     return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeout).
628         callWithRetries(callable, this.operationTimeout);
629   }
630
631   /**
632    * {@inheritDoc}
633    */
634   @Override
635   public Result increment(final Increment increment) throws IOException {
636     checkHasFamilies(increment);
637     NoncedRegionServerCallable<Result> callable =
638         new NoncedRegionServerCallable<Result>(this.connection,
639         this.rpcControllerFactory, getName(), increment.getRow()) {
640       @Override
641       protected Result call(PayloadCarryingRpcController controller) throws Exception {
642         MutateRequest request = RequestConverter.buildMutateRequest(
643           getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
644         MutateResponse response = getStub().mutate(controller, request);
645         // Should this check for null like append does?
646         return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
647       }
648     };
649     return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
650         this.operationTimeout);
651   }
652
653   /**
654    * {@inheritDoc}
655    */
656   @Override
657   public long incrementColumnValue(final byte [] row, final byte [] family,
658       final byte [] qualifier, final long amount)
659   throws IOException {
660     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
661   }
662
663   /**
664    * {@inheritDoc}
665    */
666   @Override
667   public long incrementColumnValue(final byte [] row, final byte [] family,
668       final byte [] qualifier, final long amount, final Durability durability)
669   throws IOException {
670     NullPointerException npe = null;
671     if (row == null) {
672       npe = new NullPointerException("row is null");
673     } else if (family == null) {
674       npe = new NullPointerException("family is null");
675     } else if (qualifier == null) {
676       npe = new NullPointerException("qualifier is null");
677     }
678     if (npe != null) {
679       throw new IOException(
680           "Invalid arguments to incrementColumnValue", npe);
681     }
682
683     NoncedRegionServerCallable<Long> callable =
684         new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(),
685             row) {
686       @Override
687       protected Long call(PayloadCarryingRpcController controller) throws Exception {
688         MutateRequest request = RequestConverter.buildIncrementRequest(
689           getLocation().getRegionInfo().getRegionName(), row, family,
690           qualifier, amount, durability, getNonceGroup(), getNonce());
691         MutateResponse response = getStub().mutate(controller, request);
692         Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
693         return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
694       }
695     };
696     return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeout).
697         callWithRetries(callable, this.operationTimeout);
698   }
699
700   /**
701    * {@inheritDoc}
702    */
703   @Override
704   public boolean checkAndPut(final byte [] row,
705       final byte [] family, final byte [] qualifier, final byte [] value,
706       final Put put)
707   throws IOException {
708     RegionServerCallable<Boolean> callable =
709         new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
710             getName(), row) {
711       @Override
712       protected Boolean rpcCall() throws Exception {
713         MutateRequest request = RequestConverter.buildMutateRequest(
714           getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
715           new BinaryComparator(value), CompareType.EQUAL, put);
716         MutateResponse response = getStub().mutate(getRpcController(), request);
717         return Boolean.valueOf(response.getProcessed());
718       }
719     };
720     return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
721         callWithRetries(callable, this.operationTimeout);
722   }
723
724   /**
725    * {@inheritDoc}
726    */
727   @Override
728   public boolean checkAndPut(final byte [] row, final byte [] family,
729       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
730       final Put put)
731   throws IOException {
732     RegionServerCallable<Boolean> callable =
733         new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
734             getName(), row) {
735       @Override
736       protected Boolean rpcCall() throws Exception {
737         CompareType compareType = CompareType.valueOf(compareOp.name());
738         MutateRequest request = RequestConverter.buildMutateRequest(
739           getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
740           new BinaryComparator(value), compareType, put);
741         MutateResponse response = getStub().mutate(getRpcController(), request);
742         return Boolean.valueOf(response.getProcessed());
743       }
744     };
745     return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
746         callWithRetries(callable, this.operationTimeout);
747   }
748
749   /**
750    * {@inheritDoc}
751    */
752   @Override
753   public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
754       final byte [] value, final Delete delete)
755   throws IOException {
756     RegionServerCallable<Boolean> callable =
757         new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
758             getName(), row) {
759       @Override
760       protected Boolean rpcCall() throws Exception {
761         MutateRequest request = RequestConverter.buildMutateRequest(
762           getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
763           new BinaryComparator(value), CompareType.EQUAL, delete);
764         MutateResponse response = getStub().mutate(getRpcController(), request);
765         return Boolean.valueOf(response.getProcessed());
766       }
767     };
768     return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
769         callWithRetries(callable, this.operationTimeout);
770   }
771
772   /**
773    * {@inheritDoc}
774    */
775   @Override
776   public boolean checkAndDelete(final byte [] row, final byte [] family,
777       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
778       final Delete delete)
779   throws IOException {
780     RegionServerCallable<Boolean> callable =
781         new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
782             getName(), row) {
783       @Override
784       protected Boolean rpcCall() throws Exception {
785         CompareType compareType = CompareType.valueOf(compareOp.name());
786         MutateRequest request = RequestConverter.buildMutateRequest(
787           getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
788           new BinaryComparator(value), compareType, delete);
789         MutateResponse response = getStub().mutate(getRpcController(), request);
790         return Boolean.valueOf(response.getProcessed());
791       }
792     };
793     return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable,
794         this.operationTimeout);
795   }
796
797   /**
798    * {@inheritDoc}
799    */
800   @Override
801   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
802     final CompareOp compareOp, final byte [] value, final RowMutations rm)
803     throws IOException {
804     CancellableRegionServerCallable<MultiResponse> callable =
805       new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
806         rpcControllerFactory) {
807         @Override
808         protected MultiResponse rpcCall() throws Exception {
809           CompareType compareType = CompareType.valueOf(compareOp.name());
810           MultiRequest request = RequestConverter.buildMutateRequest(
811             getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
812             new BinaryComparator(value), compareType, rm);
813           ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
814           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
815           if (res.hasException()) {
816             Throwable ex = ProtobufUtil.toException(res.getException());
817             if (ex instanceof IOException) {
818               throw (IOException)ex;
819             }
820             throw new IOException("Failed to checkAndMutate row: "+
821               Bytes.toStringBinary(rm.getRow()), ex);
822           }
823           return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
824         }
825       };
826
827     /**
828      *  Currently, we use one array to store 'processed' flag which is returned by server.
829      *  It is excessive to send such a large array, but that is required by the framework right now
830      * */
831     Object[] results = new Object[rm.getMutations().size()];
832     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
833       null, results, callable, operationTimeout);
834     ars.waitUntilDone();
835     if (ars.hasError()) {
836       throw ars.getErrors();
837     }
838
839     return ((Result)results[0]).getExists();
840   }
841
842   /**
843    * {@inheritDoc}
844    */
845   @Override
846   public boolean exists(final Get get) throws IOException {
847     Result r = get(get, true);
848     assert r.getExists() != null;
849     return r.getExists();
850   }
851
852   /**
853    * {@inheritDoc}
854    */
855   @Override
856   public boolean[] existsAll(final List<Get> gets) throws IOException {
857     if (gets.isEmpty()) return new boolean[]{};
858     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
859
860     ArrayList<Get> exists = new ArrayList<Get>(gets.size());
861     for (Get g: gets){
862       Get ge = new Get(g);
863       ge.setCheckExistenceOnly(true);
864       exists.add(ge);
865     }
866
867     Object[] r1= new Object[exists.size()];
868     try {
869       batch(exists, r1);
870     } catch (InterruptedException e) {
871       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
872     }
873
874     // translate.
875     boolean[] results = new boolean[r1.length];
876     int i = 0;
877     for (Object o : r1) {
878       // batch ensures if there is a failure we get an exception instead
879       results[i++] = ((Result)o).getExists();
880     }
881
882     return results;
883   }
884
885   /**
886    * @throws IOException
887    */
888   void flushCommits() throws IOException {
889     if (mutator == null) {
890       // nothing to flush if there's no mutator; don't bother creating one.
891       return;
892     }
893     getBufferedMutator().flush();
894   }
895
896   /**
897    * Process a mixed batch of Get, Put and Delete actions. All actions for a
898    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
899    *
900    * @param list The collection of actions.
901    * @param results An empty array, same size as list. If an exception is thrown,
902    *   you can test here for partial results, and to determine which actions
903    *   processed successfully.
904    * @throws IOException if there are problems talking to META. Per-item
905    *   exceptions are stored in the results array.
906    */
907   public <R> void processBatchCallback(
908     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
909     throws IOException, InterruptedException {
910     this.batchCallback(list, results, callback);
911   }
912
913
914   /**
915    * Parameterized batch processing, allowing varying return types for different
916    * {@link Row} implementations.
917    */
918   public void processBatch(final List<? extends Row> list, final Object[] results)
919     throws IOException, InterruptedException {
920     this.batch(list, results);
921   }
922
923
924   @Override
925   public void close() throws IOException {
926     if (this.closed) {
927       return;
928     }
929     flushCommits();
930     if (cleanupPoolOnClose) {
931       this.pool.shutdown();
932       try {
933         boolean terminated = false;
934         do {
935           // wait until the pool has terminated
936           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
937         } while (!terminated);
938       } catch (InterruptedException e) {
939         this.pool.shutdownNow();
940         LOG.warn("waitForTermination interrupted");
941       }
942     }
943     if (cleanupConnectionOnClose) {
944       if (this.connection != null) {
945         this.connection.close();
946       }
947     }
948     this.closed = true;
949   }
950
951   // validate for well-formedness
952   public void validatePut(final Put put) throws IllegalArgumentException {
953     validatePut(put, connConfiguration.getMaxKeyValueSize());
954   }
955
956   // validate for well-formedness
957   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
958     if (put.isEmpty()) {
959       throw new IllegalArgumentException("No columns to insert");
960     }
961     if (maxKeyValueSize > 0) {
962       for (List<Cell> list : put.getFamilyCellMap().values()) {
963         for (Cell cell : list) {
964           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
965             throw new IllegalArgumentException("KeyValue size too large");
966           }
967         }
968       }
969     }
970   }
971
972   /**
973    * Returns the maximum size in bytes of the write buffer for this HTable.
974    * <p>
975    * The default value comes from the configuration parameter
976    * {@code hbase.client.write.buffer}.
977    * @return The size of the write buffer in bytes.
978    */
979   @Override
980   public long getWriteBufferSize() {
981     if (mutator == null) {
982       return connConfiguration.getWriteBufferSize();
983     } else {
984       return mutator.getWriteBufferSize();
985     }
986   }
987
988   /**
989    * Sets the size of the buffer in bytes.
990    * <p>
991    * If the new size is less than the current amount of data in the
992    * write buffer, the buffer gets flushed.
993    * @param writeBufferSize The new write buffer size, in bytes.
994    * @throws IOException if a remote or network exception occurs.
995    */
996   @Override
997   public void setWriteBufferSize(long writeBufferSize) throws IOException {
998     getBufferedMutator();
999     mutator.setWriteBufferSize(writeBufferSize);
1000   }
1001
1002   /**
1003    * The pool is used for mutli requests for this HTable
1004    * @return the pool used for mutli
1005    */
1006   ExecutorService getPool() {
1007     return this.pool;
1008   }
1009
1010   /**
1011    * Explicitly clears the region cache to fetch the latest value from META.
1012    * This is a power user function: avoid unless you know the ramifications.
1013    */
1014   public void clearRegionCache() {
1015     this.connection.clearRegionCache();
1016   }
1017
1018   /**
1019    * {@inheritDoc}
1020    */
1021   @Override
1022   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1023     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1024   }
1025
1026   /**
1027    * {@inheritDoc}
1028    */
1029   @Override
1030   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1031       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1032       throws ServiceException, Throwable {
1033     final Map<byte[],R> results =  Collections.synchronizedMap(
1034         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1035     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1036       @Override
1037       public void update(byte[] region, byte[] row, R value) {
1038         if (region != null) {
1039           results.put(region, value);
1040         }
1041       }
1042     });
1043     return results;
1044   }
1045
1046   /**
1047    * {@inheritDoc}
1048    */
1049   @Override
1050   public <T extends Service, R> void coprocessorService(final Class<T> service,
1051       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1052       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1053
1054     // get regions covered by the row range
1055     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1056
1057     Map<byte[],Future<R>> futures =
1058         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1059     for (final byte[] r : keys) {
1060       final RegionCoprocessorRpcChannel channel =
1061           new RegionCoprocessorRpcChannel(connection, tableName, r);
1062       Future<R> future = pool.submit(new Callable<R>() {
1063         @Override
1064         public R call() throws Exception {
1065           T instance = ProtobufUtil.newServiceStub(service, channel);
1066           R result = callable.call(instance);
1067           byte[] region = channel.getLastRegion();
1068           if (callback != null) {
1069             callback.update(region, r, result);
1070           }
1071           return result;
1072         }
1073       });
1074       futures.put(r, future);
1075     }
1076     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1077       try {
1078         e.getValue().get();
1079       } catch (ExecutionException ee) {
1080         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1081             + Bytes.toStringBinary(e.getKey()), ee);
1082         throw ee.getCause();
1083       } catch (InterruptedException ie) {
1084         throw new InterruptedIOException("Interrupted calling coprocessor service "
1085             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1086       }
1087     }
1088   }
1089
1090   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1091   throws IOException {
1092     if (start == null) {
1093       start = HConstants.EMPTY_START_ROW;
1094     }
1095     if (end == null) {
1096       end = HConstants.EMPTY_END_ROW;
1097     }
1098     return getKeysAndRegionsInRange(start, end, true).getFirst();
1099   }
1100
1101   @Override
1102   public void setOperationTimeout(int operationTimeout) {
1103     this.operationTimeout = operationTimeout;
1104   }
1105
1106   @Override
1107   public int getOperationTimeout() {
1108     return operationTimeout;
1109   }
1110
1111   @Override
1112   @Deprecated
1113   public int getRpcTimeout() {
1114     return readRpcTimeout;
1115   }
1116
1117   @Override
1118   @Deprecated
1119   public void setRpcTimeout(int rpcTimeout) {
1120     this.readRpcTimeout = rpcTimeout;
1121     this.writeRpcTimeout = rpcTimeout;
1122   }
1123
1124   @Override
1125   public int getWriteRpcTimeout() {
1126     return writeRpcTimeout;
1127   }
1128
1129   @Override
1130   public void setWriteRpcTimeout(int writeRpcTimeout) {
1131     this.writeRpcTimeout = writeRpcTimeout;
1132   }
1133
1134   @Override
1135   public int getReadRpcTimeout() { return readRpcTimeout; }
1136
1137   @Override
1138   public void setReadRpcTimeout(int readRpcTimeout) {
1139     this.readRpcTimeout = readRpcTimeout;
1140   }
1141
1142   @Override
1143   public String toString() {
1144     return tableName + ";" + connection;
1145   }
1146
1147   @Override
1148   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1149       Descriptors.MethodDescriptor methodDescriptor, Message request,
1150       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1151     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1152         Bytes.BYTES_COMPARATOR));
1153     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1154         new Callback<R>() {
1155       @Override
1156       public void update(byte[] region, byte[] row, R result) {
1157         if (region != null) {
1158           results.put(region, result);
1159         }
1160       }
1161     });
1162     return results;
1163   }
1164
1165   /**
1166    * {@inheritDoc}
1167    */
1168   @Override
1169   public <R extends Message> void batchCoprocessorService(
1170       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1171       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1172       throws ServiceException, Throwable {
1173
1174     if (startKey == null) {
1175       startKey = HConstants.EMPTY_START_ROW;
1176     }
1177     if (endKey == null) {
1178       endKey = HConstants.EMPTY_END_ROW;
1179     }
1180     // get regions covered by the row range
1181     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1182         getKeysAndRegionsInRange(startKey, endKey, true);
1183     List<byte[]> keys = keysAndRegions.getFirst();
1184     List<HRegionLocation> regions = keysAndRegions.getSecond();
1185
1186     // check if we have any calls to make
1187     if (keys.isEmpty()) {
1188       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1189           ", end=" + Bytes.toStringBinary(endKey));
1190       return;
1191     }
1192
1193     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1194     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1195         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1196     for (int i = 0; i < keys.size(); i++) {
1197       final byte[] rowKey = keys.get(i);
1198       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1199       RegionCoprocessorServiceExec exec =
1200           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1201       execs.add(exec);
1202       execsByRow.put(rowKey, exec);
1203     }
1204
1205     // tracking for any possible deserialization errors on success callback
1206     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1207     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1208     final List<Row> callbackErrorActions = new ArrayList<Row>();
1209     final List<String> callbackErrorServers = new ArrayList<String>();
1210     Object[] results = new Object[execs.size()];
1211
1212     AsyncProcess asyncProcess =
1213         new AsyncProcess(connection, configuration, pool,
1214             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1215             true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
1216
1217     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1218         new Callback<ClientProtos.CoprocessorServiceResult>() {
1219           @Override
1220           public void update(byte[] region, byte[] row,
1221                               ClientProtos.CoprocessorServiceResult serviceResult) {
1222             if (LOG.isTraceEnabled()) {
1223               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1224                   ": region=" + Bytes.toStringBinary(region) +
1225                   ", row=" + Bytes.toStringBinary(row) +
1226                   ", value=" + serviceResult.getValue().getValue());
1227             }
1228             try {
1229               Message.Builder builder = responsePrototype.newBuilderForType();
1230               ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1231               callback.update(region, row, (R) builder.build());
1232             } catch (IOException e) {
1233               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1234                   e);
1235               callbackErrorExceptions.add(e);
1236               callbackErrorActions.add(execsByRow.get(row));
1237               callbackErrorServers.add("null");
1238             }
1239           }
1240         }, results);
1241
1242     future.waitUntilDone();
1243
1244     if (future.hasError()) {
1245       throw future.getErrors();
1246     } else if (!callbackErrorExceptions.isEmpty()) {
1247       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1248           callbackErrorServers);
1249     }
1250   }
1251
1252   public RegionLocator getRegionLocator() {
1253     return this.locator;
1254   }
1255
1256   @VisibleForTesting
1257   BufferedMutator getBufferedMutator() throws IOException {
1258     if (mutator == null) {
1259       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1260           new BufferedMutatorParams(tableName)
1261               .pool(pool)
1262               .writeBufferSize(connConfiguration.getWriteBufferSize())
1263               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1264       );
1265     }
1266     return mutator;
1267   }
1268 }