001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
// Internally, we use shaded protobuf. These non-shaded imports below are part of our public API.
// SEE ABOVE NOTE!
024import com.google.protobuf.Descriptors;
025import com.google.protobuf.Message;
026import com.google.protobuf.Service;
027import com.google.protobuf.ServiceException;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CompareOperator;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.HTableDescriptor;
035import org.apache.hadoop.hbase.KeyValueUtil;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.io.TimeRange;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.hadoop.hbase.client.coprocessor.Batch;
043import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
044import org.apache.hadoop.hbase.filter.BinaryComparator;
045import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
046import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
047import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
049import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
051import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
052import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.hbase.util.ReflectionUtils;
062import org.apache.hadoop.hbase.util.Threads;
063
064import java.io.IOException;
065import java.io.InterruptedIOException;
066import java.util.ArrayList;
067import java.util.Collections;
068import java.util.List;
069import java.util.Map;
070import java.util.TreeMap;
071import java.util.concurrent.Callable;
072import java.util.concurrent.ExecutionException;
073import java.util.concurrent.ExecutorService;
074import java.util.concurrent.Future;
075import java.util.concurrent.SynchronousQueue;
076import java.util.concurrent.ThreadPoolExecutor;
077import java.util.concurrent.TimeUnit;
078
079import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
080
/**
 * An implementation of {@link Table}. Used to communicate with a single HBase table.
 * Lightweight. Get as needed and just close when done.
 * Instances of this class SHOULD NOT be constructed directly.
 * Obtain an instance via {@link Connection}. See the {@link ConnectionFactory}
 * class comment for an example of how.
 *
 * <p>This class is thread-safe since 2.0.0, provided none of the setter methods are invoked.
 * All setters have moved into {@link TableBuilder} and are kept here only for backward
 * compatibility; TODO: they will be removed soon.
 *
 * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
 * InterfaceAudience.Private, indicating that this is an HBase-internal class as defined in the
 * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
 * Interface Classification</a>.
 * There are no guarantees of backwards source or binary compatibility; methods or the class
 * itself can change or go away without deprecation.
 *
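 * <p>A minimal usage sketch (assumes a {@link Configuration} named {@code conf}; the table,
 * column and value names are illustrative only):
 * <pre>
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *     Table table = connection.getTable(TableName.valueOf("example_table"))) {
 *   Put put = new Put(Bytes.toBytes("row1"));
 *   put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
 *   table.put(put);
 * }
 * </pre>
 *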
 * @see Table
 * @see Admin
 * @see Connection
 * @see ConnectionFactory
 */
104@InterfaceAudience.Private
105@InterfaceStability.Stable
106public class HTable implements Table {
107  private static final Logger LOG = LoggerFactory.getLogger(HTable.class);
108  private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
109  private final ClusterConnection connection;
110  private final TableName tableName;
111  private final Configuration configuration;
112  private final ConnectionConfiguration connConfiguration;
113  private boolean closed = false;
114  private final int scannerCaching;
115  private final long scannerMaxResultSize;
116  private final ExecutorService pool;  // For Multi & Scan
117  private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc
118  private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
119  private int readRpcTimeoutMs; // timeout for each read rpc request
120  private int writeRpcTimeoutMs; // timeout for each write rpc request
121  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
122  private final HRegionLocator locator;
123
  /** The AsyncProcess used for batch requests. */
125  @VisibleForTesting
126  AsyncProcess multiAp;
127  private final RpcRetryingCallerFactory rpcCallerFactory;
128  private final RpcControllerFactory rpcControllerFactory;
129
130  // Marked Private @since 1.0
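  /**
   * Creates the default thread pool used for multi/batch and scan requests when the caller does
   * not supply one. A sketch of how the pool can be tuned via configuration (the keys are the
   * ones read below; the values shown are illustrative, not recommendations):
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * conf.setInt("hbase.htable.threads.max", 96);
   * conf.setInt("hbase.htable.threads.coresize", 8);
   * conf.setLong("hbase.htable.threads.keepalivetime", 60); // seconds
   * ThreadPoolExecutor pool = HTable.getDefaultExecutor(conf);
   * </pre>
   */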
131  @InterfaceAudience.Private
132  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
133    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
134    if (maxThreads == 0) {
135      maxThreads = 1; // is there a better default?
136    }
137    int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
138    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
139
    // Using the "direct handoff" approach, new threads are only created when necessary and the
    // pool can grow unbounded. This could be bad, but in HCM we only create as many Runnables as
    // there are region servers, which means the pool also scales as new region servers are added.
144    ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
145      TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable"));
146    pool.allowCoreThreadTimeOut(true);
147    return pool;
148  }
149
150  /**
   * Creates an object to access an HBase table.
   * Used by HBase internally. DO NOT USE. See the {@link ConnectionFactory} class comment for how
   * to get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
154   * @param connection Connection to be used.
155   * @param builder The table builder
156   * @param rpcCallerFactory The RPC caller factory
157   * @param rpcControllerFactory The RPC controller factory
158   * @param pool ExecutorService to be used.
159   */
160  @InterfaceAudience.Private
161  protected HTable(final ClusterConnection connection,
162      final TableBuilderBase builder,
163      final RpcRetryingCallerFactory rpcCallerFactory,
164      final RpcControllerFactory rpcControllerFactory,
165      final ExecutorService pool) {
166    this.connection = Preconditions.checkNotNull(connection, "connection is null");
167    this.configuration = connection.getConfiguration();
168    this.connConfiguration = connection.getConnectionConfiguration();
169    if (pool == null) {
170      this.pool = getDefaultExecutor(this.configuration);
171      this.cleanupPoolOnClose = true;
172    } else {
173      this.pool = pool;
174      this.cleanupPoolOnClose = false;
175    }
176    if (rpcCallerFactory == null) {
177      this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
178    } else {
179      this.rpcCallerFactory = rpcCallerFactory;
180    }
181
182    if (rpcControllerFactory == null) {
183      this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
184    } else {
185      this.rpcControllerFactory = rpcControllerFactory;
186    }
187
188    this.tableName = builder.tableName;
189    this.operationTimeoutMs = builder.operationTimeout;
190    this.rpcTimeoutMs = builder.rpcTimeout;
191    this.readRpcTimeoutMs = builder.readRpcTimeout;
192    this.writeRpcTimeoutMs = builder.writeRpcTimeout;
193    this.scannerCaching = connConfiguration.getScannerCaching();
194    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
195
196    // puts need to track errors globally due to how the APIs currently work.
197    multiAp = this.connection.getAsyncProcess();
198    this.locator = new HRegionLocator(tableName, connection);
199  }
200
  /**
   * @return the maximum allowed KeyValue size from the configuration
   *   ({@link ConnectionConfiguration#MAX_KEYVALUE_SIZE_KEY}), or -1 if it is not set.
   */
204  public static int getMaxKeyValueSize(Configuration conf) {
205    return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1);
206  }
207
208  @Override
209  public Configuration getConfiguration() {
210    return configuration;
211  }
212
213  @Override
214  public TableName getName() {
215    return tableName;
216  }
217
218  /**
219   * <em>INTERNAL</em> Used by unit tests and tools to do low-level
220   * manipulations.
221   * @return A Connection instance.
222   */
223  @VisibleForTesting
224  protected Connection getConnection() {
225    return this.connection;
226  }
227
228  @Override
229  @Deprecated
230  public HTableDescriptor getTableDescriptor() throws IOException {
231    HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory,
232      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
233    if (htd != null) {
234      return new ImmutableHTableDescriptor(htd);
235    }
236    return null;
237  }
238
239  @Override
240  public TableDescriptor getDescriptor() throws IOException {
241    return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
242      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
243  }
244
245  /**
246   * Get the corresponding start keys and regions for an arbitrary range of
247   * keys.
248   * <p>
249   * @param startKey Starting row in range, inclusive
250   * @param endKey Ending row in range
251   * @param includeEndKey true if endRow is inclusive, false if exclusive
   * @return A pair containing the list of start keys and the list of HRegionLocations that
   *         together contain the specified range
254   * @throws IOException if a remote or network exception occurs
255   */
256  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
257      final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
258      throws IOException {
259    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
260  }
261
262  /**
263   * Get the corresponding start keys and regions for an arbitrary range of
264   * keys.
265   * <p>
266   * @param startKey Starting row in range, inclusive
267   * @param endKey Ending row in range
268   * @param includeEndKey true if endRow is inclusive, false if exclusive
269   * @param reload true to reload information or false to use cached information
   * @return A pair containing the list of start keys and the list of HRegionLocations that
   *         together contain the specified range
272   * @throws IOException if a remote or network exception occurs
273   */
274  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
275      final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
276      final boolean reload) throws IOException {
    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
278    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
279      throw new IllegalArgumentException(
280        "Invalid range: " + Bytes.toStringBinary(startKey) +
281        " > " + Bytes.toStringBinary(endKey));
282    }
283    List<byte[]> keysInRange = new ArrayList<>();
284    List<HRegionLocation> regionsInRange = new ArrayList<>();
285    byte[] currentKey = startKey;
286    do {
287      HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
288      keysInRange.add(currentKey);
289      regionsInRange.add(regionLocation);
290      currentKey = regionLocation.getRegionInfo().getEndKey();
291    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
292        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
293            || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
294    return new Pair<>(keysInRange, regionsInRange);
295  }
296
297  /**
298   * The underlying {@link HTable} must not be closed.
299   * {@link Table#getScanner(Scan)} has other usage details.
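   * <p>A minimal usage sketch ({@code startRow} and the per-row processing are illustrative
   * only):
   * <pre>
   * Scan scan = new Scan().withStartRow(startRow);
   * try (ResultScanner scanner = table.getScanner(scan)) {
   *   for (Result result : scanner) {
   *     // process each Result
   *   }
   * }
   * </pre>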
300   */
301  @Override
302  public ResultScanner getScanner(Scan scan) throws IOException {
303    if (scan.getCaching() <= 0) {
304      scan.setCaching(scannerCaching);
305    }
306    if (scan.getMaxResultSize() <= 0) {
307      scan.setMaxResultSize(scannerMaxResultSize);
308    }
309    if (scan.getMvccReadPoint() > 0) {
      // it is not supposed to be set by the user; clear it
311      scan.resetMvccReadPoint();
312    }
313    Boolean async = scan.isAsyncPrefetch();
314    if (async == null) {
315      async = connConfiguration.isClientScannerAsyncPrefetch();
316    }
317
318    if (scan.isReversed()) {
319      return new ReversedClientScanner(getConfiguration(), scan, getName(),
320        this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
321        pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
322    } else {
323      if (async) {
324        return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
325            this.rpcCallerFactory, this.rpcControllerFactory,
326            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
327      } else {
328        return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
329            this.rpcCallerFactory, this.rpcControllerFactory,
330            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
331      }
332    }
333  }
334
335  /**
336   * The underlying {@link HTable} must not be closed.
337   * {@link Table#getScanner(byte[])} has other usage details.
338   */
339  @Override
340  public ResultScanner getScanner(byte [] family) throws IOException {
341    Scan scan = new Scan();
342    scan.addFamily(family);
343    return getScanner(scan);
344  }
345
346  /**
347   * The underlying {@link HTable} must not be closed.
348   * {@link Table#getScanner(byte[], byte[])} has other usage details.
349   */
350  @Override
351  public ResultScanner getScanner(byte [] family, byte [] qualifier)
352  throws IOException {
353    Scan scan = new Scan();
354    scan.addColumn(family, qualifier);
355    return getScanner(scan);
356  }
357
358  @Override
359  public Result get(final Get get) throws IOException {
360    return get(get, get.isCheckExistenceOnly());
361  }
362
363  private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
    // if we are changing settings on the Get, clone it.
365    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
366      get = ReflectionUtils.newInstance(get.getClass(), get);
367      get.setCheckExistenceOnly(checkExistenceOnly);
368      if (get.getConsistency() == null){
369        get.setConsistency(DEFAULT_CONSISTENCY);
370      }
371    }
372
373    if (get.getConsistency() == Consistency.STRONG) {
374      final Get configuredGet = get;
      ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection,
          getName(), get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) {
377        @Override
378        protected Result rpcCall() throws Exception {
379          ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
380              getLocation().getRegionInfo().getRegionName(), configuredGet);
381          ClientProtos.GetResponse response = doGet(request);
382          return response == null? null:
383            ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
384        }
385      };
386      return rpcCallerFactory.<Result>newCaller(readRpcTimeoutMs).callWithRetries(callable,
387          this.operationTimeoutMs);
388    }
389
390    // Call that takes into account the replica
391    RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
392        rpcControllerFactory, tableName, this.connection, get, pool,
393        connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs,
394        connConfiguration.getPrimaryCallTimeoutMicroSecond());
395    return callable.call(operationTimeoutMs);
396  }
397
398  @Override
399  public Result[] get(List<Get> gets) throws IOException {
400    if (gets.size() == 1) {
401      return new Result[]{get(gets.get(0))};
402    }
403    try {
404      Object[] r1 = new Object[gets.size()];
405      batch((List<? extends Row>)gets, r1, readRpcTimeoutMs);
406      // Translate.
407      Result [] results = new Result[r1.length];
408      int i = 0;
409      for (Object obj: r1) {
410        // Batch ensures if there is a failure we get an exception instead
411        results[i++] = (Result)obj;
412      }
413      return results;
414    } catch (InterruptedException e) {
415      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
416    }
417  }
418
419  @Override
420  public void batch(final List<? extends Row> actions, final Object[] results)
421      throws InterruptedException, IOException {
422    int rpcTimeout = writeRpcTimeoutMs;
423    boolean hasRead = false;
424    boolean hasWrite = false;
425    for (Row action : actions) {
426      if (action instanceof Mutation) {
427        hasWrite = true;
428      } else {
429        hasRead = true;
430      }
431      if (hasRead && hasWrite) {
432        break;
433      }
434    }
435    if (hasRead && !hasWrite) {
436      rpcTimeout = readRpcTimeoutMs;
437    }
438    batch(actions, results, rpcTimeout);
439  }
440
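  /**
   * As {@link #batch(List, Object[])}, but with an explicit per-request RPC timeout. A minimal
   * sketch (assumes {@code table} is an HTable and {@code puts} is a non-empty list of Puts; the
   * timeout value is illustrative only):
   * <pre>
   * Object[] results = new Object[puts.size()];
   * table.batch(puts, results, 60000); // 60 second rpc timeout
   * </pre>
   */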
441  public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
442      throws InterruptedException, IOException {
443    AsyncProcessTask task = AsyncProcessTask.newBuilder()
444            .setPool(pool)
445            .setTableName(tableName)
446            .setRowAccess(actions)
447            .setResults(results)
448            .setRpcTimeout(rpcTimeout)
449            .setOperationTimeout(operationTimeoutMs)
450            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
451            .build();
452    AsyncRequestFuture ars = multiAp.submit(task);
453    ars.waitUntilDone();
454    if (ars.hasError()) {
455      throw ars.getErrors();
456    }
457  }
458
459  @Override
460  public <R> void batchCallback(
461    final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
462    throws IOException, InterruptedException {
463    doBatchWithCallback(actions, results, callback, connection, pool, tableName);
464  }
465
466  public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
467    Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
468    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
469    int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
470    int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
471        connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
472            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
473    AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback)
474            .setPool(pool)
475            .setTableName(tableName)
476            .setRowAccess(actions)
477            .setResults(results)
478            .setOperationTimeout(operationTimeout)
479            .setRpcTimeout(writeTimeout)
480            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
481            .build();
482    AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
483    ars.waitUntilDone();
484    if (ars.hasError()) {
485      throw ars.getErrors();
486    }
487  }
488
489  @Override
490  public void delete(final Delete delete) throws IOException {
491    ClientServiceCallable<Void> callable =
492        new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(),
493            this.rpcControllerFactory.newController(), delete.getPriority()) {
494      @Override
495      protected Void rpcCall() throws Exception {
496        MutateRequest request = RequestConverter
497            .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete);
498        doMutate(request);
499        return null;
500      }
501    };
502    rpcCallerFactory.<Void>newCaller(this.writeRpcTimeoutMs)
503        .callWithRetries(callable, this.operationTimeoutMs);
504  }
505
506  @Override
507  public void delete(final List<Delete> deletes)
508  throws IOException {
509    Object[] results = new Object[deletes.size()];
510    try {
511      batch(deletes, results, writeRpcTimeoutMs);
512    } catch (InterruptedException e) {
513      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
514    } finally {
      // TODO: to be consistent with batch put(), do not modify the input list.
      // Mutate the list so that it is empty on complete success, or contains only the failed
      // records. Results are returned in the same order as the requests in the list; walk the
      // list backwards so we can remove entries without impacting the indexes of earlier members.
519      for (int i = results.length - 1; i>=0; i--) {
520        // if result is not null, it succeeded
521        if (results[i] instanceof Result) {
522          deletes.remove(i);
523        }
524      }
525    }
526  }
527
528  @Override
529  public void put(final Put put) throws IOException {
530    validatePut(put);
531    ClientServiceCallable<Void> callable =
532        new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
533            this.rpcControllerFactory.newController(), put.getPriority()) {
534      @Override
535      protected Void rpcCall() throws Exception {
536        MutateRequest request =
537            RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put);
538        doMutate(request);
539        return null;
540      }
541    };
542    rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
543        this.operationTimeoutMs);
544  }
545
546  @Override
547  public void put(final List<Put> puts) throws IOException {
548    for (Put put : puts) {
549      validatePut(put);
550    }
551    Object[] results = new Object[puts.size()];
552    try {
553      batch(puts, results, writeRpcTimeoutMs);
554    } catch (InterruptedException e) {
555      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
556    }
557  }
558
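  /**
   * A minimal usage sketch of an atomic multi-mutation on a single row ({@code row}, {@code put}
   * and {@code delete} are illustrative and must all address the same row):
   * <pre>
   * RowMutations rm = new RowMutations(row);
   * rm.add(put);
   * rm.add(delete);
   * table.mutateRow(rm);
   * </pre>
   */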
559  @Override
560  public void mutateRow(final RowMutations rm) throws IOException {
561    CancellableRegionServerCallable<MultiResponse> callable =
562      new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
563          rpcControllerFactory.newController(), writeRpcTimeoutMs,
564          new RetryingTimeTracker().start(), rm.getMaxPriority()) {
565      @Override
566      protected MultiResponse rpcCall() throws Exception {
567        RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
568            getLocation().getRegionInfo().getRegionName(), rm);
569        regionMutationBuilder.setAtomic(true);
570        MultiRequest request =
571            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
572        ClientProtos.MultiResponse response = doMulti(request);
573        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
574        if (res.hasException()) {
575          Throwable ex = ProtobufUtil.toException(res.getException());
576          if (ex instanceof IOException) {
577            throw (IOException) ex;
578          }
579          throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
580        }
581        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
582      }
583    };
584    AsyncProcessTask task = AsyncProcessTask.newBuilder()
585            .setPool(pool)
586            .setTableName(tableName)
587            .setRowAccess(rm.getMutations())
588            .setCallable(callable)
589            .setRpcTimeout(writeRpcTimeoutMs)
590            .setOperationTimeout(operationTimeoutMs)
591            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
592            .build();
593    AsyncRequestFuture ars = multiAp.submit(task);
594    ars.waitUntilDone();
595    if (ars.hasError()) {
596      throw ars.getErrors();
597    }
598  }
599
600  @Override
601  public Result append(final Append append) throws IOException {
602    checkHasFamilies(append);
603    NoncedRegionServerCallable<Result> callable =
604        new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
605            this.rpcControllerFactory.newController(), append.getPriority()) {
606      @Override
607      protected Result rpcCall() throws Exception {
608        MutateRequest request = RequestConverter.buildMutateRequest(
609          getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
610        MutateResponse response = doMutate(request);
611        if (!response.hasResult()) return null;
612        return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
613      }
614    };
615    return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs).
616        callWithRetries(callable, this.operationTimeoutMs);
617  }
618
619  @Override
620  public Result increment(final Increment increment) throws IOException {
621    checkHasFamilies(increment);
622    NoncedRegionServerCallable<Result> callable =
623        new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
624            this.rpcControllerFactory.newController(), increment.getPriority()) {
625      @Override
626      protected Result rpcCall() throws Exception {
627        MutateRequest request = RequestConverter.buildMutateRequest(
628          getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
629        MutateResponse response = doMutate(request);
630        // Should this check for null like append does?
631        return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
632      }
633    };
634    return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable,
635        this.operationTimeoutMs);
636  }
637
638  @Override
639  public long incrementColumnValue(final byte [] row, final byte [] family,
640      final byte [] qualifier, final long amount)
641  throws IOException {
642    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
643  }
644
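  /**
   * A minimal usage sketch ({@code row}, {@code family} and {@code qualifier} are illustrative;
   * this atomically adds 1 to the stored counter value):
   * <pre>
   * long newValue = table.incrementColumnValue(row, family, qualifier, 1L, Durability.SYNC_WAL);
   * </pre>
   */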
645  @Override
646  public long incrementColumnValue(final byte [] row, final byte [] family,
647      final byte [] qualifier, final long amount, final Durability durability)
648  throws IOException {
649    NullPointerException npe = null;
650    if (row == null) {
651      npe = new NullPointerException("row is null");
652    } else if (family == null) {
653      npe = new NullPointerException("family is null");
654    }
655    if (npe != null) {
656      throw new IOException(
657          "Invalid arguments to incrementColumnValue", npe);
658    }
659
660    NoncedRegionServerCallable<Long> callable =
661        new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
662            this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
663      @Override
664      protected Long rpcCall() throws Exception {
665        MutateRequest request = RequestConverter.buildIncrementRequest(
666          getLocation().getRegionInfo().getRegionName(), row, family,
667          qualifier, amount, durability, getNonceGroup(), getNonce());
668        MutateResponse response = doMutate(request);
669        Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
670        return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
671      }
672    };
673    return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs).
674        callWithRetries(callable, this.operationTimeoutMs);
675  }
676
677  @Override
678  @Deprecated
679  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
680      final byte [] value, final Put put) throws IOException {
681    return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
682  }
683
684  @Override
685  @Deprecated
686  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
687      final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
688    return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
689  }
690
691  @Override
692  @Deprecated
693  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
694      final CompareOperator op, final byte [] value, final Put put) throws IOException {
695    // The name of the operators in CompareOperator are intentionally those of the
696    // operators in the filter's CompareOp enum.
697    return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
698  }
699
700  private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
701    final String opName, final byte[] value, final TimeRange timeRange, final Put put)
702    throws IOException {
703    ClientServiceCallable<Boolean> callable =
704        new ClientServiceCallable<Boolean>(this.connection, getName(), row,
705            this.rpcControllerFactory.newController(), put.getPriority()) {
706      @Override
707      protected Boolean rpcCall() throws Exception {
708        CompareType compareType = CompareType.valueOf(opName);
709        MutateRequest request = RequestConverter.buildMutateRequest(
710            getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
711            new BinaryComparator(value), compareType, timeRange, put);
712        MutateResponse response = doMutate(request);
713        return Boolean.valueOf(response.getProcessed());
714      }
715    };
716    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeoutMs)
717        .callWithRetries(callable, this.operationTimeoutMs);
718  }
719
720  @Override
721  @Deprecated
722  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
723    final byte[] value, final Delete delete) throws IOException {
724    return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
725      delete);
726  }
727
728  @Override
729  @Deprecated
730  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
731    final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
732    return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
733  }
734
735  @Override
736  @Deprecated
737  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
738    final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
739    return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
740  }
741
742  private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
743    final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
744    throws IOException {
745    CancellableRegionServerCallable<SingleResponse> callable =
746      new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
747        this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
748        new RetryingTimeTracker().start(), delete.getPriority()) {
749        @Override
750        protected SingleResponse rpcCall() throws Exception {
751          CompareType compareType = CompareType.valueOf(opName);
752          MutateRequest request = RequestConverter
753            .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
754              qualifier, new BinaryComparator(value), compareType, timeRange, delete);
755          MutateResponse response = doMutate(request);
756          return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
757        }
758      };
759    List<Delete> rows = Collections.singletonList(delete);
760    Object[] results = new Object[1];
761    AsyncProcessTask task =
762      AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows)
763        .setCallable(callable)
764        // TODO any better timeout?
765        .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
766        .setOperationTimeout(operationTimeoutMs)
767        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
768    AsyncRequestFuture ars = multiAp.submit(task);
769    ars.waitUntilDone();
770    if (ars.hasError()) {
771      throw ars.getErrors();
772    }
773    return ((SingleResponse.Entry) results[0]).isProcessed();
774  }
775
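  /**
   * A minimal usage sketch of the returned builder ({@code row}, {@code family},
   * {@code qualifier}, {@code expectedValue} and {@code put} are illustrative only):
   * <pre>
   * boolean success = table.checkAndMutate(row, family)
   *     .qualifier(qualifier)
   *     .ifMatches(CompareOperator.EQUAL, expectedValue)
   *     .thenPut(put);
   * </pre>
   */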
776  @Override
777  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
778    return new CheckAndMutateBuilderImpl(row, family);
779  }
780
781  private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
782    final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm)
783    throws IOException {
    CancellableRegionServerCallable<MultiResponse> callable =
        new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
            rpcControllerFactory.newController(), writeRpcTimeoutMs,
            new RetryingTimeTracker().start(), rm.getMaxPriority()) {
788      @Override
789      protected MultiResponse rpcCall() throws Exception {
790        CompareType compareType = CompareType.valueOf(opName);
791        MultiRequest request = RequestConverter
792          .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
793            new BinaryComparator(value), compareType, timeRange, rm);
794        ClientProtos.MultiResponse response = doMulti(request);
795        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
796        if (res.hasException()) {
797          Throwable ex = ProtobufUtil.toException(res.getException());
798          if (ex instanceof IOException) {
799            throw (IOException) ex;
800          }
801          throw new IOException(
802            "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
803        }
804        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
805      }
806    };
807
    /*
     * Currently, we use one array to store the 'processed' flag that is returned by the server.
     * It is excessive to use such a large array, but that is what the framework requires right
     * now.
     */
812    Object[] results = new Object[rm.getMutations().size()];
    AsyncProcessTask task = AsyncProcessTask.newBuilder()
        .setPool(pool)
        .setTableName(tableName)
        .setRowAccess(rm.getMutations())
        .setResults(results)
        .setCallable(callable)
        // TODO any better timeout?
        .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
        .setOperationTimeout(operationTimeoutMs)
        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
        .build();
824    AsyncRequestFuture ars = multiAp.submit(task);
825    ars.waitUntilDone();
826    if (ars.hasError()) {
827      throw ars.getErrors();
828    }
829
830    return ((Result)results[0]).getExists();
831  }
832
833  @Override
834  @Deprecated
835  public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
836    final CompareOp compareOp, final byte [] value, final RowMutations rm)
837  throws IOException {
838    return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
839  }
840
841  @Override
842  @Deprecated
843  public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
844      final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
845    return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
846  }
847
848  @Override
849  public boolean exists(final Get get) throws IOException {
850    Result r = get(get, true);
851    assert r.getExists() != null;
852    return r.getExists();
853  }
854
855  @Override
856  public boolean[] exists(List<Get> gets) throws IOException {
857    if (gets.isEmpty()) return new boolean[]{};
858    if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
859
860    ArrayList<Get> exists = new ArrayList<>(gets.size());
861    for (Get g: gets){
862      Get ge = new Get(g);
863      ge.setCheckExistenceOnly(true);
864      exists.add(ge);
865    }
866
867    Object[] r1= new Object[exists.size()];
868    try {
869      batch(exists, r1, readRpcTimeoutMs);
870    } catch (InterruptedException e) {
871      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
872    }
873
874    // translate.
875    boolean[] results = new boolean[r1.length];
876    int i = 0;
877    for (Object o : r1) {
878      // batch ensures if there is a failure we get an exception instead
879      results[i++] = ((Result)o).getExists();
880    }
881
882    return results;
883  }
884
885  /**
   * Process a mixed batch of Get, Put and Delete actions. All actions for a
   * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
   *
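   * <p>A minimal sketch ({@code actions} is an illustrative mixed list of Gets and Puts):
   * <pre>
   * Object[] results = new Object[actions.size()];
   * table.processBatchCallback(actions, results,
   *     (region, row, result) -> {
   *       // react to each per-row result as it arrives
   *     });
   * </pre>
   *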
   * @param list The collection of actions.
   * @param results An empty array of the same size as list. If an exception is thrown, you can
   *   test here for partial results, and determine which actions were processed successfully.
   * @param callback The callback invoked with the result of each action as it completes.
   * @throws IOException if there are problems talking to META. Per-item
   *   exceptions are stored in the results array.
895   */
896  public <R> void processBatchCallback(
897    final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
898    throws IOException, InterruptedException {
899    this.batchCallback(list, results, callback);
900  }
901
902  @Override
903  public void close() throws IOException {
904    if (this.closed) {
905      return;
906    }
907    if (cleanupPoolOnClose) {
908      this.pool.shutdown();
909      try {
910        boolean terminated = false;
911        do {
912          // wait until the pool has terminated
913          terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
914        } while (!terminated);
915      } catch (InterruptedException e) {
916        this.pool.shutdownNow();
917        LOG.warn("waitForTermination interrupted");
918      }
919    }
920    this.closed = true;
921  }
922
923  // validate for well-formedness
924  public void validatePut(final Put put) throws IllegalArgumentException {
925    validatePut(put, connConfiguration.getMaxKeyValueSize());
926  }
927
928  // validate for well-formedness
929  public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
930    if (put.isEmpty()) {
931      throw new IllegalArgumentException("No columns to insert");
932    }
933    if (maxKeyValueSize > 0) {
934      for (List<Cell> list : put.getFamilyCellMap().values()) {
935        for (Cell cell : list) {
936          if (KeyValueUtil.length(cell) > maxKeyValueSize) {
937            throw new IllegalArgumentException("KeyValue size too large");
938          }
939        }
940      }
941    }
942  }
943
  /**
   * The pool is used for multi requests for this HTable.
   * @return the pool used for multi requests
   */
948  ExecutorService getPool() {
949    return this.pool;
950  }
951
952  /**
953   * Explicitly clears the region cache to fetch the latest value from META.
954   * This is a power user function: avoid unless you know the ramifications.
955   */
956  public void clearRegionCache() {
957    this.connection.clearRegionCache();
958  }
959
960  @Override
961  public CoprocessorRpcChannel coprocessorService(byte[] row) {
962    return new RegionCoprocessorRpcChannel(connection, tableName, row);
963  }
964
965  @Override
966  public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
967      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
968      throws ServiceException, Throwable {
969    final Map<byte[],R> results =  Collections.synchronizedMap(
970        new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
971    coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
972      @Override
973      public void update(byte[] region, byte[] row, R value) {
974        if (region != null) {
975          results.put(region, value);
976        }
977      }
978    });
979    return results;
980  }
981
982  @Override
983  public <T extends Service, R> void coprocessorService(final Class<T> service,
984      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
985      final Batch.Callback<R> callback) throws ServiceException, Throwable {
986    // get regions covered by the row range
987    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
988    Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
989    for (final byte[] r : keys) {
990      final RegionCoprocessorRpcChannel channel =
991          new RegionCoprocessorRpcChannel(connection, tableName, r);
992      Future<R> future = pool.submit(new Callable<R>() {
993        @Override
994        public R call() throws Exception {
995          T instance =
996              org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
997          R result = callable.call(instance);
998          byte[] region = channel.getLastRegion();
999          if (callback != null) {
1000            callback.update(region, r, result);
1001          }
1002          return result;
1003        }
1004      });
1005      futures.put(r, future);
1006    }
1007    for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1008      try {
1009        e.getValue().get();
1010      } catch (ExecutionException ee) {
1011        LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1012            + Bytes.toStringBinary(e.getKey()), ee);
1013        throw ee.getCause();
1014      } catch (InterruptedException ie) {
1015        throw new InterruptedIOException("Interrupted calling coprocessor service "
1016            + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1017      }
1018    }
1019  }
1020
1021  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1022  throws IOException {
1023    if (start == null) {
1024      start = HConstants.EMPTY_START_ROW;
1025    }
1026    if (end == null) {
1027      end = HConstants.EMPTY_END_ROW;
1028    }
1029    return getKeysAndRegionsInRange(start, end, true).getFirst();
1030  }
1031
1032  @Override
1033  public long getRpcTimeout(TimeUnit unit) {
1034    return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS);
1035  }
1036
1037  @Override
1038  @Deprecated
1039  public int getRpcTimeout() {
1040    return rpcTimeoutMs;
1041  }
1042
1043  @Override
1044  @Deprecated
1045  public void setRpcTimeout(int rpcTimeout) {
1046    setReadRpcTimeout(rpcTimeout);
1047    setWriteRpcTimeout(rpcTimeout);
1048  }
1049
1050  @Override
1051  public long getReadRpcTimeout(TimeUnit unit) {
1052    return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS);
1053  }
1054
1055  @Override
1056  @Deprecated
1057  public int getReadRpcTimeout() {
1058    return readRpcTimeoutMs;
1059  }
1060
1061  @Override
1062  @Deprecated
1063  public void setReadRpcTimeout(int readRpcTimeout) {
1064    this.readRpcTimeoutMs = readRpcTimeout;
1065  }
1066
1067  @Override
1068  public long getWriteRpcTimeout(TimeUnit unit) {
1069    return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS);
1070  }
1071
1072  @Override
1073  @Deprecated
1074  public int getWriteRpcTimeout() {
1075    return writeRpcTimeoutMs;
1076  }
1077
1078  @Override
1079  @Deprecated
1080  public void setWriteRpcTimeout(int writeRpcTimeout) {
1081    this.writeRpcTimeoutMs = writeRpcTimeout;
1082  }
1083
1084  @Override
1085  public long getOperationTimeout(TimeUnit unit) {
1086    return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS);
1087  }
1088
1089  @Override
1090  @Deprecated
1091  public int getOperationTimeout() {
1092    return operationTimeoutMs;
1093  }
1094
1095  @Override
1096  @Deprecated
1097  public void setOperationTimeout(int operationTimeout) {
1098    this.operationTimeoutMs = operationTimeout;
1099  }
1100
1101  @Override
1102  public String toString() {
1103    return tableName + ";" + connection;
1104  }
1105
1106  @Override
1107  public <R extends Message> Map<byte[], R> batchCoprocessorService(
1108      Descriptors.MethodDescriptor methodDescriptor, Message request,
1109      byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1110    final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1111        Bytes.BYTES_COMPARATOR));
1112    batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1113        new Callback<R>() {
1114      @Override
1115      public void update(byte[] region, byte[] row, R result) {
1116        if (region != null) {
1117          results.put(region, result);
1118        }
1119      }
1120    });
1121    return results;
1122  }
1123
1124  @Override
1125  public <R extends Message> void batchCoprocessorService(
1126      final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1127      byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1128      throws ServiceException, Throwable {
1129
1130    if (startKey == null) {
1131      startKey = HConstants.EMPTY_START_ROW;
1132    }
1133    if (endKey == null) {
1134      endKey = HConstants.EMPTY_END_ROW;
1135    }
1136    // get regions covered by the row range
1137    Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1138        getKeysAndRegionsInRange(startKey, endKey, true);
1139    List<byte[]> keys = keysAndRegions.getFirst();
1140    List<HRegionLocation> regions = keysAndRegions.getSecond();
1141
1142    // check if we have any calls to make
1143    if (keys.isEmpty()) {
1144      LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1145          ", end=" + Bytes.toStringBinary(endKey));
1146      return;
1147    }
1148
1149    List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size());
    final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
        new TreeMap<>(Bytes.BYTES_COMPARATOR);
1151    for (int i = 0; i < keys.size(); i++) {
1152      final byte[] rowKey = keys.get(i);
1153      final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1154      RegionCoprocessorServiceExec exec =
1155          new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1156      execs.add(exec);
1157      execsByRow.put(rowKey, exec);
1158    }
1159
1160    // tracking for any possible deserialization errors on success callback
1161    // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1162    final List<Throwable> callbackErrorExceptions = new ArrayList<>();
1163    final List<Row> callbackErrorActions = new ArrayList<>();
1164    final List<String> callbackErrorServers = new ArrayList<>();
1165    Object[] results = new Object[execs.size()];
1166
1167    AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
1168        RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1169        RpcControllerFactory.instantiate(configuration));
1170
    Callback<ClientProtos.CoprocessorServiceResult> resultsCallback =
        (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
1173      if (LOG.isTraceEnabled()) {
1174        LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1175            ": region=" + Bytes.toStringBinary(region) +
1176            ", row=" + Bytes.toStringBinary(row) +
1177            ", value=" + serviceResult.getValue().getValue());
1178      }
1179      try {
1180        Message.Builder builder = responsePrototype.newBuilderForType();
1181        org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
1182            serviceResult.getValue().getValue().toByteArray());
1183        callback.update(region, row, (R) builder.build());
1184      } catch (IOException e) {
1185        LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1186            e);
1187        callbackErrorExceptions.add(e);
1188        callbackErrorActions.add(execsByRow.get(row));
1189        callbackErrorServers.add("null");
1190      }
1191    };
1192    AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task =
1193        AsyncProcessTask.newBuilder(resultsCallback)
1194            .setPool(pool)
1195            .setTableName(tableName)
1196            .setRowAccess(execs)
1197            .setResults(results)
1198            .setRpcTimeout(readRpcTimeoutMs)
1199            .setOperationTimeout(operationTimeoutMs)
1200            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
1201            .build();
1202    AsyncRequestFuture future = asyncProcess.submit(task);
1203    future.waitUntilDone();
1204
1205    if (future.hasError()) {
1206      throw future.getErrors();
1207    } else if (!callbackErrorExceptions.isEmpty()) {
1208      throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1209          callbackErrorServers);
1210    }
1211  }
1212
1213  public RegionLocator getRegionLocator() {
1214    return this.locator;
1215  }
1216
1217  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
1218
1219    private final byte[] row;
1220    private final byte[] family;
1221    private byte[] qualifier;
1222    private TimeRange timeRange;
1223    private CompareOperator op;
1224    private byte[] value;
1225
1226    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
1227      this.row = Preconditions.checkNotNull(row, "row is null");
1228      this.family = Preconditions.checkNotNull(family, "family is null");
1229    }
1230
1231    @Override
1232    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
1233      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
1234          " an empty byte array, or just do not call this method if you want a null qualifier");
1235      return this;
1236    }
1237
1238    @Override
1239    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
1240      this.timeRange = timeRange;
1241      return this;
1242    }
1243
1244    @Override
1245    public CheckAndMutateBuilder ifNotExists() {
1246      this.op = CompareOperator.EQUAL;
1247      this.value = null;
1248      return this;
1249    }
1250
1251    @Override
1252    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
1253      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
1254      this.value = Preconditions.checkNotNull(value, "value is null");
1255      return this;
1256    }
1257
1258    private void preCheck() {
1259      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
1260          " calling ifNotExists/ifEquals/ifMatches before executing the request");
1261    }
1262
1263    @Override
1264    public boolean thenPut(Put put) throws IOException {
1265      preCheck();
1266      return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
1267    }
1268
1269    @Override
1270    public boolean thenDelete(Delete delete) throws IOException {
1271      preCheck();
1272      return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete);
1273    }
1274
1275    @Override
1276    public boolean thenMutate(RowMutations mutation) throws IOException {
1277      preCheck();
1278      return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation);
1279    }
1280  }
1281}