001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
// Internally, we use shaded protobuf. The imports below are part of our public API.
023//SEE ABOVE NOTE!
024import com.google.protobuf.Descriptors;
025import com.google.protobuf.Message;
026import com.google.protobuf.Service;
027import com.google.protobuf.ServiceException;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.CompareOperator;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.HRegionLocation;
033import org.apache.hadoop.hbase.HTableDescriptor;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.coprocessor.Batch;
036import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
037import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
038import org.apache.hadoop.hbase.filter.Filter;
039import org.apache.hadoop.hbase.io.TimeRange;
040import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
041import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
042import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
043import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
044import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.Pair;
052import org.apache.hadoop.hbase.util.ReflectionUtils;
053import org.apache.hadoop.hbase.util.Threads;
054import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
055import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.apache.yetus.audience.InterfaceStability;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import java.io.IOException;
062import java.io.InterruptedIOException;
063import java.util.ArrayList;
064import java.util.Collections;
065import java.util.List;
066import java.util.Map;
067import java.util.TreeMap;
068import java.util.concurrent.Callable;
069import java.util.concurrent.ExecutionException;
070import java.util.concurrent.ExecutorService;
071import java.util.concurrent.Future;
072import java.util.concurrent.SynchronousQueue;
073import java.util.concurrent.ThreadPoolExecutor;
074import java.util.concurrent.TimeUnit;
075
076import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
077
078/**
079 * An implementation of {@link Table}. Used to communicate with a single HBase table.
080 * Lightweight. Get as needed and just close when done.
081 * Instances of this class SHOULD NOT be constructed directly.
082 * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
083 * class comment for an example of how.
084 *
085 * <p>This class is thread safe since 2.0.0 if not invoking any of the setter methods.
086 * All setters are moved into {@link TableBuilder} and reserved here only for keeping
087 * backward compatibility, and TODO will be removed soon.
088 *
089 * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
090 * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
091 * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
092 * Interface Classification</a>
093 * There are no guarantees for backwards source / binary compatibility and methods or class can
094 * change or go away without deprecation.
095 *
096 * @see Table
097 * @see Admin
098 * @see Connection
099 * @see ConnectionFactory
100 */
101@InterfaceAudience.Private
102@InterfaceStability.Stable
103public class HTable implements Table {
104  private static final Logger LOG = LoggerFactory.getLogger(HTable.class);
105  private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
106  private final ClusterConnection connection;
107  private final TableName tableName;
108  private final Configuration configuration;
109  private final ConnectionConfiguration connConfiguration;
110  private boolean closed = false;
111  private final int scannerCaching;
112  private final long scannerMaxResultSize;
113  private final ExecutorService pool;  // For Multi & Scan
114  private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc
115  private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
116  private int readRpcTimeoutMs; // timeout for each read rpc request
117  private int writeRpcTimeoutMs; // timeout for each write rpc request
118  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
119  private final HRegionLocator locator;
120
121  /** The Async process for batch */
122  AsyncProcess multiAp;
123  private final RpcRetryingCallerFactory rpcCallerFactory;
124  private final RpcControllerFactory rpcControllerFactory;
125
126  // Marked Private @since 1.0
127  @InterfaceAudience.Private
128  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
129    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
130    if (maxThreads == 0) {
131      maxThreads = 1; // is there a better default?
132    }
133    int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
134    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
135
136    // Using the "direct handoff" approach, new threads will only be created
137    // if it is necessary and will grow unbounded. This could be bad but in HCM
138    // we only create as many Runnables as there are region servers. It means
139    // it also scales when new region servers are added.
140    ThreadPoolExecutor pool =
141      new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, TimeUnit.SECONDS,
142        new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("htable-pool-%d")
143        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
144    pool.allowCoreThreadTimeOut(true);
145    return pool;
146  }
147
148  /**
149   * Creates an object to access a HBase table.
150   * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
151   * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
152   * @param connection Connection to be used.
153   * @param builder The table builder
154   * @param rpcCallerFactory The RPC caller factory
155   * @param rpcControllerFactory The RPC controller factory
156   * @param pool ExecutorService to be used.
157   */
158  @InterfaceAudience.Private
159  protected HTable(final ConnectionImplementation connection,
160      final TableBuilderBase builder,
161      final RpcRetryingCallerFactory rpcCallerFactory,
162      final RpcControllerFactory rpcControllerFactory,
163      final ExecutorService pool) {
164    this.connection = Preconditions.checkNotNull(connection, "connection is null");
165    this.configuration = connection.getConfiguration();
166    this.connConfiguration = connection.getConnectionConfiguration();
167    if (pool == null) {
168      this.pool = getDefaultExecutor(this.configuration);
169      this.cleanupPoolOnClose = true;
170    } else {
171      this.pool = pool;
172      this.cleanupPoolOnClose = false;
173    }
174    if (rpcCallerFactory == null) {
175      this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
176    } else {
177      this.rpcCallerFactory = rpcCallerFactory;
178    }
179
180    if (rpcControllerFactory == null) {
181      this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
182    } else {
183      this.rpcControllerFactory = rpcControllerFactory;
184    }
185
186    this.tableName = builder.tableName;
187    this.operationTimeoutMs = builder.operationTimeout;
188    this.rpcTimeoutMs = builder.rpcTimeout;
189    this.readRpcTimeoutMs = builder.readRpcTimeout;
190    this.writeRpcTimeoutMs = builder.writeRpcTimeout;
191    this.scannerCaching = connConfiguration.getScannerCaching();
192    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
193
194    // puts need to track errors globally due to how the APIs currently work.
195    multiAp = this.connection.getAsyncProcess();
196    this.locator = new HRegionLocator(tableName, connection);
197  }
198
199  /**
200   * @return maxKeyValueSize from configuration.
201   */
202  public static int getMaxKeyValueSize(Configuration conf) {
203    return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1);
204  }
205
206  @Override
207  public Configuration getConfiguration() {
208    return configuration;
209  }
210
211  @Override
212  public TableName getName() {
213    return tableName;
214  }
215
216  /**
217   * <em>INTERNAL</em> Used by unit tests and tools to do low-level
218   * manipulations.
219   * @return A Connection instance.
220   */
221  protected Connection getConnection() {
222    return this.connection;
223  }
224
225  @Override
226  @Deprecated
227  public HTableDescriptor getTableDescriptor() throws IOException {
228    HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory,
229      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
230    if (htd != null) {
231      return new ImmutableHTableDescriptor(htd);
232    }
233    return null;
234  }
235
236  @Override
237  public TableDescriptor getDescriptor() throws IOException {
238    return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
239      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
240  }
241
242  /**
243   * Get the corresponding start keys and regions for an arbitrary range of
244   * keys.
245   * <p>
246   * @param startKey Starting row in range, inclusive
247   * @param endKey Ending row in range
248   * @param includeEndKey true if endRow is inclusive, false if exclusive
249   * @return A pair of list of start keys and list of HRegionLocations that
250   *         contain the specified range
251   * @throws IOException if a remote or network exception occurs
252   */
253  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
254      final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
255      throws IOException {
256    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
257  }
258
259  /**
260   * Get the corresponding start keys and regions for an arbitrary range of
261   * keys.
262   * <p>
263   * @param startKey Starting row in range, inclusive
264   * @param endKey Ending row in range
265   * @param includeEndKey true if endRow is inclusive, false if exclusive
266   * @param reload true to reload information or false to use cached information
267   * @return A pair of list of start keys and list of HRegionLocations that
268   *         contain the specified range
269   * @throws IOException if a remote or network exception occurs
270   */
271  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
272      final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
273      final boolean reload) throws IOException {
274    final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
275    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
276      throw new IllegalArgumentException(
277        "Invalid range: " + Bytes.toStringBinary(startKey) +
278        " > " + Bytes.toStringBinary(endKey));
279    }
280    List<byte[]> keysInRange = new ArrayList<>();
281    List<HRegionLocation> regionsInRange = new ArrayList<>();
282    byte[] currentKey = startKey;
283    do {
284      HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
285      keysInRange.add(currentKey);
286      regionsInRange.add(regionLocation);
287      currentKey = regionLocation.getRegionInfo().getEndKey();
288    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
289        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
290            || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
291    return new Pair<>(keysInRange, regionsInRange);
292  }
293
294  /**
295   * The underlying {@link HTable} must not be closed.
296   * {@link Table#getScanner(Scan)} has other usage details.
297   */
298  @Override
299  public ResultScanner getScanner(Scan scan) throws IOException {
300    if (scan.getCaching() <= 0) {
301      scan.setCaching(scannerCaching);
302    }
303    if (scan.getMaxResultSize() <= 0) {
304      scan.setMaxResultSize(scannerMaxResultSize);
305    }
306    if (scan.getMvccReadPoint() > 0) {
307      // it is not supposed to be set by user, clear
308      scan.resetMvccReadPoint();
309    }
310    Boolean async = scan.isAsyncPrefetch();
311    if (async == null) {
312      async = connConfiguration.isClientScannerAsyncPrefetch();
313    }
314
315    if (scan.isReversed()) {
316      return new ReversedClientScanner(getConfiguration(), scan, getName(),
317        this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
318        pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
319    } else {
320      if (async) {
321        return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
322            this.rpcCallerFactory, this.rpcControllerFactory,
323            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
324      } else {
325        return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
326            this.rpcCallerFactory, this.rpcControllerFactory,
327            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
328      }
329    }
330  }
331
332  /**
333   * The underlying {@link HTable} must not be closed.
334   * {@link Table#getScanner(byte[])} has other usage details.
335   */
336  @Override
337  public ResultScanner getScanner(byte [] family) throws IOException {
338    Scan scan = new Scan();
339    scan.addFamily(family);
340    return getScanner(scan);
341  }
342
343  /**
344   * The underlying {@link HTable} must not be closed.
345   * {@link Table#getScanner(byte[], byte[])} has other usage details.
346   */
347  @Override
348  public ResultScanner getScanner(byte [] family, byte [] qualifier)
349  throws IOException {
350    Scan scan = new Scan();
351    scan.addColumn(family, qualifier);
352    return getScanner(scan);
353  }
354
355  @Override
356  public Result get(final Get get) throws IOException {
357    return get(get, get.isCheckExistenceOnly());
358  }
359
360  private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
361    // if we are changing settings to the get, clone it.
362    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
363      get = ReflectionUtils.newInstance(get.getClass(), get);
364      get.setCheckExistenceOnly(checkExistenceOnly);
365      if (get.getConsistency() == null){
366        get.setConsistency(DEFAULT_CONSISTENCY);
367      }
368    }
369
370    if (get.getConsistency() == Consistency.STRONG) {
371      final Get configuredGet = get;
372      ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(),
373          get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) {
374        @Override
375        protected Result rpcCall() throws Exception {
376          ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
377              getLocation().getRegionInfo().getRegionName(), configuredGet);
378          ClientProtos.GetResponse response = doGet(request);
379          return response == null? null:
380            ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
381        }
382      };
383      return rpcCallerFactory.<Result>newCaller(readRpcTimeoutMs).callWithRetries(callable,
384          this.operationTimeoutMs);
385    }
386
387    // Call that takes into account the replica
388    RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
389        rpcControllerFactory, tableName, this.connection, get, pool,
390        connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs,
391        connConfiguration.getPrimaryCallTimeoutMicroSecond());
392    return callable.call(operationTimeoutMs);
393  }
394
395  @Override
396  public Result[] get(List<Get> gets) throws IOException {
397    if (gets.size() == 1) {
398      return new Result[]{get(gets.get(0))};
399    }
400    try {
401      Object[] r1 = new Object[gets.size()];
402      batch((List<? extends Row>)gets, r1, readRpcTimeoutMs);
403      // Translate.
404      Result [] results = new Result[r1.length];
405      int i = 0;
406      for (Object obj: r1) {
407        // Batch ensures if there is a failure we get an exception instead
408        results[i++] = (Result)obj;
409      }
410      return results;
411    } catch (InterruptedException e) {
412      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
413    }
414  }
415
416  @Override
417  public void batch(final List<? extends Row> actions, final Object[] results)
418      throws InterruptedException, IOException {
419    int rpcTimeout = writeRpcTimeoutMs;
420    boolean hasRead = false;
421    boolean hasWrite = false;
422    for (Row action : actions) {
423      if (action instanceof Mutation) {
424        hasWrite = true;
425      } else {
426        hasRead = true;
427      }
428      if (hasRead && hasWrite) {
429        break;
430      }
431    }
432    if (hasRead && !hasWrite) {
433      rpcTimeout = readRpcTimeoutMs;
434    }
435    batch(actions, results, rpcTimeout);
436  }
437
438  public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
439      throws InterruptedException, IOException {
440    AsyncProcessTask task = AsyncProcessTask.newBuilder()
441            .setPool(pool)
442            .setTableName(tableName)
443            .setRowAccess(actions)
444            .setResults(results)
445            .setRpcTimeout(rpcTimeout)
446            .setOperationTimeout(operationTimeoutMs)
447            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
448            .build();
449    AsyncRequestFuture ars = multiAp.submit(task);
450    ars.waitUntilDone();
451    if (ars.hasError()) {
452      throw ars.getErrors();
453    }
454  }
455
456  @Override
457  public <R> void batchCallback(
458    final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
459    throws IOException, InterruptedException {
460    doBatchWithCallback(actions, results, callback, connection, pool, tableName);
461  }
462
463  public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
464    Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
465    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
466    int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
467    int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
468        connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
469            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
470    AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback)
471            .setPool(pool)
472            .setTableName(tableName)
473            .setRowAccess(actions)
474            .setResults(results)
475            .setOperationTimeout(operationTimeout)
476            .setRpcTimeout(writeTimeout)
477            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
478            .build();
479    AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
480    ars.waitUntilDone();
481    if (ars.hasError()) {
482      throw ars.getErrors();
483    }
484  }
485
486  @Override
487  public void delete(final Delete delete) throws IOException {
488    ClientServiceCallable<Void> callable =
489        new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(),
490            this.rpcControllerFactory.newController(), delete.getPriority()) {
491      @Override
492      protected Void rpcCall() throws Exception {
493        MutateRequest request = RequestConverter
494            .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete);
495        doMutate(request);
496        return null;
497      }
498    };
499    rpcCallerFactory.<Void>newCaller(this.writeRpcTimeoutMs)
500        .callWithRetries(callable, this.operationTimeoutMs);
501  }
502
503  @Override
504  public void delete(final List<Delete> deletes)
505  throws IOException {
506    Object[] results = new Object[deletes.size()];
507    try {
508      batch(deletes, results, writeRpcTimeoutMs);
509    } catch (InterruptedException e) {
510      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
511    } finally {
512      // TODO: to be consistent with batch put(), do not modify input list
513      // mutate list so that it is empty for complete success, or contains only failed records
514      // results are returned in the same order as the requests in list walk the list backwards,
515      // so we can remove from list without impacting the indexes of earlier members
516      for (int i = results.length - 1; i>=0; i--) {
517        // if result is not null, it succeeded
518        if (results[i] instanceof Result) {
519          deletes.remove(i);
520        }
521      }
522    }
523  }
524
525  @Override
526  public void put(final Put put) throws IOException {
527    validatePut(put);
528    ClientServiceCallable<Void> callable =
529        new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
530            this.rpcControllerFactory.newController(), put.getPriority()) {
531      @Override
532      protected Void rpcCall() throws Exception {
533        MutateRequest request =
534            RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put);
535        doMutate(request);
536        return null;
537      }
538    };
539    rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
540        this.operationTimeoutMs);
541  }
542
543  @Override
544  public void put(final List<Put> puts) throws IOException {
545    for (Put put : puts) {
546      validatePut(put);
547    }
548    Object[] results = new Object[puts.size()];
549    try {
550      batch(puts, results, writeRpcTimeoutMs);
551    } catch (InterruptedException e) {
552      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
553    }
554  }
555
556  @Override
557  public Result mutateRow(final RowMutations rm) throws IOException {
558    CancellableRegionServerCallable<MultiResponse> callable =
559      new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
560          rpcControllerFactory.newController(), writeRpcTimeoutMs,
561          new RetryingTimeTracker().start(), rm.getMaxPriority()) {
562      @Override
563      protected MultiResponse rpcCall() throws Exception {
564        RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
565            getLocation().getRegionInfo().getRegionName(), rm);
566        regionMutationBuilder.setAtomic(true);
567        MultiRequest request =
568            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
569        ClientProtos.MultiResponse response = doMulti(request);
570        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
571        if (res.hasException()) {
572          Throwable ex = ProtobufUtil.toException(res.getException());
573          if (ex instanceof IOException) {
574            throw (IOException) ex;
575          }
576          throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
577        }
578        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
579      }
580    };
581    Object[] results = new Object[rm.getMutations().size()];
582    AsyncProcessTask task = AsyncProcessTask.newBuilder()
583      .setPool(pool)
584      .setTableName(tableName)
585      .setRowAccess(rm.getMutations())
586      .setCallable(callable)
587      .setRpcTimeout(writeRpcTimeoutMs)
588      .setOperationTimeout(operationTimeoutMs)
589      .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
590      .setResults(results)
591      .build();
592    AsyncRequestFuture ars = multiAp.submit(task);
593    ars.waitUntilDone();
594    if (ars.hasError()) {
595      throw ars.getErrors();
596    }
597    return (Result) results[0];
598  }
599
600  @Override
601  public Result append(final Append append) throws IOException {
602    checkHasFamilies(append);
603    NoncedRegionServerCallable<Result> callable =
604        new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
605            this.rpcControllerFactory.newController(), append.getPriority()) {
606      @Override
607      protected Result rpcCall() throws Exception {
608        MutateRequest request = RequestConverter.buildMutateRequest(
609          getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
610        MutateResponse response = doMutate(request);
611        if (!response.hasResult()) return null;
612        return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
613      }
614    };
615    return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs).
616        callWithRetries(callable, this.operationTimeoutMs);
617  }
618
619  @Override
620  public Result increment(final Increment increment) throws IOException {
621    checkHasFamilies(increment);
622    NoncedRegionServerCallable<Result> callable =
623        new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
624            this.rpcControllerFactory.newController(), increment.getPriority()) {
625      @Override
626      protected Result rpcCall() throws Exception {
627        MutateRequest request = RequestConverter.buildMutateRequest(
628          getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
629        MutateResponse response = doMutate(request);
630        // Should this check for null like append does?
631        return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
632      }
633    };
634    return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable,
635        this.operationTimeoutMs);
636  }
637
638  @Override
639  public long incrementColumnValue(final byte [] row, final byte [] family,
640      final byte [] qualifier, final long amount)
641  throws IOException {
642    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
643  }
644
645  @Override
646  public long incrementColumnValue(final byte [] row, final byte [] family,
647      final byte [] qualifier, final long amount, final Durability durability)
648  throws IOException {
649    NullPointerException npe = null;
650    if (row == null) {
651      npe = new NullPointerException("row is null");
652    } else if (family == null) {
653      npe = new NullPointerException("family is null");
654    }
655    if (npe != null) {
656      throw new IOException(
657          "Invalid arguments to incrementColumnValue", npe);
658    }
659
660    NoncedRegionServerCallable<Long> callable =
661        new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
662            this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
663      @Override
664      protected Long rpcCall() throws Exception {
665        MutateRequest request = RequestConverter.buildIncrementRequest(
666          getLocation().getRegionInfo().getRegionName(), row, family,
667          qualifier, amount, durability, getNonceGroup(), getNonce());
668        MutateResponse response = doMutate(request);
669        Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
670        return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
671      }
672    };
673    return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs).
674        callWithRetries(callable, this.operationTimeoutMs);
675  }
676
677  @Override
678  @Deprecated
679  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
680      final byte [] value, final Put put) throws IOException {
681    return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
682      put).isSuccess();
683  }
684
685  @Override
686  @Deprecated
687  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
688      final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
689    return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
690      null, put).isSuccess();
691  }
692
693  @Override
694  @Deprecated
695  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
696      final CompareOperator op, final byte [] value, final Put put) throws IOException {
697    // The name of the operators in CompareOperator are intentionally those of the
698    // operators in the filter's CompareOp enum.
699    return doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess();
700  }
701
702  @Override
703  @Deprecated
704  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
705    final byte[] value, final Delete delete) throws IOException {
706    return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null,
707      null, delete).isSuccess();
708  }
709
710  @Override
711  @Deprecated
712  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
713    final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
714    return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
715      null, delete).isSuccess();
716  }
717
718  @Override
719  @Deprecated
720  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
721    final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
722    return doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess();
723  }
724
725  @Override
726  @Deprecated
727  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
728    return new CheckAndMutateBuilderImpl(row, family);
729  }
730
731  @Override
732  @Deprecated
733  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
734    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
735  }
736
  /**
   * Runs a check-and-mutate whose action is a {@link RowMutations}, sending it as a multi
   * request submitted through {@code multiAp}.
   * <p>
   * Note that the callable is created for {@code rm.getRow()}, while the condition placed in
   * the request uses the {@code row} argument.
   *
   * @param row row used for the condition in the request
   * @param family column family of the condition; callers in this class pass {@code null}
   *   when a filter is used
   * @param qualifier column qualifier of the condition, may be {@code null}
   * @param op comparison operator of the condition; {@code null} for filter-based callers
   * @param value value compared against, may be {@code null}
   * @param filter filter used as the condition, or {@code null}
   * @param timeRange time range forwarded to the request, or {@code null}
   * @param rm the mutations to apply; also supplies the row and priority of the callable
   * @return the first element of the results array, cast to {@link CheckAndMutateResult}
   * @throws IOException the errors reported by the submitted task, if it has any
   */
  private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
    final TimeRange timeRange, final RowMutations rm) throws IOException {
    // The callable uses the write RPC timeout and a freshly started retrying time tracker.
    CancellableRegionServerCallable<MultiResponse> callable =
    new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
    rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
        rm.getMaxPriority()) {
      @Override
      protected MultiResponse rpcCall() throws Exception {
        MultiRequest request = RequestConverter
          .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
            qualifier, op, value, filter, timeRange, rm);
        ClientProtos.MultiResponse response = doMulti(request);
        // Only the first region action result is inspected for an exception.
        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
        if (res.hasException()) {
          Throwable ex = ProtobufUtil.toException(res.getException());
          // IOExceptions are rethrown as-is; anything else is wrapped in an IOException.
          if (ex instanceof IOException) {
            throw (IOException) ex;
          }
          throw new IOException(
            "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
        }
        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
      }
    };

    // Currently, one array is used to store the 'processed' flag returned by the server.
    // Sending an array sized to the number of mutations is excessive, but the framework
    // requires it right now.
    Object[] results = new Object[rm.getMutations().size()];
    AsyncProcessTask task = AsyncProcessTask.newBuilder()
    .setPool(pool)
    .setTableName(tableName)
    .setRowAccess(rm.getMutations())
    .setResults(results)
    .setCallable(callable)
    // TODO any better timeout?
    .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
    .setOperationTimeout(operationTimeoutMs)
    .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
    .build();
    AsyncRequestFuture ars = multiAp.submit(task);
    // Block until the submitted task has finished, then surface any collected errors.
    ars.waitUntilDone();
    if (ars.hasError()) {
      throw ars.getErrors();
    }

    // NOTE(review): presumably the framework stores the CheckAndMutateResult in slot 0 of
    // the results array -- confirm against the AsyncProcess result handling.
    return (CheckAndMutateResult) results[0];
  }
