001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
// Internally, we use shaded protobuf. These imports below are part of our public API.
// SEE ABOVE NOTE!
024import com.google.protobuf.Descriptors;
025import com.google.protobuf.Message;
026import com.google.protobuf.Service;
027import com.google.protobuf.ServiceException;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.CompareOperator;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.HRegionLocation;
033import org.apache.hadoop.hbase.HTableDescriptor;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.filter.Filter;
036import org.apache.hadoop.hbase.io.TimeRange;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.yetus.audience.InterfaceStability;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041import org.apache.hadoop.hbase.client.coprocessor.Batch;
042import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
043import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
044import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
045import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
047import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
048import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
049import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
050import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.Pair;
058import org.apache.hadoop.hbase.util.ReflectionUtils;
059import org.apache.hadoop.hbase.util.Threads;
060
061import java.io.IOException;
062import java.io.InterruptedIOException;
063import java.util.ArrayList;
064import java.util.Collections;
065import java.util.List;
066import java.util.Map;
067import java.util.TreeMap;
068import java.util.concurrent.Callable;
069import java.util.concurrent.ExecutionException;
070import java.util.concurrent.ExecutorService;
071import java.util.concurrent.Future;
072import java.util.concurrent.SynchronousQueue;
073import java.util.concurrent.ThreadPoolExecutor;
074import java.util.concurrent.TimeUnit;
075
076import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
077
078/**
079 * An implementation of {@link Table}. Used to communicate with a single HBase table.
080 * Lightweight. Get as needed and just close when done.
081 * Instances of this class SHOULD NOT be constructed directly.
082 * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
083 * class comment for an example of how.
084 *
 * <p>This class is thread safe since 2.0.0 as long as none of the setter methods are invoked.
 * All setters have been moved into {@link TableBuilder} and are retained here only for
 * backward compatibility; they will be removed soon (TODO).
 *
 * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
 * InterfaceAudience.Private, indicating that this is an HBase-internal class as defined in the
 * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
 * Interface Classification</a>.
 * There are no guarantees for backwards source / binary compatibility, and methods or the class
 * can change or go away without deprecation.
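 *
 * <p>For reference, a minimal sketch of the intended way to obtain and use a {@link Table}
 * instance (the table, family, and qualifier names below are illustrative only):
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *      Table table = connection.getTable(TableName.valueOf("exampleTable"))) {
 *   Put put = new Put(Bytes.toBytes("row1"));
 *   put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
 *   table.put(put);
 * }
 * </pre>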
095 *
096 * @see Table
097 * @see Admin
098 * @see Connection
099 * @see ConnectionFactory
100 */
101@InterfaceAudience.Private
102@InterfaceStability.Stable
103public class HTable implements Table {
104  private static final Logger LOG = LoggerFactory.getLogger(HTable.class);
105  private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
106  private final ClusterConnection connection;
107  private final TableName tableName;
108  private final Configuration configuration;
109  private final ConnectionConfiguration connConfiguration;
110  private boolean closed = false;
111  private final int scannerCaching;
112  private final long scannerMaxResultSize;
113  private final ExecutorService pool;  // For Multi & Scan
114  private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc
115  private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
116  private int readRpcTimeoutMs; // timeout for each read rpc request
117  private int writeRpcTimeoutMs; // timeout for each write rpc request
118  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
119  private final HRegionLocator locator;
120
121  /** The Async process for batch */
122  @VisibleForTesting
123  AsyncProcess multiAp;
124  private final RpcRetryingCallerFactory rpcCallerFactory;
125  private final RpcControllerFactory rpcControllerFactory;
126
127  // Marked Private @since 1.0
128  @InterfaceAudience.Private
129  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
130    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
131    if (maxThreads == 0) {
132      maxThreads = 1; // is there a better default?
133    }
134    int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
135    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
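    // These limits come from the client configuration; for example (illustrative values only):
    //   conf.setInt("hbase.htable.threads.max", 96);
    //   conf.setLong("hbase.htable.threads.keepalivetime", 60);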
136
    // Using the "direct handoff" approach, new threads will only be created
    // if necessary, and they can grow unbounded. This could be bad, but in HCM
    // we only create as many Runnables as there are region servers. That means
    // it also scales when new region servers are added.
141    ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
142      TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable"));
143    pool.allowCoreThreadTimeOut(true);
144    return pool;
145  }
146
147  /**
   * Creates an object to access an HBase table.
   * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to
   * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
151   * @param connection Connection to be used.
152   * @param builder The table builder
153   * @param rpcCallerFactory The RPC caller factory
154   * @param rpcControllerFactory The RPC controller factory
155   * @param pool ExecutorService to be used.
156   */
157  @InterfaceAudience.Private
158  protected HTable(final ConnectionImplementation connection,
159      final TableBuilderBase builder,
160      final RpcRetryingCallerFactory rpcCallerFactory,
161      final RpcControllerFactory rpcControllerFactory,
162      final ExecutorService pool) {
163    this.connection = Preconditions.checkNotNull(connection, "connection is null");
164    this.configuration = connection.getConfiguration();
165    this.connConfiguration = connection.getConnectionConfiguration();
166    if (pool == null) {
167      this.pool = getDefaultExecutor(this.configuration);
168      this.cleanupPoolOnClose = true;
169    } else {
170      this.pool = pool;
171      this.cleanupPoolOnClose = false;
172    }
173    if (rpcCallerFactory == null) {
174      this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
175    } else {
176      this.rpcCallerFactory = rpcCallerFactory;
177    }
178
179    if (rpcControllerFactory == null) {
180      this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
181    } else {
182      this.rpcControllerFactory = rpcControllerFactory;
183    }
184
185    this.tableName = builder.tableName;
186    this.operationTimeoutMs = builder.operationTimeout;
187    this.rpcTimeoutMs = builder.rpcTimeout;
188    this.readRpcTimeoutMs = builder.readRpcTimeout;
189    this.writeRpcTimeoutMs = builder.writeRpcTimeout;
190    this.scannerCaching = connConfiguration.getScannerCaching();
191    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
192
193    // puts need to track errors globally due to how the APIs currently work.
194    multiAp = this.connection.getAsyncProcess();
195    this.locator = new HRegionLocator(tableName, connection);
196  }
197
198  /**
199   * @return maxKeyValueSize from configuration.
200   */
201  public static int getMaxKeyValueSize(Configuration conf) {
202    return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1);
203  }
204
205  @Override
206  public Configuration getConfiguration() {
207    return configuration;
208  }
209
210  @Override
211  public TableName getName() {
212    return tableName;
213  }
214
215  /**
216   * <em>INTERNAL</em> Used by unit tests and tools to do low-level
217   * manipulations.
218   * @return A Connection instance.
219   */
220  @VisibleForTesting
221  protected Connection getConnection() {
222    return this.connection;
223  }
224
225  @Override
226  @Deprecated
227  public HTableDescriptor getTableDescriptor() throws IOException {
228    HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory,
229      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
230    if (htd != null) {
231      return new ImmutableHTableDescriptor(htd);
232    }
233    return null;
234  }
235
236  @Override
237  public TableDescriptor getDescriptor() throws IOException {
238    return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
239      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
240  }
241
242  /**
243   * Get the corresponding start keys and regions for an arbitrary range of
244   * keys.
245   * <p>
246   * @param startKey Starting row in range, inclusive
247   * @param endKey Ending row in range
   * @param includeEndKey true if endKey is inclusive, false if exclusive
   * @return A pair of the list of start keys and the list of HRegionLocations that
   *         contain the specified range
251   * @throws IOException if a remote or network exception occurs
252   */
253  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
254      final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
255      throws IOException {
256    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
257  }
258
259  /**
260   * Get the corresponding start keys and regions for an arbitrary range of
261   * keys.
262   * <p>
263   * @param startKey Starting row in range, inclusive
264   * @param endKey Ending row in range
   * @param includeEndKey true if endKey is inclusive, false if exclusive
   * @param reload true to reload information or false to use cached information
   * @return A pair of the list of start keys and the list of HRegionLocations that
   *         contain the specified range
269   * @throws IOException if a remote or network exception occurs
270   */
271  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
272      final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
273      final boolean reload) throws IOException {
    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
275    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
276      throw new IllegalArgumentException(
277        "Invalid range: " + Bytes.toStringBinary(startKey) +
278        " > " + Bytes.toStringBinary(endKey));
279    }
280    List<byte[]> keysInRange = new ArrayList<>();
281    List<HRegionLocation> regionsInRange = new ArrayList<>();
282    byte[] currentKey = startKey;
283    do {
284      HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
285      keysInRange.add(currentKey);
286      regionsInRange.add(regionLocation);
287      currentKey = regionLocation.getRegionInfo().getEndKey();
288    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
289        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
290            || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
291    return new Pair<>(keysInRange, regionsInRange);
292  }
293
294  /**
295   * The underlying {@link HTable} must not be closed.
296   * {@link Table#getScanner(Scan)} has other usage details.
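   * <p>A minimal usage sketch (the {@code table} reference and the family name below are
   * illustrative only):
   * <pre>
   * Scan scan = new Scan();
   * scan.addFamily(Bytes.toBytes("cf"));
   * try (ResultScanner scanner = table.getScanner(scan)) {
   *   for (Result result : scanner) {
   *     // process each Result
   *   }
   * }
   * </pre>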
297   */
298  @Override
299  public ResultScanner getScanner(Scan scan) throws IOException {
300    if (scan.getCaching() <= 0) {
301      scan.setCaching(scannerCaching);
302    }
303    if (scan.getMaxResultSize() <= 0) {
304      scan.setMaxResultSize(scannerMaxResultSize);
305    }
306    if (scan.getMvccReadPoint() > 0) {
      // it is not supposed to be set by the user, so clear it
308      scan.resetMvccReadPoint();
309    }
310    Boolean async = scan.isAsyncPrefetch();
311    if (async == null) {
312      async = connConfiguration.isClientScannerAsyncPrefetch();
313    }
314
315    if (scan.isReversed()) {
316      return new ReversedClientScanner(getConfiguration(), scan, getName(),
317        this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
318        pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
319    } else {
320      if (async) {
321        return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
322            this.rpcCallerFactory, this.rpcControllerFactory,
323            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
324      } else {
325        return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
326            this.rpcCallerFactory, this.rpcControllerFactory,
327            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
328      }
329    }
330  }
331
332  /**
333   * The underlying {@link HTable} must not be closed.
334   * {@link Table#getScanner(byte[])} has other usage details.
335   */
336  @Override
337  public ResultScanner getScanner(byte [] family) throws IOException {
338    Scan scan = new Scan();
339    scan.addFamily(family);
340    return getScanner(scan);
341  }
342
343  /**
344   * The underlying {@link HTable} must not be closed.
345   * {@link Table#getScanner(byte[], byte[])} has other usage details.
346   */
347  @Override
348  public ResultScanner getScanner(byte [] family, byte [] qualifier)
349  throws IOException {
350    Scan scan = new Scan();
351    scan.addColumn(family, qualifier);
352    return getScanner(scan);
353  }
354
355  @Override
356  public Result get(final Get get) throws IOException {
357    return get(get, get.isCheckExistenceOnly());
358  }
359
360  private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
    // if we are changing settings on the get, clone it.
362    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
363      get = ReflectionUtils.newInstance(get.getClass(), get);
364      get.setCheckExistenceOnly(checkExistenceOnly);
365      if (get.getConsistency() == null){
366        get.setConsistency(DEFAULT_CONSISTENCY);
367      }
368    }
369
370    if (get.getConsistency() == Consistency.STRONG) {
371      final Get configuredGet = get;
372      ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(),
373          get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) {
374        @Override
375        protected Result rpcCall() throws Exception {
376          ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
377              getLocation().getRegionInfo().getRegionName(), configuredGet);
378          ClientProtos.GetResponse response = doGet(request);
379          return response == null? null:
380            ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
381        }
382      };
383      return rpcCallerFactory.<Result>newCaller(readRpcTimeoutMs).callWithRetries(callable,
384          this.operationTimeoutMs);
385    }
386
    // Call that takes read replicas into account
388    RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
389        rpcControllerFactory, tableName, this.connection, get, pool,
390        connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs,
391        connConfiguration.getPrimaryCallTimeoutMicroSecond());
392    return callable.call(operationTimeoutMs);
393  }
394
395  @Override
396  public Result[] get(List<Get> gets) throws IOException {
397    if (gets.size() == 1) {
398      return new Result[]{get(gets.get(0))};
399    }
400    try {
401      Object[] r1 = new Object[gets.size()];
402      batch((List<? extends Row>)gets, r1, readRpcTimeoutMs);
403      // Translate.
404      Result [] results = new Result[r1.length];
405      int i = 0;
406      for (Object obj: r1) {
407        // Batch ensures if there is a failure we get an exception instead
408        results[i++] = (Result)obj;
409      }
410      return results;
411    } catch (InterruptedException e) {
412      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
413    }
414  }
415
416  @Override
417  public void batch(final List<? extends Row> actions, final Object[] results)
418      throws InterruptedException, IOException {
419    int rpcTimeout = writeRpcTimeoutMs;
420    boolean hasRead = false;
421    boolean hasWrite = false;
422    for (Row action : actions) {
423      if (action instanceof Mutation) {
424        hasWrite = true;
425      } else {
426        hasRead = true;
427      }
428      if (hasRead && hasWrite) {
429        break;
430      }
431    }
432    if (hasRead && !hasWrite) {
433      rpcTimeout = readRpcTimeoutMs;
434    }
435    batch(actions, results, rpcTimeout);
436  }
437
438  public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
439      throws InterruptedException, IOException {
440    AsyncProcessTask task = AsyncProcessTask.newBuilder()
441            .setPool(pool)
442            .setTableName(tableName)
443            .setRowAccess(actions)
444            .setResults(results)
445            .setRpcTimeout(rpcTimeout)
446            .setOperationTimeout(operationTimeoutMs)
447            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
448            .build();
449    AsyncRequestFuture ars = multiAp.submit(task);
450    ars.waitUntilDone();
451    if (ars.hasError()) {
452      throw ars.getErrors();
453    }
454  }
455
456  @Override
457  public <R> void batchCallback(
458    final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
459    throws IOException, InterruptedException {
460    doBatchWithCallback(actions, results, callback, connection, pool, tableName);
461  }
462
463  public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
464    Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
465    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
466    int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
467    int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
468        connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
469            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
470    AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback)
471            .setPool(pool)
472            .setTableName(tableName)
473            .setRowAccess(actions)
474            .setResults(results)
475            .setOperationTimeout(operationTimeout)
476            .setRpcTimeout(writeTimeout)
477            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
478            .build();
479    AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
480    ars.waitUntilDone();
481    if (ars.hasError()) {
482      throw ars.getErrors();
483    }
484  }
485
486  @Override
487  public void delete(final Delete delete) throws IOException {
488    ClientServiceCallable<Void> callable =
489        new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(),
490            this.rpcControllerFactory.newController(), delete.getPriority()) {
491      @Override
492      protected Void rpcCall() throws Exception {
493        MutateRequest request = RequestConverter
494            .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete);
495        doMutate(request);
496        return null;
497      }
498    };
499    rpcCallerFactory.<Void>newCaller(this.writeRpcTimeoutMs)
500        .callWithRetries(callable, this.operationTimeoutMs);
501  }
502
503  @Override
504  public void delete(final List<Delete> deletes)
505  throws IOException {
506    Object[] results = new Object[deletes.size()];
507    try {
508      batch(deletes, results, writeRpcTimeoutMs);
509    } catch (InterruptedException e) {
510      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
511    } finally {
      // TODO: to be consistent with batch put(), do not modify the input list.
      // Mutate the list so that it is empty for complete success, or contains only failed records.
      // Results are returned in the same order as the requests in the list; walk the list
      // backwards so we can remove from the list without impacting the indexes of earlier members.
516      for (int i = results.length - 1; i>=0; i--) {
517        // if result is not null, it succeeded
518        if (results[i] instanceof Result) {
519          deletes.remove(i);
520        }
521      }
522    }
523  }
524
525  @Override
526  public void put(final Put put) throws IOException {
527    validatePut(put);
528    ClientServiceCallable<Void> callable =
529        new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
530            this.rpcControllerFactory.newController(), put.getPriority()) {
531      @Override
532      protected Void rpcCall() throws Exception {
533        MutateRequest request =
534            RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put);
535        doMutate(request);
536        return null;
537      }
538    };
539    rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
540        this.operationTimeoutMs);
541  }
542
543  @Override
544  public void put(final List<Put> puts) throws IOException {
545    for (Put put : puts) {
546      validatePut(put);
547    }
548    Object[] results = new Object[puts.size()];
549    try {
550      batch(puts, results, writeRpcTimeoutMs);
551    } catch (InterruptedException e) {
552      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
553    }
554  }
555
556  @Override
557  public void mutateRow(final RowMutations rm) throws IOException {
558    CancellableRegionServerCallable<MultiResponse> callable =
559      new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
560          rpcControllerFactory.newController(), writeRpcTimeoutMs,
561          new RetryingTimeTracker().start(), rm.getMaxPriority()) {
562      @Override
563      protected MultiResponse rpcCall() throws Exception {
564        RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
565            getLocation().getRegionInfo().getRegionName(), rm);
566        regionMutationBuilder.setAtomic(true);
567        MultiRequest request =
568            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
569        ClientProtos.MultiResponse response = doMulti(request);
570        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
571        if (res.hasException()) {
572          Throwable ex = ProtobufUtil.toException(res.getException());
573          if (ex instanceof IOException) {
574            throw (IOException) ex;
575          }
576          throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
577        }
578        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
579      }
580    };
581    AsyncProcessTask task = AsyncProcessTask.newBuilder()
582            .setPool(pool)
583            .setTableName(tableName)
584            .setRowAccess(rm.getMutations())
585            .setCallable(callable)
586            .setRpcTimeout(writeRpcTimeoutMs)
587            .setOperationTimeout(operationTimeoutMs)
588            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
589            .build();
590    AsyncRequestFuture ars = multiAp.submit(task);
591    ars.waitUntilDone();
592    if (ars.hasError()) {
593      throw ars.getErrors();
594    }
595  }
596
597  @Override
598  public Result append(final Append append) throws IOException {
599    checkHasFamilies(append);
600    NoncedRegionServerCallable<Result> callable =
601        new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
602            this.rpcControllerFactory.newController(), append.getPriority()) {
603      @Override
604      protected Result rpcCall() throws Exception {
605        MutateRequest request = RequestConverter.buildMutateRequest(
606          getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
607        MutateResponse response = doMutate(request);
608        if (!response.hasResult()) return null;
609        return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
610      }
611    };
612    return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs).
613        callWithRetries(callable, this.operationTimeoutMs);
614  }
615
616  @Override
617  public Result increment(final Increment increment) throws IOException {
618    checkHasFamilies(increment);
619    NoncedRegionServerCallable<Result> callable =
620        new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
621            this.rpcControllerFactory.newController(), increment.getPriority()) {
622      @Override
623      protected Result rpcCall() throws Exception {
624        MutateRequest request = RequestConverter.buildMutateRequest(
625          getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
626        MutateResponse response = doMutate(request);
627        // Should this check for null like append does?
628        return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
629      }
630    };
631    return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable,
632        this.operationTimeoutMs);
633  }
634
635  @Override
636  public long incrementColumnValue(final byte [] row, final byte [] family,
637      final byte [] qualifier, final long amount)
638  throws IOException {
639    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
640  }
641
642  @Override
643  public long incrementColumnValue(final byte [] row, final byte [] family,
644      final byte [] qualifier, final long amount, final Durability durability)
645  throws IOException {
646    NullPointerException npe = null;
647    if (row == null) {
648      npe = new NullPointerException("row is null");
649    } else if (family == null) {
650      npe = new NullPointerException("family is null");
651    }
652    if (npe != null) {
653      throw new IOException(
654          "Invalid arguments to incrementColumnValue", npe);
655    }
656
657    NoncedRegionServerCallable<Long> callable =
658        new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
659            this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
660      @Override
661      protected Long rpcCall() throws Exception {
662        MutateRequest request = RequestConverter.buildIncrementRequest(
663          getLocation().getRegionInfo().getRegionName(), row, family,
664          qualifier, amount, durability, getNonceGroup(), getNonce());
665        MutateResponse response = doMutate(request);
666        Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
667        return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
668      }
669    };
670    return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs).
671        callWithRetries(callable, this.operationTimeoutMs);
672  }
673
674  @Override
675  @Deprecated
676  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
677      final byte [] value, final Put put) throws IOException {
678    return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
679      put);
680  }
681
682  @Override
683  @Deprecated
684  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
685      final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
686    return doCheckAndPut(row, family, qualifier, toCompareOperator(compareOp), value, null,
687      null, put);
688  }
689
690  @Override
691  @Deprecated
692  public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
693      final CompareOperator op, final byte [] value, final Put put) throws IOException {
    // The names of the operators in CompareOperator are intentionally those of the
695    // operators in the filter's CompareOp enum.
696    return doCheckAndPut(row, family, qualifier, op, value, null, null, put);
697  }
698
699  private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
700    final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
701    final Put put) throws IOException {
702    ClientServiceCallable<Boolean> callable =
703        new ClientServiceCallable<Boolean>(this.connection, getName(), row,
704            this.rpcControllerFactory.newController(), put.getPriority()) {
705      @Override
706      protected Boolean rpcCall() throws Exception {
707        MutateRequest request = RequestConverter.buildMutateRequest(
708          getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
709          filter, timeRange, put);
710        MutateResponse response = doMutate(request);
711        return Boolean.valueOf(response.getProcessed());
712      }
713    };
714    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeoutMs)
715        .callWithRetries(callable, this.operationTimeoutMs);
716  }
717
718  @Override
719  @Deprecated
720  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
721    final byte[] value, final Delete delete) throws IOException {
722    return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, null,
723      null, delete);
724  }
725
726  @Override
727  @Deprecated
728  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
729    final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
730    return doCheckAndDelete(row, family, qualifier, toCompareOperator(compareOp), value, null,
731      null, delete);
732  }
733
734  @Override
735  @Deprecated
736  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
737    final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
738    return doCheckAndDelete(row, family, qualifier, op, value, null, null, delete);
739  }
740
741  private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
742    final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
743    final Delete delete) throws IOException {
744    CancellableRegionServerCallable<SingleResponse> callable =
745      new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
746        this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
747        new RetryingTimeTracker().start(), delete.getPriority()) {
748        @Override
749        protected SingleResponse rpcCall() throws Exception {
750          MutateRequest request = RequestConverter
751            .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
752              qualifier, op, value, filter, timeRange, delete);
753          MutateResponse response = doMutate(request);
754          return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
755        }
756      };
757    List<Delete> rows = Collections.singletonList(delete);
758    Object[] results = new Object[1];
759    AsyncProcessTask task =
760      AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows)
761        .setCallable(callable)
762        // TODO any better timeout?
763        .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
764        .setOperationTimeout(operationTimeoutMs)
765        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
766    AsyncRequestFuture ars = multiAp.submit(task);
767    ars.waitUntilDone();
768    if (ars.hasError()) {
769      throw ars.getErrors();
770    }
771    return ((SingleResponse.Entry) results[0]).isProcessed();
772  }
773
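  /**
   * A minimal usage sketch of the returned builder (the {@code row}, {@code family},
   * {@code qualifier}, {@code expectedValue}, and {@code put} variables below are illustrative
   * only):
   * <pre>
   * boolean success = table.checkAndMutate(row, family)
   *     .qualifier(qualifier)
   *     .ifMatches(CompareOperator.EQUAL, expectedValue)
   *     .thenPut(put);
   * </pre>
   */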
774  @Override
775  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
776    return new CheckAndMutateBuilderImpl(row, family);
777  }
778
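  /**
   * A minimal usage sketch of the filter-based builder (the {@code row}, {@code filter}, and
   * {@code put} variables below are illustrative only):
   * <pre>
   * boolean success = table.checkAndMutate(row, filter).thenPut(put);
   * </pre>
   */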
779  @Override
780  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
781    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
782  }
783
784  private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
785    final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
786    final RowMutations rm)
787    throws IOException {
    CancellableRegionServerCallable<MultiResponse> callable =
      new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
          rpcControllerFactory.newController(), writeRpcTimeoutMs,
          new RetryingTimeTracker().start(), rm.getMaxPriority()) {
792      @Override
793      protected MultiResponse rpcCall() throws Exception {
794        MultiRequest request = RequestConverter
795          .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
796            qualifier, op, value, filter, timeRange, rm);
797        ClientProtos.MultiResponse response = doMulti(request);
798        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
799        if (res.hasException()) {
800          Throwable ex = ProtobufUtil.toException(res.getException());
801          if (ex instanceof IOException) {
802            throw (IOException) ex;
803          }
804          throw new IOException(
805            "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
806        }
807        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
808      }
809    };
810
    /**
     * Currently, we use one array to store the 'processed' flag which is returned by the server.
     * It is excessive to use such a large array, but that is what the framework requires right
     * now.
     */
815    Object[] results = new Object[rm.getMutations().size()];
    AsyncProcessTask task = AsyncProcessTask.newBuilder()
            .setPool(pool)
            .setTableName(tableName)
            .setRowAccess(rm.getMutations())
            .setResults(results)
            .setCallable(callable)
            // TODO any better timeout?
            .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
            .setOperationTimeout(operationTimeoutMs)
            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
            .build();
827    AsyncRequestFuture ars = multiAp.submit(task);
828    ars.waitUntilDone();
829    if (ars.hasError()) {
830      throw ars.getErrors();
831    }
832
833    return ((Result)results[0]).getExists();
834  }
835
836  @Override
837  @Deprecated
838  public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
839    final CompareOp compareOp, final byte [] value, final RowMutations rm)
840  throws IOException {
841    return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
842      null, rm);
843  }
844
845  @Override
846  @Deprecated
847  public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
848      final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
849    return doCheckAndMutate(row, family, qualifier, op, value, null, null, rm);
850  }
851
852  private CompareOperator toCompareOperator(CompareOp compareOp) {
853    switch (compareOp) {
854      case LESS:
855        return CompareOperator.LESS;
856
857      case LESS_OR_EQUAL:
858        return CompareOperator.LESS_OR_EQUAL;
859
860      case EQUAL:
861        return CompareOperator.EQUAL;
862
863      case NOT_EQUAL:
864        return CompareOperator.NOT_EQUAL;
865
866      case GREATER_OR_EQUAL:
867        return CompareOperator.GREATER_OR_EQUAL;
868
869      case GREATER:
870        return CompareOperator.GREATER;
871
872      case NO_OP:
873        return CompareOperator.NO_OP;
874
875      default:
876        throw new AssertionError();
877    }
878  }
879
880  @Override
881  public boolean exists(final Get get) throws IOException {
882    Result r = get(get, true);
883    assert r.getExists() != null;
884    return r.getExists();
885  }
886
887  @Override
888  public boolean[] exists(List<Get> gets) throws IOException {
889    if (gets.isEmpty()) return new boolean[]{};
890    if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
891
892    ArrayList<Get> exists = new ArrayList<>(gets.size());
893    for (Get g: gets){
894      Get ge = new Get(g);
895      ge.setCheckExistenceOnly(true);
896      exists.add(ge);
897    }
898
899    Object[] r1= new Object[exists.size()];
900    try {
901      batch(exists, r1, readRpcTimeoutMs);
902    } catch (InterruptedException e) {
903      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
904    }
905
906    // translate.
907    boolean[] results = new boolean[r1.length];
908    int i = 0;
909    for (Object o : r1) {
910      // batch ensures if there is a failure we get an exception instead
911      results[i++] = ((Result)o).getExists();
912    }
913
914    return results;
915  }
916
917  /**
918   * Process a mixed batch of Get, Put and Delete actions. All actions for a
919   * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
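   *
   * <p>A minimal sketch of a mixed batch (the {@code actions} list and the {@code handleResult}
   * callback body below are illustrative only):
   * <pre>
   * Object[] results = new Object[actions.size()];
   * table.processBatchCallback(actions, results,
   *     (byte[] region, byte[] row, Object result) -> handleResult(row, result));
   * </pre>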
920   *
921   * @param list The collection of actions.
   * @param results An empty array, the same size as list. If an exception is thrown,
   *   you can check here for partial results and determine which actions
   *   were processed successfully.
925   * @throws IOException if there are problems talking to META. Per-item
926   *   exceptions are stored in the results array.
927   */
928  public <R> void processBatchCallback(
929    final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
930    throws IOException, InterruptedException {
931    this.batchCallback(list, results, callback);
932  }
933
934  @Override
935  public void close() throws IOException {
936    if (this.closed) {
937      return;
938    }
939    if (cleanupPoolOnClose) {
940      this.pool.shutdown();
941      try {
942        boolean terminated = false;
943        do {
944          // wait until the pool has terminated
945          terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
946        } while (!terminated);
947      } catch (InterruptedException e) {
948        this.pool.shutdownNow();
949        LOG.warn("waitForTermination interrupted");
950      }
951    }
952    this.closed = true;
953  }
954
955  // validate for well-formedness
956  private void validatePut(final Put put) throws IllegalArgumentException {
957    ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize());
958  }
959
960  /**
   * The pool is used for multi requests for this HTable.
   * @return the pool used for multi requests
963   */
964  ExecutorService getPool() {
965    return this.pool;
966  }
967
968  /**
969   * Explicitly clears the region cache to fetch the latest value from META.
970   * This is a power user function: avoid unless you know the ramifications.
971   */
972  public void clearRegionCache() {
973    this.connection.clearRegionLocationCache();
974  }
975
976  @Override
977  public CoprocessorRpcChannel coprocessorService(byte[] row) {
978    return new RegionCoprocessorRpcChannel(connection, tableName, row);
979  }
980
981  @Override
982  public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
983      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
984      throws ServiceException, Throwable {
985    final Map<byte[],R> results =  Collections.synchronizedMap(
986        new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
987    coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
988      @Override
989      public void update(byte[] region, byte[] row, R value) {
990        if (region != null) {
991          results.put(region, value);
992        }
993      }
994    });
995    return results;
996  }
997
998  @Override
999  public <T extends Service, R> void coprocessorService(final Class<T> service,
1000      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1001      final Batch.Callback<R> callback) throws ServiceException, Throwable {
1002    // get regions covered by the row range
1003    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1004    Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
1005    for (final byte[] r : keys) {
1006      final RegionCoprocessorRpcChannel channel =
1007          new RegionCoprocessorRpcChannel(connection, tableName, r);
1008      Future<R> future = pool.submit(new Callable<R>() {
1009        @Override
1010        public R call() throws Exception {
1011          T instance =
1012              org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
1013          R result = callable.call(instance);
1014          byte[] region = channel.getLastRegion();
1015          if (callback != null) {
1016            callback.update(region, r, result);
1017          }
1018          return result;
1019        }
1020      });
1021      futures.put(r, future);
1022    }
1023    for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1024      try {
1025        e.getValue().get();
1026      } catch (ExecutionException ee) {
1027        LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1028            + Bytes.toStringBinary(e.getKey()), ee);
1029        throw ee.getCause();
1030      } catch (InterruptedException ie) {
1031        throw new InterruptedIOException("Interrupted calling coprocessor service "
1032            + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1033      }
1034    }
1035  }
1036
1037  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1038  throws IOException {
1039    if (start == null) {
1040      start = HConstants.EMPTY_START_ROW;
1041    }
1042    if (end == null) {
1043      end = HConstants.EMPTY_END_ROW;
1044    }
1045    return getKeysAndRegionsInRange(start, end, true).getFirst();
1046  }
1047
1048  @Override
1049  public long getRpcTimeout(TimeUnit unit) {
1050    return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS);
1051  }
1052
1053  @Override
1054  @Deprecated
1055  public int getRpcTimeout() {
1056    return rpcTimeoutMs;
1057  }
1058
1059  @Override
1060  @Deprecated
1061  public void setRpcTimeout(int rpcTimeout) {
1062    setReadRpcTimeout(rpcTimeout);
1063    setWriteRpcTimeout(rpcTimeout);
1064  }
1065
1066  @Override
1067  public long getReadRpcTimeout(TimeUnit unit) {
1068    return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS);
1069  }
1070
1071  @Override
1072  @Deprecated
1073  public int getReadRpcTimeout() {
1074    return readRpcTimeoutMs;
1075  }
1076
1077  @Override
1078  @Deprecated
1079  public void setReadRpcTimeout(int readRpcTimeout) {
1080    this.readRpcTimeoutMs = readRpcTimeout;
1081  }
1082
1083  @Override
1084  public long getWriteRpcTimeout(TimeUnit unit) {
1085    return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS);
1086  }
1087
1088  @Override
1089  @Deprecated
1090  public int getWriteRpcTimeout() {
1091    return writeRpcTimeoutMs;
1092  }
1093
1094  @Override
1095  @Deprecated
1096  public void setWriteRpcTimeout(int writeRpcTimeout) {
1097    this.writeRpcTimeoutMs = writeRpcTimeout;
1098  }
1099
1100  @Override
1101  public long getOperationTimeout(TimeUnit unit) {
1102    return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS);
1103  }
1104
1105  @Override
1106  @Deprecated
1107  public int getOperationTimeout() {
1108    return operationTimeoutMs;
1109  }
1110
1111  @Override
1112  @Deprecated
1113  public void setOperationTimeout(int operationTimeout) {
1114    this.operationTimeoutMs = operationTimeout;
1115  }
1116
1117  @Override
1118  public String toString() {
1119    return tableName + ";" + connection;
1120  }
1121
1122  @Override
1123  public <R extends Message> Map<byte[], R> batchCoprocessorService(
1124      Descriptors.MethodDescriptor methodDescriptor, Message request,
1125      byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1126    final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1127        Bytes.BYTES_COMPARATOR));
1128    batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1129        new Callback<R>() {
1130      @Override
1131      public void update(byte[] region, byte[] row, R result) {
1132        if (region != null) {
1133          results.put(region, result);
1134        }
1135      }
1136    });
1137    return results;
1138  }
1139
1140  @Override
1141  public <R extends Message> void batchCoprocessorService(
1142      final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1143      byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1144      throws ServiceException, Throwable {
1145
1146    if (startKey == null) {
1147      startKey = HConstants.EMPTY_START_ROW;
1148    }
1149    if (endKey == null) {
1150      endKey = HConstants.EMPTY_END_ROW;
1151    }
1152    // get regions covered by the row range
1153    Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1154        getKeysAndRegionsInRange(startKey, endKey, true);
1155    List<byte[]> keys = keysAndRegions.getFirst();
1156    List<HRegionLocation> regions = keysAndRegions.getSecond();
1157
1158    // check if we have any calls to make
1159    if (keys.isEmpty()) {
1160      LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1161          ", end=" + Bytes.toStringBinary(endKey));
1162      return;
1163    }
1164
1165    List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size());
    final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
        new TreeMap<>(Bytes.BYTES_COMPARATOR);
1167    for (int i = 0; i < keys.size(); i++) {
1168      final byte[] rowKey = keys.get(i);
1169      final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1170      RegionCoprocessorServiceExec exec =
1171          new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1172      execs.add(exec);
1173      execsByRow.put(rowKey, exec);
1174    }
1175
1176    // tracking for any possible deserialization errors on success callback
1177    // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1178    final List<Throwable> callbackErrorExceptions = new ArrayList<>();
1179    final List<Row> callbackErrorActions = new ArrayList<>();
1180    final List<String> callbackErrorServers = new ArrayList<>();
1181    Object[] results = new Object[execs.size()];
1182
1183    AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
1184        RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1185        RpcControllerFactory.instantiate(configuration));
1186
1187    Callback<ClientProtos.CoprocessorServiceResult> resultsCallback
1188    = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
1189      if (LOG.isTraceEnabled()) {
1190        LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1191            ": region=" + Bytes.toStringBinary(region) +
1192            ", row=" + Bytes.toStringBinary(row) +
1193            ", value=" + serviceResult.getValue().getValue());
1194      }
1195      try {
1196        Message.Builder builder = responsePrototype.newBuilderForType();
1197        org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
1198            serviceResult.getValue().getValue().toByteArray());
1199        callback.update(region, row, (R) builder.build());
1200      } catch (IOException e) {
1201        LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1202            e);
1203        callbackErrorExceptions.add(e);
1204        callbackErrorActions.add(execsByRow.get(row));
1205        callbackErrorServers.add("null");
1206      }
1207    };
1208    AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task =
1209        AsyncProcessTask.newBuilder(resultsCallback)
1210            .setPool(pool)
1211            .setTableName(tableName)
1212            .setRowAccess(execs)
1213            .setResults(results)
1214            .setRpcTimeout(readRpcTimeoutMs)
1215            .setOperationTimeout(operationTimeoutMs)
1216            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
1217            .build();
1218    AsyncRequestFuture future = asyncProcess.submit(task);
1219    future.waitUntilDone();
1220
1221    if (future.hasError()) {
1222      throw future.getErrors();
1223    } else if (!callbackErrorExceptions.isEmpty()) {
1224      throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1225          callbackErrorServers);
1226    }
1227  }
1228
1229  @Override
1230  public RegionLocator getRegionLocator() {
1231    return this.locator;
1232  }
1233
1234  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
1235
1236    private final byte[] row;
1237    private final byte[] family;
1238    private byte[] qualifier;
1239    private TimeRange timeRange;
1240    private CompareOperator op;
1241    private byte[] value;
1242
1243    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
1244      this.row = Preconditions.checkNotNull(row, "row is null");
1245      this.family = Preconditions.checkNotNull(family, "family is null");
1246    }
1247
1248    @Override
1249    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
1250      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
1251          " an empty byte array, or just do not call this method if you want a null qualifier");
1252      return this;
1253    }
1254
1255    @Override
1256    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
1257      this.timeRange = timeRange;
1258      return this;
1259    }
1260
1261    @Override
1262    public CheckAndMutateBuilder ifNotExists() {
1263      this.op = CompareOperator.EQUAL;
1264      this.value = null;
1265      return this;
1266    }
1267
1268    @Override
1269    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
1270      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
1271      this.value = Preconditions.checkNotNull(value, "value is null");
1272      return this;
1273    }
1274
1275    private void preCheck() {
1276      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
1277          " calling ifNotExists/ifEquals/ifMatches before executing the request");
1278    }
1279
1280    @Override
1281    public boolean thenPut(Put put) throws IOException {
1282      validatePut(put);
1283      preCheck();
1284      return doCheckAndPut(row, family, qualifier, op, value, null, timeRange, put);
1285    }
1286
1287    @Override
1288    public boolean thenDelete(Delete delete) throws IOException {
1289      preCheck();
1290      return doCheckAndDelete(row, family, qualifier, op, value, null, timeRange, delete);
1291    }
1292
1293    @Override
1294    public boolean thenMutate(RowMutations mutation) throws IOException {
1295      preCheck();
1296      return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange,
1297        mutation);
1298    }
1299  }
1300
1301  private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder {
1302
1303    private final byte[] row;
1304    private final Filter filter;
1305    private TimeRange timeRange;
1306
1307    CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
1308      this.row = Preconditions.checkNotNull(row, "row is null");
1309      this.filter = Preconditions.checkNotNull(filter, "filter is null");
1310    }
1311
1312    @Override
1313    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
1314      this.timeRange = timeRange;
1315      return this;
1316    }
1317
1318    @Override
1319    public boolean thenPut(Put put) throws IOException {
1320      validatePut(put);
1321      return doCheckAndPut(row, null, null, null, null, filter, timeRange, put);
1322    }
1323
1324    @Override
1325    public boolean thenDelete(Delete delete) throws IOException {
1326      return doCheckAndDelete(row, null, null, null, null, filter, timeRange, delete);
1327    }
1328
1329    @Override
1330    public boolean thenMutate(RowMutations mutation) throws IOException {
1331      return doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation);
1332    }
1333  }
1334}