001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
// Internally, we use shaded protobuf. The imports below are part of our public API.
023//SEE ABOVE NOTE!
024import com.google.protobuf.Descriptors;
025import com.google.protobuf.Message;
026import com.google.protobuf.Service;
027import com.google.protobuf.ServiceException;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.CompareOperator;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.HRegionLocation;
033import org.apache.hadoop.hbase.HTableDescriptor;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.io.TimeRange;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.yetus.audience.InterfaceStability;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hadoop.hbase.client.coprocessor.Batch;
041import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
042import org.apache.hadoop.hbase.filter.BinaryComparator;
043import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
044import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
045import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
047import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
048import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
049import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
050import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.hbase.util.ReflectionUtils;
060import org.apache.hadoop.hbase.util.Threads;
061
062import java.io.IOException;
063import java.io.InterruptedIOException;
064import java.util.ArrayList;
065import java.util.Collections;
066import java.util.List;
067import java.util.Map;
068import java.util.TreeMap;
069import java.util.concurrent.Callable;
070import java.util.concurrent.ExecutionException;
071import java.util.concurrent.ExecutorService;
072import java.util.concurrent.Future;
073import java.util.concurrent.SynchronousQueue;
074import java.util.concurrent.ThreadPoolExecutor;
075import java.util.concurrent.TimeUnit;
076
077import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
078
079/**
080 * An implementation of {@link Table}. Used to communicate with a single HBase table.
081 * Lightweight. Get as needed and just close when done.
082 * Instances of this class SHOULD NOT be constructed directly.
083 * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
084 * class comment for an example of how.
085 *
 * <p>This class is thread safe since 2.0.0 as long as none of the setter methods are invoked.
 * All setters have been moved into {@link TableBuilder}; they are retained here only for
 * backward compatibility and will be removed soon (TODO).
089 *
090 * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
091 * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
092 * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
093 * Interface Classification</a>
094 * There are no guarantees for backwards source / binary compatibility and methods or class can
095 * change or go away without deprecation.
096 *
097 * @see Table
098 * @see Admin
099 * @see Connection
100 * @see ConnectionFactory
101 */
102@InterfaceAudience.Private
103@InterfaceStability.Stable
104public class HTable implements Table {
  private static final Logger LOG = LoggerFactory.getLogger(HTable.class);
  // Consistency applied to a Get that arrives without one (see get(Get, boolean)).
  private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
  // Connection every RPC issued by this table goes through; never null after construction.
  private final ClusterConnection connection;
  // Name of the table this instance talks to; taken from the builder.
  private final TableName tableName;
  // Configuration obtained from the connection in the constructor.
  private final Configuration configuration;
  // Connection-level client settings obtained from the connection in the constructor.
  private final ConnectionConfiguration connConfiguration;
  // NOTE(review): presumably flipped by close(); not written anywhere in the visible code.
  private boolean closed = false;
  // Caching value applied to scans whose own caching is <= 0.
  private final int scannerCaching;
  // Max result size applied to scans whose own max result size is <= 0.
  private final long scannerMaxResultSize;
  private final ExecutorService pool;  // For Multi & Scan
  private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc
  private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
  private int readRpcTimeoutMs; // timeout for each read rpc request
  private int writeRpcTimeoutMs; // timeout for each write rpc request
  // True only when the constructor created the pool itself (no pool was passed in).
  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
  // Region locator for this table, created in the constructor.
  private final HRegionLocator locator;

  /** The Async process for batch */
  @VisibleForTesting
  AsyncProcess multiAp;
  // Factory for the retrying callers that wrap each single-row RPC.
  private final RpcRetryingCallerFactory rpcCallerFactory;
  // Factory for the RPC controller handed to each callable.
  private final RpcControllerFactory rpcControllerFactory;
127
128  // Marked Private @since 1.0
129  @InterfaceAudience.Private
130  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
131    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
132    if (maxThreads == 0) {
133      maxThreads = 1; // is there a better default?
134    }
135    int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
136    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
137
138    // Using the "direct handoff" approach, new threads will only be created
139    // if it is necessary and will grow unbounded. This could be bad but in HCM
140    // we only create as many Runnables as there are region servers. It means
141    // it also scales when new region servers are added.
142    ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
143      TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable"));
144    pool.allowCoreThreadTimeOut(true);
145    return pool;
146  }
147
148  /**
149   * Creates an object to access a HBase table.
150   * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
151   * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
152   * @param connection Connection to be used.
153   * @param builder The table builder
154   * @param rpcCallerFactory The RPC caller factory
155   * @param rpcControllerFactory The RPC controller factory
156   * @param pool ExecutorService to be used.
157   */
158  @InterfaceAudience.Private
159  protected HTable(final ConnectionImplementation connection,
160      final TableBuilderBase builder,
161      final RpcRetryingCallerFactory rpcCallerFactory,
162      final RpcControllerFactory rpcControllerFactory,
163      final ExecutorService pool) {
164    this.connection = Preconditions.checkNotNull(connection, "connection is null");
165    this.configuration = connection.getConfiguration();
166    this.connConfiguration = connection.getConnectionConfiguration();
167    if (pool == null) {
168      this.pool = getDefaultExecutor(this.configuration);
169      this.cleanupPoolOnClose = true;
170    } else {
171      this.pool = pool;
172      this.cleanupPoolOnClose = false;
173    }
174    if (rpcCallerFactory == null) {
175      this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
176    } else {
177      this.rpcCallerFactory = rpcCallerFactory;
178    }
179
180    if (rpcControllerFactory == null) {
181      this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
182    } else {
183      this.rpcControllerFactory = rpcControllerFactory;
184    }
185
186    this.tableName = builder.tableName;
187    this.operationTimeoutMs = builder.operationTimeout;
188    this.rpcTimeoutMs = builder.rpcTimeout;
189    this.readRpcTimeoutMs = builder.readRpcTimeout;
190    this.writeRpcTimeoutMs = builder.writeRpcTimeout;
191    this.scannerCaching = connConfiguration.getScannerCaching();
192    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
193
194    // puts need to track errors globally due to how the APIs currently work.
195    multiAp = this.connection.getAsyncProcess();
196    this.locator = new HRegionLocator(tableName, connection);
197  }
198
199  /**
200   * @return maxKeyValueSize from configuration.
201   */
202  public static int getMaxKeyValueSize(Configuration conf) {
203    return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1);
204  }
205
206  @Override
207  public Configuration getConfiguration() {
208    return configuration;
209  }
210
211  @Override
212  public TableName getName() {
213    return tableName;
214  }
215
216  /**
217   * <em>INTERNAL</em> Used by unit tests and tools to do low-level
218   * manipulations.
219   * @return A Connection instance.
220   */
221  @VisibleForTesting
222  protected Connection getConnection() {
223    return this.connection;
224  }
225
226  @Override
227  @Deprecated
228  public HTableDescriptor getTableDescriptor() throws IOException {
229    HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory,
230      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
231    if (htd != null) {
232      return new ImmutableHTableDescriptor(htd);
233    }
234    return null;
235  }
236
237  @Override
238  public TableDescriptor getDescriptor() throws IOException {
239    return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
240      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
241  }
242
243  /**
244   * Get the corresponding start keys and regions for an arbitrary range of
245   * keys.
246   * <p>
247   * @param startKey Starting row in range, inclusive
248   * @param endKey Ending row in range
249   * @param includeEndKey true if endRow is inclusive, false if exclusive
250   * @return A pair of list of start keys and list of HRegionLocations that
251   *         contain the specified range
252   * @throws IOException if a remote or network exception occurs
253   */
254  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
255      final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
256      throws IOException {
257    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
258  }
259
260  /**
261   * Get the corresponding start keys and regions for an arbitrary range of
262   * keys.
263   * <p>
264   * @param startKey Starting row in range, inclusive
265   * @param endKey Ending row in range
266   * @param includeEndKey true if endRow is inclusive, false if exclusive
267   * @param reload true to reload information or false to use cached information
268   * @return A pair of list of start keys and list of HRegionLocations that
269   *         contain the specified range
270   * @throws IOException if a remote or network exception occurs
271   */
272  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
273      final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
274      final boolean reload) throws IOException {
275    final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
276    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
277      throw new IllegalArgumentException(
278        "Invalid range: " + Bytes.toStringBinary(startKey) +
279        " > " + Bytes.toStringBinary(endKey));
280    }
281    List<byte[]> keysInRange = new ArrayList<>();
282    List<HRegionLocation> regionsInRange = new ArrayList<>();
283    byte[] currentKey = startKey;
284    do {
285      HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
286      keysInRange.add(currentKey);
287      regionsInRange.add(regionLocation);
288      currentKey = regionLocation.getRegionInfo().getEndKey();
289    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
290        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
291            || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
292    return new Pair<>(keysInRange, regionsInRange);
293  }
294
295  /**
296   * The underlying {@link HTable} must not be closed.
297   * {@link Table#getScanner(Scan)} has other usage details.
298   */
299  @Override
300  public ResultScanner getScanner(Scan scan) throws IOException {
301    if (scan.getCaching() <= 0) {
302      scan.setCaching(scannerCaching);
303    }
304    if (scan.getMaxResultSize() <= 0) {
305      scan.setMaxResultSize(scannerMaxResultSize);
306    }
307    if (scan.getMvccReadPoint() > 0) {
308      // it is not supposed to be set by user, clear
309      scan.resetMvccReadPoint();
310    }
311    Boolean async = scan.isAsyncPrefetch();
312    if (async == null) {
313      async = connConfiguration.isClientScannerAsyncPrefetch();
314    }
315
316    if (scan.isReversed()) {
317      return new ReversedClientScanner(getConfiguration(), scan, getName(),
318        this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
319        pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
320    } else {
321      if (async) {
322        return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
323            this.rpcCallerFactory, this.rpcControllerFactory,
324            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
325      } else {
326        return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
327            this.rpcCallerFactory, this.rpcControllerFactory,
328            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
329      }
330    }
331  }
332
333  /**
334   * The underlying {@link HTable} must not be closed.
335   * {@link Table#getScanner(byte[])} has other usage details.
336   */
337  @Override
338  public ResultScanner getScanner(byte [] family) throws IOException {
339    Scan scan = new Scan();
340    scan.addFamily(family);
341    return getScanner(scan);
342  }
343
344  /**
345   * The underlying {@link HTable} must not be closed.
346   * {@link Table#getScanner(byte[], byte[])} has other usage details.
347   */
348  @Override
349  public ResultScanner getScanner(byte [] family, byte [] qualifier)
350  throws IOException {
351    Scan scan = new Scan();
352    scan.addColumn(family, qualifier);
353    return getScanner(scan);
354  }
355
356  @Override
357  public Result get(final Get get) throws IOException {
358    return get(get, get.isCheckExistenceOnly());
359  }
360
361  private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
362    // if we are changing settings to the get, clone it.
363    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
364      get = ReflectionUtils.newInstance(get.getClass(), get);
365      get.setCheckExistenceOnly(checkExistenceOnly);
366      if (get.getConsistency() == null){
367        get.setConsistency(DEFAULT_CONSISTENCY);
368      }
369    }
370
371    if (get.getConsistency() == Consistency.STRONG) {
372      final Get configuredGet = get;
373      ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(),
374          get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) {
375        @Override
376        protected Result rpcCall() throws Exception {
377          ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
378              getLocation().getRegionInfo().getRegionName(), configuredGet);
379          ClientProtos.GetResponse response = doGet(request);
380          return response == null? null:
381            ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
382        }
383      };
384      return rpcCallerFactory.<Result>newCaller(readRpcTimeoutMs).callWithRetries(callable,
385          this.operationTimeoutMs);
386    }
387
388    // Call that takes into account the replica
389    RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
390        rpcControllerFactory, tableName, this.connection, get, pool,
391        connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs,
392        connConfiguration.getPrimaryCallTimeoutMicroSecond());
393    return callable.call(operationTimeoutMs);
394  }
395
396  @Override
397  public Result[] get(List<Get> gets) throws IOException {
398    if (gets.size() == 1) {
399      return new Result[]{get(gets.get(0))};
400    }
401    try {
402      Object[] r1 = new Object[gets.size()];
403      batch((List<? extends Row>)gets, r1, readRpcTimeoutMs);
404      // Translate.
405      Result [] results = new Result[r1.length];
406      int i = 0;
407      for (Object obj: r1) {
408        // Batch ensures if there is a failure we get an exception instead
409        results[i++] = (Result)obj;
410      }
411      return results;
412    } catch (InterruptedException e) {
413      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
414    }
415  }
416
417  @Override
418  public void batch(final List<? extends Row> actions, final Object[] results)
419      throws InterruptedException, IOException {
420    int rpcTimeout = writeRpcTimeoutMs;
421    boolean hasRead = false;
422    boolean hasWrite = false;
423    for (Row action : actions) {
424      if (action instanceof Mutation) {
425        hasWrite = true;
426      } else {
427        hasRead = true;
428      }
429      if (hasRead && hasWrite) {
430        break;
431      }
432    }
433    if (hasRead && !hasWrite) {
434      rpcTimeout = readRpcTimeoutMs;
435    }
436    batch(actions, results, rpcTimeout);
437  }
438
439  public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
440      throws InterruptedException, IOException {
441    AsyncProcessTask task = AsyncProcessTask.newBuilder()
442            .setPool(pool)
443            .setTableName(tableName)
444            .setRowAccess(actions)
445            .setResults(results)
446            .setRpcTimeout(rpcTimeout)
447            .setOperationTimeout(operationTimeoutMs)
448            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
449            .build();
450    AsyncRequestFuture ars = multiAp.submit(task);
451    ars.waitUntilDone();
452    if (ars.hasError()) {
453      throw ars.getErrors();
454    }
455  }
456
457  @Override
458  public <R> void batchCallback(
459    final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
460    throws IOException, InterruptedException {
461    doBatchWithCallback(actions, results, callback, connection, pool, tableName);
462  }
463
464  public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
465    Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
466    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
467    int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
468    int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
469        connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
470            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
471    AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback)
472            .setPool(pool)
473            .setTableName(tableName)
474            .setRowAccess(actions)
475            .setResults(results)
476            .setOperationTimeout(operationTimeout)
477            .setRpcTimeout(writeTimeout)
478            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
479            .build();
480    AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
481    ars.waitUntilDone();
482    if (ars.hasError()) {
483      throw ars.getErrors();
484    }
485  }
486
487  @Override
488  public void delete(final Delete delete) throws IOException {
489    ClientServiceCallable<Void> callable =
490        new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(),
491            this.rpcControllerFactory.newController(), delete.getPriority()) {
492      @Override
493      protected Void rpcCall() throws Exception {
494        MutateRequest request = RequestConverter
495            .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete);
496        doMutate(request);
497        return null;
498      }
499    };
500    rpcCallerFactory.<Void>newCaller(this.writeRpcTimeoutMs)
501        .callWithRetries(callable, this.operationTimeoutMs);
502  }
503
504  @Override
505  public void delete(final List<Delete> deletes)
506  throws IOException {
507    Object[] results = new Object[deletes.size()];
508    try {
509      batch(deletes, results, writeRpcTimeoutMs);
510    } catch (InterruptedException e) {
511      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
512    } finally {
513      // TODO: to be consistent with batch put(), do not modify input list
514      // mutate list so that it is empty for complete success, or contains only failed records
515      // results are returned in the same order as the requests in list walk the list backwards,
516      // so we can remove from list without impacting the indexes of earlier members
517      for (int i = results.length - 1; i>=0; i--) {
518        // if result is not null, it succeeded
519        if (results[i] instanceof Result) {
520          deletes.remove(i);
521        }
522      }
523    }
524  }
525
526  @Override
527  public void put(final Put put) throws IOException {
528    validatePut(put);
529    ClientServiceCallable<Void> callable =
530        new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
531            this.rpcControllerFactory.newController(), put.getPriority()) {
532      @Override
533      protected Void rpcCall() throws Exception {
534        MutateRequest request =
535            RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put);
536        doMutate(request);
537        return null;
538      }
539    };
540    rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
541        this.operationTimeoutMs);
542  }
543
544  @Override
545  public void put(final List<Put> puts) throws IOException {
546    for (Put put : puts) {
547      validatePut(put);
548    }
549    Object[] results = new Object[puts.size()];
550    try {
551      batch(puts, results, writeRpcTimeoutMs);
552    } catch (InterruptedException e) {
553      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
554    }
555  }
556
557  @Override
558  public void mutateRow(final RowMutations rm) throws IOException {
559    CancellableRegionServerCallable<MultiResponse> callable =
560      new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
561          rpcControllerFactory.newController(), writeRpcTimeoutMs,
562          new RetryingTimeTracker().start(), rm.getMaxPriority()) {
563      @Override
564      protected MultiResponse rpcCall() throws Exception {
565        RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
566            getLocation().getRegionInfo().getRegionName(), rm);
567        regionMutationBuilder.setAtomic(true);
568        MultiRequest request =
569            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
570        ClientProtos.MultiResponse response = doMulti(request);
571        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
572        if (res.hasException()) {
573          Throwable ex = ProtobufUtil.toException(res.getException());
574          if (ex instanceof IOException) {
575            throw (IOException) ex;
576          }
577          throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
578        }
579        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
580      }
581    };
582    AsyncProcessTask task = AsyncProcessTask.newBuilder()
583            .setPool(pool)
584            .setTableName(tableName)
585            .setRowAccess(rm.getMutations())
586            .setCallable(callable)
587            .setRpcTimeout(writeRpcTimeoutMs)
588            .setOperationTimeout(operationTimeoutMs)
589            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
590            .build();
591    AsyncRequestFuture ars = multiAp.submit(task);
592    ars.waitUntilDone();
593    if (ars.hasError()) {
594      throw ars.getErrors();
595    }
596  }
597
598  @Override
599  public Result append(final Append append) throws IOException {
600    checkHasFamilies(append);
601    NoncedRegionServerCallable<Result> callable =
602        new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
603            this.rpcControllerFactory.newController(), append.getPriority()) {
604      @Override
605      protected Result rpcCall() throws Exception {
606        MutateRequest request = RequestConverter.buildMutateRequest(
607          getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
608        MutateResponse response = doMutate(request);
609        if (!response.hasResult()) return null;
610        return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
611      }
612    };
613    return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs).
614        callWithRetries(callable, this.operationTimeoutMs);
615  }
616
617  @Override
618  public Result increment(final Increment increment) throws IOException {
619    checkHasFamilies(increment);
620    NoncedRegionServerCallable<Result> callable =
621        new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
622            this.rpcControllerFactory.newController(), increment.getPriority()) {
623      @Override
624      protected Result rpcCall() throws Exception {
625        MutateRequest request = RequestConverter.buildMutateRequest(
626          getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
627        MutateResponse response = doMutate(request);
628        // Should this check for null like append does?
629        return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
630      }
631    };
632    return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable,
633        this.operationTimeoutMs);
634  }
635
636  @Override
637  public long incrementColumnValue(final byte [] row, final byte [] family,
638      final byte [] qualifier, final long amount)
639  throws IOException {
640    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
641  }
642
643  @Override
644  public long incrementColumnValue(final byte [] row, final byte [] family,
645      final byte [] qualifier, final long amount, final Durability durability)
646  throws IOException {
647    NullPointerException npe = null;
648    if (row == null) {
649      npe = new NullPointerException("row is null");
650    } else if (family == null) {
651      npe = new NullPointerException("family is null");
652    }
653    if (npe != null) {
654      throw new IOException(
655          "Invalid arguments to incrementColumnValue", npe);
656    }
657
658    NoncedRegionServerCallable<Long> callable =
659        new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
660            this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
661      @Override
662      protected Long rpcCall() throws Exception {
663        MutateRequest request = RequestConverter.buildIncrementRequest(
664          getLocation().getRegionInfo().getRegionName(), row, family,
665          qualifier, amount, durability, getNonceGroup(), getNonce());
666        MutateResponse response = doMutate(request);
667        Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
668        return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
669      }
670    };
671    return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs).
672        callWithRetries(callable, this.operationTimeoutMs);
673  }
674
675  @Override
676  @Deprecated
677  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
678      final byte [] value, final Put put) throws IOException {
679    return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
680  }
681
682  @Override
683  @Deprecated
684  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
685      final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
686    return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
687  }
688
689  @Override
690  @Deprecated
691  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
692      final CompareOperator op, final byte [] value, final Put put) throws IOException {
693    // The name of the operators in CompareOperator are intentionally those of the
694    // operators in the filter's CompareOp enum.
695    return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
696  }
697
698  private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
699    final String opName, final byte[] value, final TimeRange timeRange, final Put put)
700    throws IOException {
701    ClientServiceCallable<Boolean> callable =
702        new ClientServiceCallable<Boolean>(this.connection, getName(), row,
703            this.rpcControllerFactory.newController(), put.getPriority()) {
704      @Override
705      protected Boolean rpcCall() throws Exception {
706        CompareType compareType = CompareType.valueOf(opName);
707        MutateRequest request = RequestConverter.buildMutateRequest(
708            getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
709            new BinaryComparator(value), compareType, timeRange, put);
710        MutateResponse response = doMutate(request);
711        return Boolean.valueOf(response.getProcessed());
712      }
713    };
714    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeoutMs)
715        .callWithRetries(callable, this.operationTimeoutMs);
716  }
717
718  @Override
719  @Deprecated
720  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
721    final byte[] value, final Delete delete) throws IOException {
722    return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
723      delete);
724  }
725
726  @Override
727  @Deprecated
728  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
729    final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
730    return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
731  }
732
733  @Override
734  @Deprecated
735  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
736    final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
737    return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
738  }
739
740  private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
741    final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
742    throws IOException {
743    CancellableRegionServerCallable<SingleResponse> callable =
744      new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
745        this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
746        new RetryingTimeTracker().start(), delete.getPriority()) {
747        @Override
748        protected SingleResponse rpcCall() throws Exception {
749          CompareType compareType = CompareType.valueOf(opName);
750          MutateRequest request = RequestConverter
751            .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
752              qualifier, new BinaryComparator(value), compareType, timeRange, delete);
753          MutateResponse response = doMutate(request);
754          return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
755        }
756      };
757    List<Delete> rows = Collections.singletonList(delete);
758    Object[] results = new Object[1];
759    AsyncProcessTask task =
760      AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows)
761        .setCallable(callable)
762        // TODO any better timeout?
763        .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
764        .setOperationTimeout(operationTimeoutMs)
765        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
766    AsyncRequestFuture ars = multiAp.submit(task);
767    ars.waitUntilDone();
768    if (ars.hasError()) {
769      throw ars.getErrors();
770    }
771    return ((SingleResponse.Entry) results[0]).isProcessed();
772  }
773
774  @Override
775  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
776    return new CheckAndMutateBuilderImpl(row, family);
777  }
778
779  private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
780    final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm)
781    throws IOException {
782    CancellableRegionServerCallable<MultiResponse> callable =
783    new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
784    rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
785        rm.getMaxPriority()) {
786      @Override
787      protected MultiResponse rpcCall() throws Exception {
788        CompareType compareType = CompareType.valueOf(opName);
789        MultiRequest request = RequestConverter
790          .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
791            new BinaryComparator(value), compareType, timeRange, rm);
792        ClientProtos.MultiResponse response = doMulti(request);
793        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
794        if (res.hasException()) {
795          Throwable ex = ProtobufUtil.toException(res.getException());
796          if (ex instanceof IOException) {
797            throw (IOException) ex;
798          }
799          throw new IOException(
800            "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
801        }
802        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
803      }
804    };
805
806    /**
807     *  Currently, we use one array to store 'processed' flag which is returned by server.
808     *  It is excessive to send such a large array, but that is required by the framework right now
809     * */
810    Object[] results = new Object[rm.getMutations().size()];
811    AsyncProcessTask task = AsyncProcessTask.newBuilder()
812    .setPool(pool)
813    .setTableName(tableName)
814    .setRowAccess(rm.getMutations())
815    .setResults(results)
816    .setCallable(callable)
817    // TODO any better timeout?
818    .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
819    .setOperationTimeout(operationTimeoutMs)
820    .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
821    .build();
822    AsyncRequestFuture ars = multiAp.submit(task);
823    ars.waitUntilDone();
824    if (ars.hasError()) {
825      throw ars.getErrors();
826    }
827
828    return ((Result)results[0]).getExists();
829  }
830
831  @Override
832  @Deprecated
833  public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
834    final CompareOp compareOp, final byte [] value, final RowMutations rm)
835  throws IOException {
836    return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
837  }
838
839  @Override
840  @Deprecated
841  public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
842      final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
843    return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
844  }
845
846  @Override
847  public boolean exists(final Get get) throws IOException {
848    Result r = get(get, true);
849    assert r.getExists() != null;
850    return r.getExists();
851  }
852
853  @Override
854  public boolean[] exists(List<Get> gets) throws IOException {
855    if (gets.isEmpty()) return new boolean[]{};
856    if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
857
858    ArrayList<Get> exists = new ArrayList<>(gets.size());
859    for (Get g: gets){
860      Get ge = new Get(g);
861      ge.setCheckExistenceOnly(true);
862      exists.add(ge);
863    }
864
865    Object[] r1= new Object[exists.size()];
866    try {
867      batch(exists, r1, readRpcTimeoutMs);
868    } catch (InterruptedException e) {
869      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
870    }
871
872    // translate.
873    boolean[] results = new boolean[r1.length];
874    int i = 0;
875    for (Object o : r1) {
876      // batch ensures if there is a failure we get an exception instead
877      results[i++] = ((Result)o).getExists();
878    }
879
880    return results;
881  }
882
883  /**
884   * Process a mixed batch of Get, Put and Delete actions. All actions for a
885   * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
886   *
887   * @param list The collection of actions.
888   * @param results An empty array, same size as list. If an exception is thrown,
889   *   you can test here for partial results, and to determine which actions
890   *   processed successfully.
891   * @throws IOException if there are problems talking to META. Per-item
892   *   exceptions are stored in the results array.
893   */
894  public <R> void processBatchCallback(
895    final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
896    throws IOException, InterruptedException {
897    this.batchCallback(list, results, callback);
898  }
899
900  @Override
901  public void close() throws IOException {
902    if (this.closed) {
903      return;
904    }
905    if (cleanupPoolOnClose) {
906      this.pool.shutdown();
907      try {
908        boolean terminated = false;
909        do {
910          // wait until the pool has terminated
911          terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
912        } while (!terminated);
913      } catch (InterruptedException e) {
914        this.pool.shutdownNow();
915        LOG.warn("waitForTermination interrupted");
916      }
917    }
918    this.closed = true;
919  }
920
921  // validate for well-formedness
922  private void validatePut(final Put put) throws IllegalArgumentException {
923    ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize());
924  }
925
926  /**
927   * The pool is used for mutli requests for this HTable
928   * @return the pool used for mutli
929   */
930  ExecutorService getPool() {
931    return this.pool;
932  }
933
934  /**
935   * Explicitly clears the region cache to fetch the latest value from META.
936   * This is a power user function: avoid unless you know the ramifications.
937   */
938  public void clearRegionCache() {
939    this.connection.clearRegionLocationCache();
940  }
941
942  @Override
943  public CoprocessorRpcChannel coprocessorService(byte[] row) {
944    return new RegionCoprocessorRpcChannel(connection, tableName, row);
945  }
946
947  @Override
948  public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
949      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
950      throws ServiceException, Throwable {
951    final Map<byte[],R> results =  Collections.synchronizedMap(
952        new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
953    coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
954      @Override
955      public void update(byte[] region, byte[] row, R value) {
956        if (region != null) {
957          results.put(region, value);
958        }
959      }
960    });
961    return results;
962  }
963
964  @Override
965  public <T extends Service, R> void coprocessorService(final Class<T> service,
966      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
967      final Batch.Callback<R> callback) throws ServiceException, Throwable {
968    // get regions covered by the row range
969    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
970    Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
971    for (final byte[] r : keys) {
972      final RegionCoprocessorRpcChannel channel =
973          new RegionCoprocessorRpcChannel(connection, tableName, r);
974      Future<R> future = pool.submit(new Callable<R>() {
975        @Override
976        public R call() throws Exception {
977          T instance =
978              org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
979          R result = callable.call(instance);
980          byte[] region = channel.getLastRegion();
981          if (callback != null) {
982            callback.update(region, r, result);
983          }
984          return result;
985        }
986      });
987      futures.put(r, future);
988    }
989    for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
990      try {
991        e.getValue().get();
992      } catch (ExecutionException ee) {
993        LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
994            + Bytes.toStringBinary(e.getKey()), ee);
995        throw ee.getCause();
996      } catch (InterruptedException ie) {
997        throw new InterruptedIOException("Interrupted calling coprocessor service "
998            + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
999      }
1000    }
1001  }
1002
1003  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1004  throws IOException {
1005    if (start == null) {
1006      start = HConstants.EMPTY_START_ROW;
1007    }
1008    if (end == null) {
1009      end = HConstants.EMPTY_END_ROW;
1010    }
1011    return getKeysAndRegionsInRange(start, end, true).getFirst();
1012  }
1013
1014  @Override
1015  public long getRpcTimeout(TimeUnit unit) {
1016    return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS);
1017  }
1018
1019  @Override
1020  @Deprecated
1021  public int getRpcTimeout() {
1022    return rpcTimeoutMs;
1023  }
1024
1025  @Override
1026  @Deprecated
1027  public void setRpcTimeout(int rpcTimeout) {
1028    setReadRpcTimeout(rpcTimeout);
1029    setWriteRpcTimeout(rpcTimeout);
1030  }
1031
1032  @Override
1033  public long getReadRpcTimeout(TimeUnit unit) {
1034    return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS);
1035  }
1036
1037  @Override
1038  @Deprecated
1039  public int getReadRpcTimeout() {
1040    return readRpcTimeoutMs;
1041  }
1042
1043  @Override
1044  @Deprecated
1045  public void setReadRpcTimeout(int readRpcTimeout) {
1046    this.readRpcTimeoutMs = readRpcTimeout;
1047  }
1048
1049  @Override
1050  public long getWriteRpcTimeout(TimeUnit unit) {
1051    return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS);
1052  }
1053
1054  @Override
1055  @Deprecated
1056  public int getWriteRpcTimeout() {
1057    return writeRpcTimeoutMs;
1058  }
1059
1060  @Override
1061  @Deprecated
1062  public void setWriteRpcTimeout(int writeRpcTimeout) {
1063    this.writeRpcTimeoutMs = writeRpcTimeout;
1064  }
1065
1066  @Override
1067  public long getOperationTimeout(TimeUnit unit) {
1068    return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS);
1069  }
1070
1071  @Override
1072  @Deprecated
1073  public int getOperationTimeout() {
1074    return operationTimeoutMs;
1075  }
1076
1077  @Override
1078  @Deprecated
1079  public void setOperationTimeout(int operationTimeout) {
1080    this.operationTimeoutMs = operationTimeout;
1081  }
1082
1083  @Override
1084  public String toString() {
1085    return tableName + ";" + connection;
1086  }
1087
1088  @Override
1089  public <R extends Message> Map<byte[], R> batchCoprocessorService(
1090      Descriptors.MethodDescriptor methodDescriptor, Message request,
1091      byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1092    final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1093        Bytes.BYTES_COMPARATOR));
1094    batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1095        new Callback<R>() {
1096      @Override
1097      public void update(byte[] region, byte[] row, R result) {
1098        if (region != null) {
1099          results.put(region, result);
1100        }
1101      }
1102    });
1103    return results;
1104  }
1105
1106  @Override
1107  public <R extends Message> void batchCoprocessorService(
1108      final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1109      byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1110      throws ServiceException, Throwable {
1111
1112    if (startKey == null) {
1113      startKey = HConstants.EMPTY_START_ROW;
1114    }
1115    if (endKey == null) {
1116      endKey = HConstants.EMPTY_END_ROW;
1117    }
1118    // get regions covered by the row range
1119    Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1120        getKeysAndRegionsInRange(startKey, endKey, true);
1121    List<byte[]> keys = keysAndRegions.getFirst();
1122    List<HRegionLocation> regions = keysAndRegions.getSecond();
1123
1124    // check if we have any calls to make
1125    if (keys.isEmpty()) {
1126      LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1127          ", end=" + Bytes.toStringBinary(endKey));
1128      return;
1129    }
1130
1131    List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size());
1132    final Map<byte[], RegionCoprocessorServiceExec> execsByRow = new TreeMap<>(Bytes.BYTES_COMPARATOR);
1133    for (int i = 0; i < keys.size(); i++) {
1134      final byte[] rowKey = keys.get(i);
1135      final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1136      RegionCoprocessorServiceExec exec =
1137          new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1138      execs.add(exec);
1139      execsByRow.put(rowKey, exec);
1140    }
1141
1142    // tracking for any possible deserialization errors on success callback
1143    // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1144    final List<Throwable> callbackErrorExceptions = new ArrayList<>();
1145    final List<Row> callbackErrorActions = new ArrayList<>();
1146    final List<String> callbackErrorServers = new ArrayList<>();
1147    Object[] results = new Object[execs.size()];
1148
1149    AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
1150        RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1151        RpcControllerFactory.instantiate(configuration));
1152
1153    Callback<ClientProtos.CoprocessorServiceResult> resultsCallback
1154    = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
1155      if (LOG.isTraceEnabled()) {
1156        LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1157            ": region=" + Bytes.toStringBinary(region) +
1158            ", row=" + Bytes.toStringBinary(row) +
1159            ", value=" + serviceResult.getValue().getValue());
1160      }
1161      try {
1162        Message.Builder builder = responsePrototype.newBuilderForType();
1163        org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
1164            serviceResult.getValue().getValue().toByteArray());
1165        callback.update(region, row, (R) builder.build());
1166      } catch (IOException e) {
1167        LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1168            e);
1169        callbackErrorExceptions.add(e);
1170        callbackErrorActions.add(execsByRow.get(row));
1171        callbackErrorServers.add("null");
1172      }
1173    };
1174    AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task =
1175        AsyncProcessTask.newBuilder(resultsCallback)
1176            .setPool(pool)
1177            .setTableName(tableName)
1178            .setRowAccess(execs)
1179            .setResults(results)
1180            .setRpcTimeout(readRpcTimeoutMs)
1181            .setOperationTimeout(operationTimeoutMs)
1182            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
1183            .build();
1184    AsyncRequestFuture future = asyncProcess.submit(task);
1185    future.waitUntilDone();
1186
1187    if (future.hasError()) {
1188      throw future.getErrors();
1189    } else if (!callbackErrorExceptions.isEmpty()) {
1190      throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1191          callbackErrorServers);
1192    }
1193  }
1194
1195  public RegionLocator getRegionLocator() {
1196    return this.locator;
1197  }
1198
1199  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
1200
1201    private final byte[] row;
1202    private final byte[] family;
1203    private byte[] qualifier;
1204    private TimeRange timeRange;
1205    private CompareOperator op;
1206    private byte[] value;
1207
1208    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
1209      this.row = Preconditions.checkNotNull(row, "row is null");
1210      this.family = Preconditions.checkNotNull(family, "family is null");
1211    }
1212
1213    @Override
1214    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
1215      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
1216          " an empty byte array, or just do not call this method if you want a null qualifier");
1217      return this;
1218    }
1219
1220    @Override
1221    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
1222      this.timeRange = timeRange;
1223      return this;
1224    }
1225
1226    @Override
1227    public CheckAndMutateBuilder ifNotExists() {
1228      this.op = CompareOperator.EQUAL;
1229      this.value = null;
1230      return this;
1231    }
1232
1233    @Override
1234    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
1235      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
1236      this.value = Preconditions.checkNotNull(value, "value is null");
1237      return this;
1238    }
1239
1240    private void preCheck() {
1241      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
1242          " calling ifNotExists/ifEquals/ifMatches before executing the request");
1243    }
1244
1245    @Override
1246    public boolean thenPut(Put put) throws IOException {
1247      validatePut(put);
1248      preCheck();
1249      return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
1250    }
1251
1252    @Override
1253    public boolean thenDelete(Delete delete) throws IOException {
1254      preCheck();
1255      return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete);
1256    }
1257
1258    @Override
1259    public boolean thenMutate(RowMutations mutation) throws IOException {
1260      preCheck();
1261      return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation);
1262    }
1263  }
1264}