787
788  @Override
789  @Deprecated
790  public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
791    final CompareOp compareOp, final byte [] value, final RowMutations rm)
792  throws IOException {
793    return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
794      null, rm).isSuccess();
795  }
796
797  @Override
798  @Deprecated
799  public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
800      final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
801    return doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess();
802  }
803
804  @Override
805  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
806    Row action = checkAndMutate.getAction();
807    if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
808      action instanceof Append) {
809      if (action instanceof Put) {
810        validatePut((Put) action);
811      }
812      return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
813        checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
814        checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action);
815    } else {
816      return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
817        checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
818        checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action);
819    }
820  }
821
822  private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
823    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
824    final TimeRange timeRange, final Mutation mutation) throws IOException {
825    ClientServiceCallable<CheckAndMutateResult> callable =
826      new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
827        this.rpcControllerFactory.newController(), mutation.getPriority()) {
828        @Override
829        protected CheckAndMutateResult rpcCall() throws Exception {
830          MutateRequest request = RequestConverter.buildMutateRequest(
831            getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
832            filter, timeRange, mutation);
833          MutateResponse response = doMutate(request);
834          if (response.hasResult()) {
835            return new CheckAndMutateResult(response.getProcessed(),
836              ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()));
837          }
838          return new CheckAndMutateResult(response.getProcessed(), null);
839        }
840      };
841    return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs)
842      .callWithRetries(callable, this.operationTimeoutMs);
843  }
844
845  @Override
846  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
847    throws IOException {
848    if (checkAndMutates.isEmpty()) {
849      return Collections.emptyList();
850    }
851    if (checkAndMutates.size() == 1) {
852      return Collections.singletonList(checkAndMutate(checkAndMutates.get(0)));
853    }
854
855    Object[] results = new Object[checkAndMutates.size()];
856    try {
857      batch(checkAndMutates, results, writeRpcTimeoutMs);
858    } catch (InterruptedException e) {
859      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
860    }
861
862    // translate.
863    List<CheckAndMutateResult> ret = new ArrayList<>(results.length);
864    for (Object r : results) {
865      // Batch ensures if there is a failure we get an exception instead
866      ret.add((CheckAndMutateResult) r);
867    }
868    return ret;
869  }
870
871  private CompareOperator toCompareOperator(CompareOp compareOp) {
872    switch (compareOp) {
873      case LESS:
874        return CompareOperator.LESS;
875
876      case LESS_OR_EQUAL:
877        return CompareOperator.LESS_OR_EQUAL;
878
879      case EQUAL:
880        return CompareOperator.EQUAL;
881
882      case NOT_EQUAL:
883        return CompareOperator.NOT_EQUAL;
884
885      case GREATER_OR_EQUAL:
886        return CompareOperator.GREATER_OR_EQUAL;
887
888      case GREATER:
889        return CompareOperator.GREATER;
890
891      case NO_OP:
892        return CompareOperator.NO_OP;
893
894      default:
895        throw new AssertionError();
896    }
897  }
898
899  @Override
900  public boolean exists(final Get get) throws IOException {
901    Result r = get(get, true);
902    assert r.getExists() != null;
903    return r.getExists();
904  }
905
906  @Override
907  public boolean[] exists(List<Get> gets) throws IOException {
908    if (gets.isEmpty()) return new boolean[]{};
909    if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
910
911    ArrayList<Get> exists = new ArrayList<>(gets.size());
912    for (Get g: gets){
913      Get ge = new Get(g);
914      ge.setCheckExistenceOnly(true);
915      exists.add(ge);
916    }
917
918    Object[] r1= new Object[exists.size()];
919    try {
920      batch(exists, r1, readRpcTimeoutMs);
921    } catch (InterruptedException e) {
922      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
923    }
924
925    // translate.
926    boolean[] results = new boolean[r1.length];
927    int i = 0;
928    for (Object o : r1) {
929      // batch ensures if there is a failure we get an exception instead
930      results[i++] = ((Result)o).getExists();
931    }
932
933    return results;
934  }
935
936  /**
937   * Process a mixed batch of Get, Put and Delete actions. All actions for a
938   * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
939   *
940   * @param list The collection of actions.
941   * @param results An empty array, same size as list. If an exception is thrown,
942   *   you can test here for partial results, and to determine which actions
943   *   processed successfully.
944   * @throws IOException if there are problems talking to META. Per-item
945   *   exceptions are stored in the results array.
946   */
947  public <R> void processBatchCallback(
948    final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
949    throws IOException, InterruptedException {
950    this.batchCallback(list, results, callback);
951  }
952
953  @Override
954  public void close() throws IOException {
955    if (this.closed) {
956      return;
957    }
958    if (cleanupPoolOnClose) {
959      this.pool.shutdown();
960      try {
961        boolean terminated = false;
962        do {
963          // wait until the pool has terminated
964          terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
965        } while (!terminated);
966      } catch (InterruptedException e) {
967        this.pool.shutdownNow();
968        LOG.warn("waitForTermination interrupted");
969      }
970    }
971    this.closed = true;
972  }
973
974  // validate for well-formedness
975  private void validatePut(final Put put) throws IllegalArgumentException {
976    ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize());
977  }
978
979  /**
980   * The pool is used for mutli requests for this HTable
981   * @return the pool used for mutli
982   */
983  ExecutorService getPool() {
984    return this.pool;
985  }
986
987  /**
988   * Explicitly clears the region cache to fetch the latest value from META.
989   * This is a power user function: avoid unless you know the ramifications.
990   */
991  public void clearRegionCache() {
992    this.connection.clearRegionLocationCache();
993  }
994
995  @Override
996  public CoprocessorRpcChannel coprocessorService(byte[] row) {
997    return new RegionCoprocessorRpcChannel(connection, tableName, row);
998  }
999
1000  @Override
1001  public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1002      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1003      throws ServiceException, Throwable {
1004    final Map<byte[],R> results =  Collections.synchronizedMap(
1005        new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1006    coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1007      @Override
1008      public void update(byte[] region, byte[] row, R value) {
1009        if (region != null) {
1010          results.put(region, value);
1011        }
1012      }
1013    });
1014    return results;
1015  }
1016
1017  @Override
1018  public <T extends Service, R> void coprocessorService(final Class<T> service,
1019      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1020      final Batch.Callback<R> callback) throws ServiceException, Throwable {
1021    // get regions covered by the row range
1022    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1023    Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
1024    for (final byte[] r : keys) {
1025      final RegionCoprocessorRpcChannel channel =
1026          new RegionCoprocessorRpcChannel(connection, tableName, r);
1027      Future<R> future = pool.submit(new Callable<R>() {
1028        @Override
1029        public R call() throws Exception {
1030          T instance =
1031              org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
1032          R result = callable.call(instance);
1033          byte[] region = channel.getLastRegion();
1034          if (callback != null) {
1035            callback.update(region, r, result);
1036          }
1037          return result;
1038        }
1039      });
1040      futures.put(r, future);
1041    }
1042    for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1043      try {
1044        e.getValue().get();
1045      } catch (ExecutionException ee) {
1046        LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1047            + Bytes.toStringBinary(e.getKey()), ee);
1048        throw ee.getCause();
1049      } catch (InterruptedException ie) {
1050        throw new InterruptedIOException("Interrupted calling coprocessor service "
1051            + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1052      }
1053    }
1054  }
1055
1056  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1057  throws IOException {
1058    if (start == null) {
1059      start = HConstants.EMPTY_START_ROW;
1060    }
1061    if (end == null) {
1062      end = HConstants.EMPTY_END_ROW;
1063    }
1064    return getKeysAndRegionsInRange(start, end, true).getFirst();
1065  }
1066
1067  @Override
1068  public long getRpcTimeout(TimeUnit unit) {
1069    return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS);
1070  }
1071
1072  @Override
1073  @Deprecated
1074  public int getRpcTimeout() {
1075    return rpcTimeoutMs;
1076  }
1077
1078  @Override
1079  @Deprecated
1080  public void setRpcTimeout(int rpcTimeout) {
1081    setReadRpcTimeout(rpcTimeout);
1082    setWriteRpcTimeout(rpcTimeout);
1083  }
1084
1085  @Override
1086  public long getReadRpcTimeout(TimeUnit unit) {
1087    return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS);
1088  }
1089
1090  @Override
1091  @Deprecated
1092  public int getReadRpcTimeout() {
1093    return readRpcTimeoutMs;
1094  }
1095
1096  @Override
1097  @Deprecated
1098  public void setReadRpcTimeout(int readRpcTimeout) {
1099    this.readRpcTimeoutMs = readRpcTimeout;
1100  }
1101
1102  @Override
1103  public long getWriteRpcTimeout(TimeUnit unit) {
1104    return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS);
1105  }
1106
1107  @Override
1108  @Deprecated
1109  public int getWriteRpcTimeout() {
1110    return writeRpcTimeoutMs;
1111  }
1112
1113  @Override
1114  @Deprecated
1115  public void setWriteRpcTimeout(int writeRpcTimeout) {
1116    this.writeRpcTimeoutMs = writeRpcTimeout;
1117  }
1118
1119  @Override
1120  public long getOperationTimeout(TimeUnit unit) {
1121    return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS);
1122  }
1123
1124  @Override
1125  @Deprecated
1126  public int getOperationTimeout() {
1127    return operationTimeoutMs;
1128  }
1129
1130  @Override
1131  @Deprecated
1132  public void setOperationTimeout(int operationTimeout) {
1133    this.operationTimeoutMs = operationTimeout;
1134  }
1135
1136  @Override
1137  public String toString() {
1138    return tableName + ";" + connection;
1139  }
1140
1141  @Override
1142  public <R extends Message> Map<byte[], R> batchCoprocessorService(
1143      Descriptors.MethodDescriptor methodDescriptor, Message request,
1144      byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1145    final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1146        Bytes.BYTES_COMPARATOR));
1147    batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1148        new Callback<R>() {
1149      @Override
1150      public void update(byte[] region, byte[] row, R result) {
1151        if (region != null) {
1152          results.put(region, result);
1153        }
1154      }
1155    });
1156    return results;
1157  }
1158
  /**
   * Invokes a coprocessor endpoint method once per key reported for the given row range,
   * submitting all invocations as one task to a dedicated {@code AsyncProcess}, and reports
   * each decoded response to {@code callback}.
   * <p>
   * {@code null} range bounds are replaced by {@code HConstants.EMPTY_START_ROW} /
   * {@code HConstants.EMPTY_END_ROW}. When no keys are selected the method logs and returns
   * without making any call.
   *
   * @param methodDescriptor descriptor of the endpoint method to invoke
   * @param request request message sent with every invocation
   * @param startKey start of the row range, or {@code null}
   * @param endKey end of the row range, or {@code null}
   * @param responsePrototype prototype whose builder is used to decode each response
   * @param callback receiver of the decoded responses
   * @throws ServiceException declared by the overridden method
   * @throws Throwable the errors reported by the submitted task, or a
   *   {@code RetriesExhaustedWithDetailsException} built from the decoding failures
   *   collected in the results callback
   */
  @Override
  public <R extends Message> void batchCoprocessorService(
      final Descriptors.MethodDescriptor methodDescriptor, final Message request,
      byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
      throws ServiceException, Throwable {

    if (startKey == null) {
      startKey = HConstants.EMPTY_START_ROW;
    }
    if (endKey == null) {
      endKey = HConstants.EMPTY_END_ROW;
    }
    // get regions covered by the row range
    Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
        getKeysAndRegionsInRange(startKey, endKey, true);
    List<byte[]> keys = keysAndRegions.getFirst();
    List<HRegionLocation> regions = keysAndRegions.getSecond();

    // check if we have any calls to make
    if (keys.isEmpty()) {
      LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
          ", end=" + Bytes.toStringBinary(endKey));
      return;
    }

    // Build one exec per key; keys and regions are read pairwise by index. The by-row map
    // lets the results callback find the exec belonging to a failed row.
    List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size());
    final Map<byte[], RegionCoprocessorServiceExec> execsByRow = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (int i = 0; i < keys.size(); i++) {
      final byte[] rowKey = keys.get(i);
      final byte[] region = regions.get(i).getRegionInfo().getRegionName();
      RegionCoprocessorServiceExec exec =
          new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
      execs.add(exec);
      execsByRow.put(rowKey, exec);
    }

    // tracking for any possible deserialization errors on success callback
    // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
    // NOTE(review): these are plain ArrayLists appended to from the results callback; if
    // the callback can run on several threads at once the appends are unsynchronized --
    // confirm against how AsyncProcess invokes callbacks.
    final List<Throwable> callbackErrorExceptions = new ArrayList<>();
    final List<Row> callbackErrorActions = new ArrayList<>();
    final List<String> callbackErrorServers = new ArrayList<>();
    Object[] results = new Object[execs.size()];

    // A separate AsyncProcess instance is created for this call rather than reusing a
    // shared one.
    AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
        RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
        RpcControllerFactory.instantiate(configuration));

    // Decodes each raw service result with the response prototype and forwards it to the
    // caller's callback; decoding failures are recorded instead of being thrown here.
    Callback<ClientProtos.CoprocessorServiceResult> resultsCallback
    = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
            ": region=" + Bytes.toStringBinary(region) +
            ", row=" + Bytes.toStringBinary(row) +
            ", value=" + serviceResult.getValue().getValue());
      }
      try {
        Message.Builder builder = responsePrototype.newBuilderForType();
        org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
            serviceResult.getValue().getValue().toByteArray());
        // Unchecked cast: the builder comes from responsePrototype, which is of type R.
        callback.update(region, row, (R) builder.build());
      } catch (IOException e) {
        LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
            e);
        callbackErrorExceptions.add(e);
        callbackErrorActions.add(execsByRow.get(row));
        // No server name is recorded for the failure; the literal string "null" is stored.
        callbackErrorServers.add("null");
      }
    };
    AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task =
        AsyncProcessTask.newBuilder(resultsCallback)
            .setPool(pool)
            .setTableName(tableName)
            .setRowAccess(execs)
            .setResults(results)
            .setRpcTimeout(readRpcTimeoutMs)
            .setOperationTimeout(operationTimeoutMs)
            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
            .build();
    AsyncRequestFuture future = asyncProcess.submit(task);
    // Block until every exec has completed.
    future.waitUntilDone();

    // Errors reported by the task take precedence over decoding failures from the callback.
    if (future.hasError()) {
      throw future.getErrors();
    } else if (!callbackErrorExceptions.isEmpty()) {
      throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
          callbackErrorServers);
    }
  }
