View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.TreeMap;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.SynchronousQueue;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionLocation;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.KeyValueUtil;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.classification.InterfaceStability;
47  import org.apache.hadoop.hbase.client.coprocessor.Batch;
48  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
49  import org.apache.hadoop.hbase.filter.BinaryComparator;
50  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
51  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
52  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
53  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
54  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
55  import org.apache.hadoop.hbase.protobuf.RequestConverter;
56  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
57  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
58  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
59  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
60  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
61  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
62  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
63  import org.apache.hadoop.hbase.util.Bytes;
64  import org.apache.hadoop.hbase.util.Pair;
65  import org.apache.hadoop.hbase.util.ReflectionUtils;
66  import org.apache.hadoop.hbase.util.Threads;
67
68  import com.google.common.annotations.VisibleForTesting;
69  // DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
70  // Internally, we use shaded protobuf. This below are part of our public API.
71  import com.google.protobuf.Descriptors;
72  import com.google.protobuf.Message;
73  import com.google.protobuf.Service;
74  import com.google.protobuf.ServiceException;
75  // SEE ABOVE NOTE!
76
77  /**
78   * An implementation of {@link Table}. Used to communicate with a single HBase table.
79   * Lightweight. Get as needed and just close when done.
80   * Instances of this class SHOULD NOT be constructed directly.
81   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
82   * class comment for an example of how.
83   *
84   * <p>This class is NOT thread safe for reads nor writes.
85   * In the case of writes (Put, Delete), the underlying write buffer can
86   * be corrupted if multiple threads contend over a single HTable instance.
87   * In the case of reads, some fields used by a Scan are shared among all threads.
88   *
89   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
90   * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
91   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
92   * Interface Classification</a>
93   * There are no guarantees for backwards source / binary compatibility and methods or class can
94   * change or go away without deprecation.
95   *
96   * @see Table
97   * @see Admin
98   * @see Connection
99   * @see ConnectionFactory
100  */
101 @InterfaceAudience.Private
102 @InterfaceStability.Stable
103 public class HTable implements Table {
104   private static final Log LOG = LogFactory.getLog(HTable.class);
105   protected ClusterConnection connection;
106   private final TableName tableName;
107   private volatile Configuration configuration;
108   private ConnectionConfiguration connConfiguration;
109   protected BufferedMutatorImpl mutator;
110   private boolean closed = false;
111   protected int scannerCaching;
112   protected long scannerMaxResultSize;
113   private ExecutorService pool;  // For Multi & Scan
114   private int operationTimeout; // global timeout for each blocking method with retrying rpc
115   private int readRpcTimeout; // timeout for each read rpc request
116   private int writeRpcTimeout; // timeout for each write rpc request
117   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
118   private final boolean cleanupConnectionOnClose; // close the connection in close()
119   private Consistency defaultConsistency = Consistency.STRONG;
120   private HRegionLocator locator;
121
122   /** The Async process for batch */
123   protected AsyncProcess multiAp;
124   private RpcRetryingCallerFactory rpcCallerFactory;
125   private RpcControllerFactory rpcControllerFactory;
126
127   // Marked Private @since 1.0
128   @InterfaceAudience.Private
129   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
130     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
131     if (maxThreads == 0) {
132       maxThreads = 1; // is there a better default?
133     }
134     int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
135     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
136
137     // Using the "direct handoff" approach, new threads will only be created
138     // if it is necessary and will grow unbounded. This could be bad but in HCM
139     // we only create as many Runnables as there are region servers. It means
140     // it also scales when new region servers are added.
141     ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
142       TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
143     pool.allowCoreThreadTimeOut(true);
144     return pool;
145   }
146
147   /**
148    * Creates an object to access a HBase table.
149    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
150    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
151    * @param tableName Name of the table.
152    * @param connection Connection to be used.
153    * @param pool ExecutorService to be used.
154    * @throws IOException if a remote or network exception occurs
155    */
156   @InterfaceAudience.Private
157   protected HTable(TableName tableName, final ClusterConnection connection,
158       final ConnectionConfiguration tableConfig,
159       final RpcRetryingCallerFactory rpcCallerFactory,
160       final RpcControllerFactory rpcControllerFactory,
161       final ExecutorService pool) throws IOException {
162     if (connection == null || connection.isClosed()) {
163       throw new IllegalArgumentException("Connection is null or closed.");
164     }
165     this.tableName = tableName;
166     this.cleanupConnectionOnClose = false;
167     this.connection = connection;
168     this.configuration = connection.getConfiguration();
169     this.connConfiguration = tableConfig;
170     this.pool = pool;
171     if (pool == null) {
172       this.pool = getDefaultExecutor(this.configuration);
173       this.cleanupPoolOnClose = true;
174     } else {
175       this.cleanupPoolOnClose = false;
176     }
177
178     this.rpcCallerFactory = rpcCallerFactory;
179     this.rpcControllerFactory = rpcControllerFactory;
180
181     this.finishSetup();
182   }
183
184   /**
185    * For internal testing. Uses Connection provided in {@code params}.
186    * @throws IOException
187    */
188   @VisibleForTesting
189   protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
190     connection = conn;
191     tableName = params.getTableName();
192     connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
193     cleanupPoolOnClose = false;
194     cleanupConnectionOnClose = false;
195     // used from tests, don't trust the connection is real
196     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
197     this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
198         conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
199             HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
200     this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
201         conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
202             HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
203   }
204
205   /**
206    * @return maxKeyValueSize from configuration.
207    */
208   public static int getMaxKeyValueSize(Configuration conf) {
209     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
210   }
211
212   /**
213    * setup this HTable's parameter based on the passed configuration
214    */
215   private void finishSetup() throws IOException {
216     if (connConfiguration == null) {
217       connConfiguration = new ConnectionConfiguration(configuration);
218     }
219
220     this.operationTimeout = tableName.isSystemTable() ?
221         connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
222     this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
223         configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
224             HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
225     this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
226         configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
227             HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
228     this.scannerCaching = connConfiguration.getScannerCaching();
229     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
230     if (this.rpcCallerFactory == null) {
231       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
232     }
233     if (this.rpcControllerFactory == null) {
234       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
235     }
236
237     // puts need to track errors globally due to how the APIs currently work.
238     multiAp = this.connection.getAsyncProcess();
239     this.locator = new HRegionLocator(getName(), connection);
240   }
241
242   /**
243    * {@inheritDoc}
244    */
245   @Override
246   public Configuration getConfiguration() {
247     return configuration;
248   }
249
250   @Override
251   public TableName getName() {
252     return tableName;
253   }
254
255   /**
256    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
257    * manipulations.
258    * @return A Connection instance.
259    */
260   @VisibleForTesting
261   protected Connection getConnection() {
262     return this.connection;
263   }
264
265   /**
266    * {@inheritDoc}
267    */
268   @Override
269   public HTableDescriptor getTableDescriptor() throws IOException {
270     HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
271       rpcControllerFactory, operationTimeout, readRpcTimeout);
272     if (htd != null) {
273       return new UnmodifyableHTableDescriptor(htd);
274     }
275     return null;
276   }
277
278   /**
279    * Get the corresponding start keys and regions for an arbitrary range of
280    * keys.
281    * <p>
282    * @param startKey Starting row in range, inclusive
283    * @param endKey Ending row in range
284    * @param includeEndKey true if endRow is inclusive, false if exclusive
285    * @return A pair of list of start keys and list of HRegionLocations that
286    *         contain the specified range
287    * @throws IOException if a remote or network exception occurs
288    */
289   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
290       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
291       throws IOException {
292     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
293   }
294
295   /**
296    * Get the corresponding start keys and regions for an arbitrary range of
297    * keys.
298    * <p>
299    * @param startKey Starting row in range, inclusive
300    * @param endKey Ending row in range
301    * @param includeEndKey true if endRow is inclusive, false if exclusive
302    * @param reload true to reload information or false to use cached information
303    * @return A pair of list of start keys and list of HRegionLocations that
304    *         contain the specified range
305    * @throws IOException if a remote or network exception occurs
306    */
307   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
308       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
309       final boolean reload) throws IOException {
310     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
311     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
312       throw new IllegalArgumentException(
313         "Invalid range: " + Bytes.toStringBinary(startKey) +
314         " > " + Bytes.toStringBinary(endKey));
315     }
316     List<byte[]> keysInRange = new ArrayList<byte[]>();
317     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
318     byte[] currentKey = startKey;
319     do {
320       HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
321       keysInRange.add(currentKey);
322       regionsInRange.add(regionLocation);
323       currentKey = regionLocation.getRegionInfo().getEndKey();
324     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
325         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
326             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
327     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
328         regionsInRange);
329   }
330
331   /**
332    * The underlying {@link HTable} must not be closed.
333    * {@link Table#getScanner(Scan)} has other usage details.
334    */
335   @Override
336   public ResultScanner getScanner(final Scan scan) throws IOException {
337     if (scan.getBatch() > 0 && scan.isSmall()) {
338       throw new IllegalArgumentException("Small scan should not be used with batching");
339     }
340
341     if (scan.getCaching() <= 0) {
342       scan.setCaching(scannerCaching);
343     }
344     if (scan.getMaxResultSize() <= 0) {
345       scan.setMaxResultSize(scannerMaxResultSize);
346     }
347
348     Boolean async = scan.isAsyncPrefetch();
349     if (async == null) {
350       async = connConfiguration.isClientScannerAsyncPrefetch();
351     }
352
353     if (scan.isReversed()) {
354       if (scan.isSmall()) {
355         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
356             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
357             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
358       } else {
359         return new ReversedClientScanner(getConfiguration(), scan, getName(),
360             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
361             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
362       }
363     }
364
365     if (scan.isSmall()) {
366       return new ClientSmallScanner(getConfiguration(), scan, getName(),
367           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
368           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
369     } else {
370       if (async) {
371         return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
372             this.rpcCallerFactory, this.rpcControllerFactory,
373             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
374       } else {
375         return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
376             this.rpcCallerFactory, this.rpcControllerFactory,
377             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
378       }
379     }
380   }
381
382   /**
383    * The underlying {@link HTable} must not be closed.
384    * {@link Table#getScanner(byte[])} has other usage details.
385    */
386   @Override
387   public ResultScanner getScanner(byte [] family) throws IOException {
388     Scan scan = new Scan();
389     scan.addFamily(family);
390     return getScanner(scan);
391   }
392
393   /**
394    * The underlying {@link HTable} must not be closed.
395    * {@link Table#getScanner(byte[], byte[])} has other usage details.
396    */
397   @Override
398   public ResultScanner getScanner(byte [] family, byte [] qualifier)
399   throws IOException {
400     Scan scan = new Scan();
401     scan.addColumn(family, qualifier);
402     return getScanner(scan);
403   }
404
405   /**
406    * {@inheritDoc}
407    */
408   @Override
409   public Result get(final Get get) throws IOException {
410     return get(get, get.isCheckExistenceOnly());
411   }
412
413   private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
414     // if we are changing settings to the get, clone it.
415     if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
416       get = ReflectionUtils.newInstance(get.getClass(), get);
417       get.setCheckExistenceOnly(checkExistenceOnly);
418       if (get.getConsistency() == null){
419         get.setConsistency(defaultConsistency);
420       }
421     }
422
423     if (get.getConsistency() == Consistency.STRONG) {
424       // Good old call.
425       final Get configuredGet = get;
426       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
427           this.rpcControllerFactory, getName(), get.getRow()) {
428         @Override
429         protected Result rpcCall() throws Exception {
430           ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
431               getLocation().getRegionInfo().getRegionName(), configuredGet);
432           ClientProtos.GetResponse response = getStub().get(getRpcController(), request);
433           if (response == null) return null;
434           return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
435         }
436       };
437       return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
438           this.operationTimeout);
439     }
440
441     // Call that takes into account the replica
442     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
443         rpcControllerFactory, tableName, this.connection, get, pool,
444         connConfiguration.getRetriesNumber(),
445         operationTimeout,
446         connConfiguration.getPrimaryCallTimeoutMicroSecond());
447     return callable.call(operationTimeout);
448   }
449
450   /**
451    * {@inheritDoc}
452    */
453   @Override
454   public Result[] get(List<Get> gets) throws IOException {
455     if (gets.size() == 1) {
456       return new Result[]{get(gets.get(0))};
457     }
458     try {
459       Object[] r1 = new Object[gets.size()];
460       batch((List<? extends Row>)gets, r1, readRpcTimeout);
461       // Translate.
462       Result [] results = new Result[r1.length];
463       int i = 0;
464       for (Object obj: r1) {
465         // Batch ensures if there is a failure we get an exception instead
466         results[i++] = (Result)obj;
467       }
468       return results;
469     } catch (InterruptedException e) {
470       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
471     }
472   }
473
474   /**
475    * {@inheritDoc}
476    */
477   @Override
478   public void batch(final List<? extends Row> actions, final Object[] results)
479       throws InterruptedException, IOException {
480     batch(actions, results, -1);
481   }
482
483   public void batch(final List<? extends Row> actions, final Object[] results, int timeout)
484       throws InterruptedException, IOException {
485     AsyncRequestFuture ars = null;
486     if (timeout != -1) {
487       ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout);
488     } else {
489       // use default timeout in AP
490       ars = multiAp.submitAll(pool, tableName, actions, null, results);
491     }
492     ars.waitUntilDone();
493     if (ars.hasError()) {
494       throw ars.getErrors();
495     }
496   }
497
498   /**
499    * {@inheritDoc}
500    */
501   @Override
502   public <R> void batchCallback(
503     final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
504     throws IOException, InterruptedException {
505     doBatchWithCallback(actions, results, callback, connection, pool, tableName);
506   }
507
508   public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
509     Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
510     throws InterruptedIOException, RetriesExhaustedWithDetailsException {
511     AsyncRequestFuture ars = connection.getAsyncProcess().submitAll(
512       pool, tableName, actions, callback, results);
513     ars.waitUntilDone();
514     if (ars.hasError()) {
515       throw ars.getErrors();
516     }
517   }
518
519   /**
520    * {@inheritDoc}
521    */
522   @Override
523   public void delete(final Delete delete)
524   throws IOException {
525     CancellableRegionServerCallable<SingleResponse> callable =
526         new CancellableRegionServerCallable<SingleResponse>(
527             connection, getName(), delete.getRow(), this.rpcControllerFactory) {
528       @Override
529       protected SingleResponse rpcCall() throws Exception {
530         MutateRequest request = RequestConverter.buildMutateRequest(
531           getLocation().getRegionInfo().getRegionName(), delete);
532         MutateResponse response = getStub().mutate(getRpcController(), request);
533         return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
534       }
535     };
536     List<Row> rows = new ArrayList<Row>();
537     rows.add(delete);
538     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
539         null, null, callable, operationTimeout);
540     ars.waitUntilDone();
541     if (ars.hasError()) {
542       throw ars.getErrors();
543     }
544   }
545
546   /**
547    * {@inheritDoc}
548    */
549   @Override
550   public void delete(final List<Delete> deletes)
551   throws IOException {
552     Object[] results = new Object[deletes.size()];
553     try {
554       batch(deletes, results, writeRpcTimeout);
555     } catch (InterruptedException e) {
556       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
557     } finally {
558       // mutate list so that it is empty for complete success, or contains only failed records
559       // results are returned in the same order as the requests in list walk the list backwards,
560       // so we can remove from list without impacting the indexes of earlier members
561       for (int i = results.length - 1; i>=0; i--) {
562         // if result is not null, it succeeded
563         if (results[i] instanceof Result) {
564           deletes.remove(i);
565         }
566       }
567     }
568   }
569
570   /**
571    * {@inheritDoc}
572    * @throws IOException
573    */
574   @Override
575   public void put(final Put put) throws IOException {
576     getBufferedMutator().mutate(put);
577     flushCommits();
578   }
579
580   /**
581    * {@inheritDoc}
582    * @throws IOException
583    */
584   @Override
585   public void put(final List<Put> puts) throws IOException {
586     getBufferedMutator().mutate(puts);
587     flushCommits();
588   }
589
590   /**
591    * {@inheritDoc}
592    */
593   @Override
594   public void mutateRow(final RowMutations rm) throws IOException {
595     CancellableRegionServerCallable<MultiResponse> callable =
596       new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
597           rpcControllerFactory) {
598       @Override
599       protected MultiResponse rpcCall() throws Exception {
600         RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
601             getLocation().getRegionInfo().getRegionName(), rm);
602         regionMutationBuilder.setAtomic(true);
603         MultiRequest request =
604             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
605         ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
606         ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
607         if (res.hasException()) {
608           Throwable ex = ProtobufUtil.toException(res.getException());
609           if (ex instanceof IOException) {
610             throw (IOException) ex;
611           }
612           throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
613         }
614         return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
615       }
616     };
617     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
618         null, null, callable, operationTimeout);
619     ars.waitUntilDone();
620     if (ars.hasError()) {
621       throw ars.getErrors();
622     }
623   }
624
625   private static void checkHasFamilies(final Mutation mutation) throws IOException {
626     if (mutation.numFamilies() == 0) {
627       throw new IOException("Invalid arguments to " + mutation + ", zero columns specified");
628     }
629   }
630
631   /**
632    * {@inheritDoc}
633    */
634   @Override
635   public Result append(final Append append) throws IOException {
636     checkHasFamilies(append);
637     NoncedRegionServerCallable<Result> callable =
638         new NoncedRegionServerCallable<Result>(this.connection, this.rpcControllerFactory,
639             getName(), append.getRow()) {
640           @Override
641           protected Result rpcCall() throws Exception {
642             MutateRequest request = RequestConverter.buildMutateRequest(
643               getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
644             MutateResponse response = getStub().mutate(getRpcController(), request);
645             if (!response.hasResult()) return null;
646             return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
647           }
648         };
649     return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeout).
650         callWithRetries(callable, this.operationTimeout);
651   }
652
653   /**
654    * {@inheritDoc}
655    */
656   @Override
657   public Result increment(final Increment increment) throws IOException {
658     checkHasFamilies(increment);
659     NoncedRegionServerCallable<Result> callable =
660       new NoncedRegionServerCallable<Result>(this.connection,
661       this.rpcControllerFactory, getName(), increment.getRow()) {
662         @Override
663         protected Result rpcCall() throws Exception {
664           MutateRequest request = RequestConverter.buildMutateRequest(
665             getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
666           MutateResponse response = getStub().mutate(getRpcController(), request);
667           // Should this check for null like append does?
668           return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
669         }
670     };
671     return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
672         this.operationTimeout);
673   }
674
675   /**
676    * {@inheritDoc}
677    */
678   @Override
679   public long incrementColumnValue(final byte [] row, final byte [] family,
680       final byte [] qualifier, final long amount)
681   throws IOException {
682     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
683   }
684
685   /**
686    * {@inheritDoc}
687    */
688   @Override
689   public long incrementColumnValue(final byte [] row, final byte [] family,
690       final byte [] qualifier, final long amount, final Durability durability)
691   throws IOException {
692     NullPointerException npe = null;
693     if (row == null) {
694       npe = new NullPointerException("row is null");
695     } else if (family == null) {
696       npe = new NullPointerException("family is null");
697     } else if (qualifier == null) {
698       npe = new NullPointerException("qualifier is null");
699     }
700     if (npe != null) {
701       throw new IOException(
702           "Invalid arguments to incrementColumnValue", npe);
703     }
704
705     NoncedRegionServerCallable<Long> callable =
706         new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(),
707             row) {
708       @Override
709       protected Long rpcCall() throws Exception {
710         MutateRequest request = RequestConverter.buildIncrementRequest(
711             getLocation().getRegionInfo().getRegionName(), row, family,
712             qualifier, amount, durability, getNonceGroup(), getNonce());
713         MutateResponse response = getStub().mutate(getRpcController(), request);
714         Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
715         return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
716       }
717     };
718     return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeout).
719         callWithRetries(callable, this.operationTimeout);
720   }
721
722   /**
723    * {@inheritDoc}
724    */
725   @Override
726   public boolean checkAndPut(final byte [] row,
727       final byte [] family, final byte [] qualifier, final byte [] value,
728       final Put put)
729   throws IOException {
730     return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put);
731   }
732
733   /**
734    * {@inheritDoc}
735    */
736   @Override
737   public boolean checkAndPut(final byte [] row, final byte [] family,
738       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
739       final Put put)
740   throws IOException {
741     RegionServerCallable<Boolean> callable =
742         new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
743             getName(), row) {
744       @Override
745       protected Boolean rpcCall() throws Exception {
746         CompareType compareType = CompareType.valueOf(compareOp.name());
747         MutateRequest request = RequestConverter.buildMutateRequest(
748           getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
749           new BinaryComparator(value), compareType, put);
750         MutateResponse response = getStub().mutate(getRpcController(), request);
751         return Boolean.valueOf(response.getProcessed());
752       }
753     };
754     return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
755         callWithRetries(callable, this.operationTimeout);
756   }
757
758   /**
759    * {@inheritDoc}
760    */
761   @Override
762   public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
763       final byte [] value, final Delete delete)
764   throws IOException {
765     return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete);
766   }
767
768   /**
769    * {@inheritDoc}
770    */
771   @Override
772   public boolean checkAndDelete(final byte [] row, final byte [] family,
773       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
774       final Delete delete)
775   throws IOException {
776     CancellableRegionServerCallable<SingleResponse> callable =
777         new CancellableRegionServerCallable<SingleResponse>(
778             this.connection, getName(), row, this.rpcControllerFactory) {
779       @Override
780       protected SingleResponse rpcCall() throws Exception {
781         CompareType compareType = CompareType.valueOf(compareOp.name());
782         MutateRequest request = RequestConverter.buildMutateRequest(
783           getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
784           new BinaryComparator(value), compareType, delete);
785         MutateResponse response = getStub().mutate(getRpcController(), request);
786         return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
787       }
788     };
789     List<Row> rows = new ArrayList<Row>();
790     rows.add(delete);
791
792     Object[] results = new Object[1];
793     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
794         null, results, callable, operationTimeout);
795     ars.waitUntilDone();
796     if (ars.hasError()) {
797       throw ars.getErrors();
798     }
799     return ((SingleResponse.Entry)results[0]).isProcessed();
800   }
801
802   /**
803    * {@inheritDoc}
804    */
805   @Override
806   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
807     final CompareOp compareOp, final byte [] value, final RowMutations rm)
808     throws IOException {
809     CancellableRegionServerCallable<MultiResponse> callable =
810       new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
811         rpcControllerFactory) {
812         @Override
813         protected MultiResponse rpcCall() throws Exception {
814           CompareType compareType = CompareType.valueOf(compareOp.name());
815           MultiRequest request = RequestConverter.buildMutateRequest(
816             getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
817             new BinaryComparator(value), compareType, rm);
818           ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
819           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
820           if (res.hasException()) {
821             Throwable ex = ProtobufUtil.toException(res.getException());
822             if (ex instanceof IOException) {
823               throw (IOException)ex;
824             }
825             throw new IOException("Failed to checkAndMutate row: "+
826               Bytes.toStringBinary(rm.getRow()), ex);
827           }
828           return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
829         }
830       };
831
832     /**
833      *  Currently, we use one array to store 'processed' flag which is returned by server.
834      *  It is excessive to send such a large array, but that is required by the framework right now
835      * */
836     Object[] results = new Object[rm.getMutations().size()];
837     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
838       null, results, callable, operationTimeout);
839     ars.waitUntilDone();
840     if (ars.hasError()) {
841       throw ars.getErrors();
842     }
843
844     return ((Result)results[0]).getExists();
845   }
846
847   /**
848    * {@inheritDoc}
849    */
850   @Override
851   public boolean exists(final Get get) throws IOException {
852     Result r = get(get, true);
853     assert r.getExists() != null;
854     return r.getExists();
855   }
856
857   /**
858    * {@inheritDoc}
859    */
860   @Override
861   public boolean[] existsAll(final List<Get> gets) throws IOException {
862     if (gets.isEmpty()) return new boolean[]{};
863     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
864
865     ArrayList<Get> exists = new ArrayList<Get>(gets.size());
866     for (Get g: gets){
867       Get ge = new Get(g);
868       ge.setCheckExistenceOnly(true);
869       exists.add(ge);
870     }
871
872     Object[] r1= new Object[exists.size()];
873     try {
874       batch(exists, r1, readRpcTimeout);
875     } catch (InterruptedException e) {
876       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
877     }
878
879     // translate.
880     boolean[] results = new boolean[r1.length];
881     int i = 0;
882     for (Object o : r1) {
883       // batch ensures if there is a failure we get an exception instead
884       results[i++] = ((Result)o).getExists();
885     }
886
887     return results;
888   }
889
890   /**
891    * @throws IOException
892    */
893   void flushCommits() throws IOException {
894     if (mutator == null) {
895       // nothing to flush if there's no mutator; don't bother creating one.
896       return;
897     }
898     getBufferedMutator().flush();
899   }
900
901   /**
902    * Process a mixed batch of Get, Put and Delete actions. All actions for a
903    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
904    *
905    * @param list The collection of actions.
906    * @param results An empty array, same size as list. If an exception is thrown,
907    *   you can test here for partial results, and to determine which actions
908    *   processed successfully.
909    * @throws IOException if there are problems talking to META. Per-item
910    *   exceptions are stored in the results array.
911    */
912   public <R> void processBatchCallback(
913     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
914     throws IOException, InterruptedException {
915     this.batchCallback(list, results, callback);
916   }
917
918   @Override
919   public void close() throws IOException {
920     if (this.closed) {
921       return;
922     }
923     flushCommits();
924     if (cleanupPoolOnClose) {
925       this.pool.shutdown();
926       try {
927         boolean terminated = false;
928         do {
929           // wait until the pool has terminated
930           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
931         } while (!terminated);
932       } catch (InterruptedException e) {
933         this.pool.shutdownNow();
934         LOG.warn("waitForTermination interrupted");
935       }
936     }
937     if (cleanupConnectionOnClose) {
938       if (this.connection != null) {
939         this.connection.close();
940       }
941     }
942     this.closed = true;
943   }
944
945   // validate for well-formedness
946   public void validatePut(final Put put) throws IllegalArgumentException {
947     validatePut(put, connConfiguration.getMaxKeyValueSize());
948   }
949
950   // validate for well-formedness
951   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
952     if (put.isEmpty()) {
953       throw new IllegalArgumentException("No columns to insert");
954     }
955     if (maxKeyValueSize > 0) {
956       for (List<Cell> list : put.getFamilyCellMap().values()) {
957         for (Cell cell : list) {
958           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
959             throw new IllegalArgumentException("KeyValue size too large");
960           }
961         }
962       }
963     }
964   }
965
966   /**
967    * Returns the maximum size in bytes of the write buffer for this HTable.
968    * <p>
969    * The default value comes from the configuration parameter
970    * {@code hbase.client.write.buffer}.
971    * @return The size of the write buffer in bytes.
972    */
973   @Override
974   public long getWriteBufferSize() {
975     if (mutator == null) {
976       return connConfiguration.getWriteBufferSize();
977     } else {
978       return mutator.getWriteBufferSize();
979     }
980   }
981
982   /**
983    * Sets the size of the buffer in bytes.
984    * <p>
985    * If the new size is less than the current amount of data in the
986    * write buffer, the buffer gets flushed.
987    * @param writeBufferSize The new write buffer size, in bytes.
988    * @throws IOException if a remote or network exception occurs.
989    */
990   @Override
991   public void setWriteBufferSize(long writeBufferSize) throws IOException {
992     getBufferedMutator();
993     mutator.setWriteBufferSize(writeBufferSize);
994   }
995
996   /**
997    * The pool is used for mutli requests for this HTable
998    * @return the pool used for mutli
999    */
1000   ExecutorService getPool() {
1001     return this.pool;
1002   }
1003
1004   /**
1005    * Explicitly clears the region cache to fetch the latest value from META.
1006    * This is a power user function: avoid unless you know the ramifications.
1007    */
1008   public void clearRegionCache() {
1009     this.connection.clearRegionCache();
1010   }
1011
1012   /**
1013    * {@inheritDoc}
1014    */
1015   @Override
1016   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1017     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1018   }
1019
1020   /**
1021    * {@inheritDoc}
1022    */
1023   @Override
1024   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1025       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1026       throws ServiceException, Throwable {
1027     final Map<byte[],R> results =  Collections.synchronizedMap(
1028         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1029     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1030       @Override
1031       public void update(byte[] region, byte[] row, R value) {
1032         if (region != null) {
1033           results.put(region, value);
1034         }
1035       }
1036     });
1037     return results;
1038   }
1039
1040   /**
1041    * {@inheritDoc}
1042    */
1043   @Override
1044   public <T extends Service, R> void coprocessorService(final Class<T> service,
1045       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1046       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1047
1048     // get regions covered by the row range
1049     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1050
1051     Map<byte[],Future<R>> futures =
1052         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1053     for (final byte[] r : keys) {
1054       final RegionCoprocessorRpcChannel channel =
1055           new RegionCoprocessorRpcChannel(connection, tableName, r);
1056       Future<R> future = pool.submit(new Callable<R>() {
1057         @Override
1058         public R call() throws Exception {
1059           T instance = ProtobufUtil.newServiceStub(service, channel);
1060           R result = callable.call(instance);
1061           byte[] region = channel.getLastRegion();
1062           if (callback != null) {
1063             callback.update(region, r, result);
1064           }
1065           return result;
1066         }
1067       });
1068       futures.put(r, future);
1069     }
1070     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1071       try {
1072         e.getValue().get();
1073       } catch (ExecutionException ee) {
1074         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1075             + Bytes.toStringBinary(e.getKey()), ee);
1076         throw ee.getCause();
1077       } catch (InterruptedException ie) {
1078         throw new InterruptedIOException("Interrupted calling coprocessor service "
1079             + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1080       }
1081     }
1082   }
1083
1084   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1085   throws IOException {
1086     if (start == null) {
1087       start = HConstants.EMPTY_START_ROW;
1088     }
1089     if (end == null) {
1090       end = HConstants.EMPTY_END_ROW;
1091     }
1092     return getKeysAndRegionsInRange(start, end, true).getFirst();
1093   }
1094
1095   @Override
1096   public void setOperationTimeout(int operationTimeout) {
1097     this.operationTimeout = operationTimeout;
1098   }
1099
1100   @Override
1101   public int getOperationTimeout() {
1102     return operationTimeout;
1103   }
1104
1105   @Override
1106   @Deprecated
1107   public int getRpcTimeout() {
1108     return readRpcTimeout;
1109   }
1110
1111   @Override
1112   @Deprecated
1113   public void setRpcTimeout(int rpcTimeout) {
1114     this.readRpcTimeout = rpcTimeout;
1115     this.writeRpcTimeout = rpcTimeout;
1116   }
1117
1118   @Override
1119   public int getWriteRpcTimeout() {
1120     return writeRpcTimeout;
1121   }
1122
1123   @Override
1124   public void setWriteRpcTimeout(int writeRpcTimeout) {
1125     this.writeRpcTimeout = writeRpcTimeout;
1126   }
1127
1128   @Override
1129   public int getReadRpcTimeout() { return readRpcTimeout; }
1130
1131   @Override
1132   public void setReadRpcTimeout(int readRpcTimeout) {
1133     this.readRpcTimeout = readRpcTimeout;
1134   }
1135
1136   @Override
1137   public String toString() {
1138     return tableName + ";" + connection;
1139   }
1140
1141   @Override
1142   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1143       Descriptors.MethodDescriptor methodDescriptor, Message request,
1144       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1145     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1146         Bytes.BYTES_COMPARATOR));
1147     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1148         new Callback<R>() {
1149       @Override
1150       public void update(byte[] region, byte[] row, R result) {
1151         if (region != null) {
1152           results.put(region, result);
1153         }
1154       }
1155     });
1156     return results;
1157   }
1158
1159   /**
1160    * {@inheritDoc}
1161    */
1162   @Override
1163   public <R extends Message> void batchCoprocessorService(
1164       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1165       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1166       throws ServiceException, Throwable {
1167
1168     if (startKey == null) {
1169       startKey = HConstants.EMPTY_START_ROW;
1170     }
1171     if (endKey == null) {
1172       endKey = HConstants.EMPTY_END_ROW;
1173     }
1174     // get regions covered by the row range
1175     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1176         getKeysAndRegionsInRange(startKey, endKey, true);
1177     List<byte[]> keys = keysAndRegions.getFirst();
1178     List<HRegionLocation> regions = keysAndRegions.getSecond();
1179
1180     // check if we have any calls to make
1181     if (keys.isEmpty()) {
1182       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1183           ", end=" + Bytes.toStringBinary(endKey));
1184       return;
1185     }
1186
1187     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1188     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1189         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1190     for (int i = 0; i < keys.size(); i++) {
1191       final byte[] rowKey = keys.get(i);
1192       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1193       RegionCoprocessorServiceExec exec =
1194           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1195       execs.add(exec);
1196       execsByRow.put(rowKey, exec);
1197     }
1198
1199     // tracking for any possible deserialization errors on success callback
1200     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1201     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1202     final List<Row> callbackErrorActions = new ArrayList<Row>();
1203     final List<String> callbackErrorServers = new ArrayList<String>();
1204     Object[] results = new Object[execs.size()];
1205
1206     AsyncProcess asyncProcess =
1207         new AsyncProcess(connection, configuration, pool,
1208             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1209             true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
1210
1211     AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
1212         new Callback<ClientProtos.CoprocessorServiceResult>() {
1213           @Override
1214           public void update(byte[] region, byte[] row,
1215                               ClientProtos.CoprocessorServiceResult serviceResult) {
1216             if (LOG.isTraceEnabled()) {
1217               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1218                   ": region=" + Bytes.toStringBinary(region) +
1219                   ", row=" + Bytes.toStringBinary(row) +
1220                   ", value=" + serviceResult.getValue().getValue());
1221             }
1222             try {
1223               Message.Builder builder = responsePrototype.newBuilderForType();
1224               ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1225               callback.update(region, row, (R) builder.build());
1226             } catch (IOException e) {
1227               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1228                   e);
1229               callbackErrorExceptions.add(e);
1230               callbackErrorActions.add(execsByRow.get(row));
1231               callbackErrorServers.add("null");
1232             }
1233           }
1234         }, results);
1235
1236     future.waitUntilDone();
1237
1238     if (future.hasError()) {
1239       throw future.getErrors();
1240     } else if (!callbackErrorExceptions.isEmpty()) {
1241       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1242           callbackErrorServers);
1243     }
1244   }
1245
1246   public RegionLocator getRegionLocator() {
1247     return this.locator;
1248   }
1249
1250   @VisibleForTesting
1251   BufferedMutator getBufferedMutator() throws IOException {
1252     if (mutator == null) {
1253       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1254           new BufferedMutatorParams(tableName)
1255               .pool(pool)
1256               .writeBufferSize(connConfiguration.getWriteBufferSize())
1257               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1258       );
1259     }
1260     return mutator;
1261   }
1262 }