1247
1248  @Override
1249  public RegionLocator getRegionLocator() {
1250    return this.locator;
1251  }
1252
1253  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
1254
1255    private final byte[] row;
1256    private final byte[] family;
1257    private byte[] qualifier;
1258    private TimeRange timeRange;
1259    private CompareOperator op;
1260    private byte[] value;
1261
1262    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
1263      this.row = Preconditions.checkNotNull(row, "row is null");
1264      this.family = Preconditions.checkNotNull(family, "family is null");
1265    }
1266
1267    @Override
1268    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
1269      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
1270          " an empty byte array, or just do not call this method if you want a null qualifier");
1271      return this;
1272    }
1273
1274    @Override
1275    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
1276      this.timeRange = timeRange;
1277      return this;
1278    }
1279
1280    @Override
1281    public CheckAndMutateBuilder ifNotExists() {
1282      this.op = CompareOperator.EQUAL;
1283      this.value = null;
1284      return this;
1285    }
1286
1287    @Override
1288    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
1289      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
1290      this.value = Preconditions.checkNotNull(value, "value is null");
1291      return this;
1292    }
1293
1294    private void preCheck() {
1295      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
1296          " calling ifNotExists/ifEquals/ifMatches before executing the request");
1297    }
1298
1299    @Override
1300    public boolean thenPut(Put put) throws IOException {
1301      validatePut(put);
1302      preCheck();
1303      return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put)
1304        .isSuccess();
1305    }
1306
1307    @Override
1308    public boolean thenDelete(Delete delete) throws IOException {
1309      preCheck();
1310      return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete)
1311        .isSuccess();
1312    }
1313
1314    @Override
1315    public boolean thenMutate(RowMutations mutation) throws IOException {
1316      preCheck();
1317      return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange,
1318        mutation).isSuccess();
1319    }
1320  }
1321
1322  private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder {
1323
1324    private final byte[] row;
1325    private final Filter filter;
1326    private TimeRange timeRange;
1327
1328    CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
1329      this.row = Preconditions.checkNotNull(row, "row is null");
1330      this.filter = Preconditions.checkNotNull(filter, "filter is null");
1331    }
1332
1333    @Override
1334    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
1335      this.timeRange = timeRange;
1336      return this;
1337    }
1338
1339    @Override
1340    public boolean thenPut(Put put) throws IOException {
1341      validatePut(put);
1342      return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess();
1343    }
1344
1345    @Override
1346    public boolean thenDelete(Delete delete) throws IOException {
1347      return doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess();
1348    }
1349
1350    @Override
1351    public boolean thenMutate(RowMutations mutation) throws IOException {
1352      return doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation)
1353        .isSuccess();
1354    }
1355  }
1356}