001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
// Internally, we use shaded protobuf. The imports below are part of our public API.
022// SEE ABOVE NOTE!
023
024import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
025
026import com.google.protobuf.Descriptors;
027import com.google.protobuf.Message;
028import com.google.protobuf.Service;
029import com.google.protobuf.ServiceException;
030import io.opentelemetry.api.trace.Span;
031import io.opentelemetry.api.trace.StatusCode;
032import io.opentelemetry.context.Context;
033import io.opentelemetry.context.Scope;
034import java.io.IOException;
035import java.io.InterruptedIOException;
036import java.util.ArrayList;
037import java.util.Collections;
038import java.util.List;
039import java.util.Map;
040import java.util.Optional;
041import java.util.TreeMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.SynchronousQueue;
046import java.util.concurrent.ThreadPoolExecutor;
047import java.util.concurrent.TimeUnit;
048import java.util.function.Supplier;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CompareOperator;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.HTableDescriptor;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.client.coprocessor.Batch;
056import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
057import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
058import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
059import org.apache.hadoop.hbase.filter.Filter;
060import org.apache.hadoop.hbase.io.TimeRange;
061import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
062import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
063import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
064import org.apache.hadoop.hbase.trace.TraceUtil;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.Pair;
067import org.apache.hadoop.hbase.util.ReflectionUtils;
068import org.apache.hadoop.hbase.util.Threads;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.apache.yetus.audience.InterfaceStability;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
075import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
076
077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
078import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
079import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
084
085/**
086 * An implementation of {@link Table}. Used to communicate with a single HBase table. Lightweight.
087 * Get as needed and just close when done. Instances of this class SHOULD NOT be constructed
088 * directly. Obtain an instance via {@link Connection}. See {@link ConnectionFactory} class comment
089 * for an example of how.
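 * <p>
 * For illustration only, a minimal usage sketch assuming a pre-existing table named "mytable"
 * with a column family "cf"; obtain the {@link Table} through the public API rather than by
 * constructing an HTable directly:
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *   Table table = connection.getTable(TableName.valueOf("mytable"))) {
 *   Put put = new Put(Bytes.toBytes("row1"));
 *   put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
 *   table.put(put);
 *   Result result = table.get(new Get(Bytes.toBytes("row1")));
 * }
 * }</pre>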
090 * <p>
 * This class is thread safe since 2.0.0 as long as none of the setter methods are invoked. All
 * setters have moved into {@link TableBuilder} and are kept here only for backward compatibility;
 * TODO: they will be removed soon.
 * <p>
 * HTable is no longer a client API. Use {@link Table} instead. It is marked
 * InterfaceAudience.Private, indicating that this is an HBase-internal class as defined in the
 * <a href=
 * "https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
 * Interface Classification</a>. There are no guarantees of backwards source / binary
 * compatibility, and methods or the class itself can change or go away without deprecation.
100 * @see Table
101 * @see Admin
102 * @see Connection
103 * @see ConnectionFactory
104 */
105@InterfaceAudience.Private
106@InterfaceStability.Stable
107public class HTable implements Table {
108  private static final Logger LOG = LoggerFactory.getLogger(HTable.class);
109  private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
110  private final ClusterConnection connection;
111  private final TableName tableName;
112  private final Configuration configuration;
113  private final ConnectionConfiguration connConfiguration;
114  private boolean closed = false;
115  private final int scannerCaching;
116  private final long scannerMaxResultSize;
117  private final ExecutorService pool; // For Multi & Scan
118  private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc
119  private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
120  private int readRpcTimeoutMs; // timeout for each read rpc request
121  private int writeRpcTimeoutMs; // timeout for each write rpc request
122
123  private final int scanReadRpcTimeout;
124  private final int scanTimeout;
125  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
126  private final HRegionLocator locator;
127
128  /** The Async process for batch */
129  AsyncProcess multiAp;
130  private final RpcRetryingCallerFactory rpcCallerFactory;
131  private final RpcControllerFactory rpcControllerFactory;
132
133  // Marked Private @since 1.0
134  @InterfaceAudience.Private
135  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
136    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
137    if (maxThreads == 0) {
138      maxThreads = 1; // is there a better default?
139    }
140    int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
141    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
142
    // Using the "direct handoff" approach, new threads are only created when necessary and the
    // pool can grow unbounded. This could be bad, but in HCM we only create as many Runnables as
    // there are region servers, so the pool also scales when new region servers are added.
147    ThreadPoolExecutor pool =
148      new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, TimeUnit.SECONDS,
149        new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("htable-pool-%d")
150          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
151    pool.allowCoreThreadTimeOut(true);
152    return pool;
153  }
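
  // For illustration only, a sketch of tuning this default pool through the configuration keys
  // read above (the values are arbitrary examples, not recommendations):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.htable.threads.max", 96);
  //   conf.setInt("hbase.htable.threads.coresize", 8);
  //   conf.setLong("hbase.htable.threads.keepalivetime", 60);
  //   ThreadPoolExecutor pool = HTable.getDefaultExecutor(conf);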
154
155  /**
   * Creates an object to access an HBase table. Used by HBase internally. DO NOT USE. See
157   * {@link ConnectionFactory} class comment for how to get a {@link Table} instance (use
158   * {@link Table} instead of {@link HTable}).
159   * @param connection           Connection to be used.
160   * @param builder              The table builder
161   * @param rpcCallerFactory     The RPC caller factory
162   * @param rpcControllerFactory The RPC controller factory
163   * @param pool                 ExecutorService to be used.
164   */
165  @InterfaceAudience.Private
166  protected HTable(final ConnectionImplementation connection, final TableBuilderBase builder,
167    final RpcRetryingCallerFactory rpcCallerFactory,
168    final RpcControllerFactory rpcControllerFactory, final ExecutorService pool) {
169    this.connection = Preconditions.checkNotNull(connection, "connection is null");
170    this.configuration = connection.getConfiguration();
171    this.connConfiguration = connection.getConnectionConfiguration();
172    if (pool == null) {
173      this.pool = getDefaultExecutor(this.configuration);
174      this.cleanupPoolOnClose = true;
175    } else {
176      this.pool = pool;
177      this.cleanupPoolOnClose = false;
178    }
179    if (rpcCallerFactory == null) {
180      this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
181    } else {
182      this.rpcCallerFactory = rpcCallerFactory;
183    }
184
185    if (rpcControllerFactory == null) {
186      this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
187    } else {
188      this.rpcControllerFactory = rpcControllerFactory;
189    }
190
191    this.tableName = builder.tableName;
192    this.operationTimeoutMs = builder.operationTimeout;
193    this.rpcTimeoutMs = builder.rpcTimeout;
194    this.readRpcTimeoutMs = builder.readRpcTimeout;
195    this.writeRpcTimeoutMs = builder.writeRpcTimeout;
196    this.scanReadRpcTimeout = builder.scanReadRpcTimeout;
197    this.scanTimeout = builder.scanTimeout;
198    this.scannerCaching = connConfiguration.getScannerCaching();
199    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
200
201    // puts need to track errors globally due to how the APIs currently work.
202    multiAp = this.connection.getAsyncProcess();
203    this.locator = new HRegionLocator(tableName, connection);
204  }
205
206  /** Returns maxKeyValueSize from configuration. */
207  public static int getMaxKeyValueSize(Configuration conf) {
208    return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1);
209  }
210
211  @Override
212  public Configuration getConfiguration() {
213    return configuration;
214  }
215
216  @Override
217  public TableName getName() {
218    return tableName;
219  }
220
221  /**
222   * <em>INTERNAL</em> Used by unit tests and tools to do low-level manipulations.
223   * @return A Connection instance.
224   */
225  protected Connection getConnection() {
226    return this.connection;
227  }
228
229  @Override
230  @Deprecated
231  public HTableDescriptor getTableDescriptor() throws IOException {
232    HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory,
233      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
234    if (htd != null) {
235      return new ImmutableHTableDescriptor(htd);
236    }
237    return null;
238  }
239
240  @Override
241  public TableDescriptor getDescriptor() throws IOException {
242    return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
243      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
244  }
245
246  /**
247   * Get the corresponding start keys and regions for an arbitrary range of keys.
248   * <p>
249   * @param startKey      Starting row in range, inclusive
250   * @param endKey        Ending row in range
   * @param includeEndKey true if endKey is inclusive, false if exclusive
252   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
253   *         range
254   * @throws IOException if a remote or network exception occurs
255   */
256  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
257    final byte[] endKey, final boolean includeEndKey) throws IOException {
258    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
259  }
260
261  /**
262   * Get the corresponding start keys and regions for an arbitrary range of keys.
263   * <p>
264   * @param startKey      Starting row in range, inclusive
265   * @param endKey        Ending row in range
   * @param includeEndKey true if endKey is inclusive, false if exclusive
267   * @param reload        true to reload information or false to use cached information
268   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
269   *         range
270   * @throws IOException if a remote or network exception occurs
271   */
272  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
273    final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
274    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
275    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
276      throw new IllegalArgumentException(
277        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
278    }
279    List<byte[]> keysInRange = new ArrayList<>();
280    List<HRegionLocation> regionsInRange = new ArrayList<>();
281    byte[] currentKey = startKey;
282    do {
283      HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
284      keysInRange.add(currentKey);
285      regionsInRange.add(regionLocation);
286      currentKey = regionLocation.getRegionInfo().getEndKey();
287    } while (
288      !Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
289        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
290          || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))
291    );
292    return new Pair<>(keysInRange, regionsInRange);
293  }
294
295  /**
296   * The underlying {@link HTable} must not be closed. {@link Table#getScanner(Scan)} has other
297   * usage details.
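   * <p>
   * For illustration only, a sketch of scanning a hypothetical family "cf"; the scanner must be
   * closed when done, e.g. via try-with-resources:
   *
   * <pre>{@code
   * try (ResultScanner scanner = table.getScanner(new Scan().addFamily(Bytes.toBytes("cf")))) {
   *   for (Result result : scanner) {
   *     // process each result
   *   }
   * }
   * }</pre>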
298   */
299  @Override
300  public ResultScanner getScanner(Scan scan) throws IOException {
301    final Span span =
302      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(scan).build();
303    try (Scope ignored = span.makeCurrent()) {
304      if (scan.getCaching() <= 0) {
305        scan.setCaching(scannerCaching);
306      }
307      if (scan.getMaxResultSize() <= 0) {
308        scan.setMaxResultSize(scannerMaxResultSize);
309      }
310      if (scan.getMvccReadPoint() > 0) {
        // this is not supposed to be set by the user, so clear it
312        scan.resetMvccReadPoint();
313      }
314      final boolean async = scan.isAsyncPrefetch() != null
315        ? scan.isAsyncPrefetch()
316        : connConfiguration.isClientScannerAsyncPrefetch();
317      final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();
318
319      if (scan.isReversed()) {
320        return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
321          rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
322          replicaTimeout);
323      } else {
324        if (async) {
325          return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
326            rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
327            replicaTimeout);
328        } else {
329          return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
330            rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
331            replicaTimeout);
332        }
333      }
334    }
335  }
336
337  /**
338   * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[])} has other
339   * usage details.
340   */
341  @Override
342  public ResultScanner getScanner(byte[] family) throws IOException {
343    Scan scan = new Scan();
344    scan.addFamily(family);
345    return getScanner(scan);
346  }
347
348  /**
349   * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[], byte[])} has
350   * other usage details.
351   */
352  @Override
353  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
354    Scan scan = new Scan();
355    scan.addColumn(family, qualifier);
356    return getScanner(scan);
357  }
358
359  @Override
360  public Result get(final Get get) throws IOException {
361    final Supplier<Span> supplier =
362      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(get);
363    return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier);
364  }
365
366  private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
    // if we are changing settings on the get, clone it.
368    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
369      get = ReflectionUtils.newInstance(get.getClass(), get);
370      get.setCheckExistenceOnly(checkExistenceOnly);
371      if (get.getConsistency() == null) {
372        get.setConsistency(DEFAULT_CONSISTENCY);
373      }
374    }
375
376    if (get.getConsistency() == Consistency.STRONG) {
377      final Get configuredGet = get;
378      ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection,
379        getName(), get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) {
380        @Override
381        protected Result rpcCall() throws Exception {
382          ClientProtos.GetRequest request = RequestConverter
383            .buildGetRequest(getLocation().getRegionInfo().getRegionName(), configuredGet);
384          ClientProtos.GetResponse response = doGet(request);
385          return response == null
386            ? null
387            : ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
388        }
389      };
390      return rpcCallerFactory.<Result> newCaller(readRpcTimeoutMs).callWithRetries(callable,
391        this.operationTimeoutMs);
392    }
393
394    // Call that takes into account the replica
395    RpcRetryingCallerWithReadReplicas callable =
396      new RpcRetryingCallerWithReadReplicas(rpcControllerFactory, tableName, this.connection, get,
397        pool, connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs,
398        connConfiguration.getPrimaryCallTimeoutMicroSecond());
399    return callable.call(operationTimeoutMs);
400  }
401
402  @Override
403  public Result[] get(List<Get> gets) throws IOException {
404    final Supplier<Span> supplier =
405      new TableOperationSpanBuilder(connection).setTableName(tableName)
406        .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(gets);
407    return TraceUtil.trace(() -> {
408      if (gets.size() == 1) {
409        return new Result[] { get(gets.get(0)) };
410      }
411      try {
412        Object[] r1 = new Object[gets.size()];
413        batch((List<? extends Row>) gets, r1, readRpcTimeoutMs);
414        // Translate.
415        Result[] results = new Result[r1.length];
416        int i = 0;
417        for (Object obj : r1) {
418          // Batch ensures if there is a failure we get an exception instead
419          results[i++] = (Result) obj;
420        }
421        return results;
422      } catch (InterruptedException e) {
423        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
424      }
425    }, supplier);
426  }
427
428  @Override
429  public void batch(final List<? extends Row> actions, final Object[] results)
430    throws InterruptedException, IOException {
431    int rpcTimeout = writeRpcTimeoutMs;
432    boolean hasRead = false;
433    boolean hasWrite = false;
434    for (Row action : actions) {
435      if (action instanceof Mutation) {
436        hasWrite = true;
437      } else {
438        hasRead = true;
439      }
440      if (hasRead && hasWrite) {
441        break;
442      }
443    }
444    if (hasRead && !hasWrite) {
445      rpcTimeout = readRpcTimeoutMs;
446    }
447    try {
448      batch(actions, results, rpcTimeout);
449    } catch (InterruptedException e) {
450      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
451    }
452  }
453
454  public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
455    throws InterruptedException, IOException {
456    AsyncProcessTask task =
457      AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(actions)
458        .setResults(results).setRpcTimeout(rpcTimeout).setOperationTimeout(operationTimeoutMs)
459        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
460    final Span span = new TableOperationSpanBuilder(connection).setTableName(tableName)
461      .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(actions)
462      .build();
463    try (Scope ignored = span.makeCurrent()) {
464      AsyncRequestFuture ars = multiAp.submit(task);
465      ars.waitUntilDone();
466      if (ars.hasError()) {
467        TraceUtil.setError(span, ars.getErrors());
468        throw ars.getErrors();
469      }
470      span.setStatus(StatusCode.OK);
471    } finally {
472      span.end();
473    }
474  }
475
476  @Override
477  public <R> void batchCallback(final List<? extends Row> actions, final Object[] results,
478    final Batch.Callback<R> callback) throws IOException, InterruptedException {
479    doBatchWithCallback(actions, results, callback, connection, pool, tableName);
480  }
481
482  public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
483    Batch.Callback<R> callback, ClusterConnection connection, ExecutorService pool,
484    TableName tableName) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
485    int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
486    int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
487      connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
488        HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
489    AsyncProcessTask<R> task =
490      AsyncProcessTask.newBuilder(callback).setPool(pool).setTableName(tableName)
491        .setRowAccess(actions).setResults(results).setOperationTimeout(operationTimeout)
492        .setRpcTimeout(writeTimeout).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
493    final Span span = new TableOperationSpanBuilder(connection).setTableName(tableName)
494      .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(actions)
495      .build();
496    try (Scope ignored = span.makeCurrent()) {
497      AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
498      ars.waitUntilDone();
499      if (ars.hasError()) {
500        TraceUtil.setError(span, ars.getErrors());
501        throw ars.getErrors();
502      }
503    } finally {
504      span.end();
505    }
506  }
507
508  @Override
509  public void delete(final Delete delete) throws IOException {
510    final Supplier<Span> supplier =
511      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(delete);
512    TraceUtil.trace(() -> {
513      ClientServiceCallable<Void> callable =
514        new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(),
515          this.rpcControllerFactory.newController(), delete.getPriority()) {
516          @Override
517          protected Void rpcCall() throws Exception {
518            MutateRequest request = RequestConverter
519              .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete);
520            doMutate(request);
521            return null;
522          }
523        };
524      rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
525        this.operationTimeoutMs);
526    }, supplier);
527  }
528
529  @Override
530  public void delete(final List<Delete> deletes) throws IOException {
531    Object[] results = new Object[deletes.size()];
532    try {
533      batch(deletes, results, writeRpcTimeoutMs);
534    } catch (InterruptedException e) {
535      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
536    } finally {
      // TODO: to be consistent with batch put(), do not modify the input list.
      // Mutate the list so that it is empty on complete success, or contains only the failed
      // records. Results are returned in the same order as the requests in the list. Walk the
      // list backwards so we can remove entries without impacting the indexes of earlier members.
      // (See the illustrative sketch after this method.)
541      for (int i = results.length - 1; i >= 0; i--) {
542        // if result is not null, it succeeded
543        if (results[i] instanceof Result) {
544          deletes.remove(i);
545        }
546      }
547    }
548  }
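
  // For illustration only, a sketch of the partial-failure behavior of delete(List) described
  // above: the passed-in list is trimmed so that, on return or exception, it holds only the
  // Deletes that were not applied.
  //
  //   List<Delete> deletes = new ArrayList<>(Arrays.asList(
  //     new Delete(Bytes.toBytes("row1")), new Delete(Bytes.toBytes("row2"))));
  //   try {
  //     table.delete(deletes);
  //   } catch (IOException e) {
  //     // 'deletes' now holds only the Delete instances that failed
  //   }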
549
550  @Override
551  public void put(final Put put) throws IOException {
552    final Supplier<Span> supplier =
553      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(put);
554    TraceUtil.trace(() -> {
555      validatePut(put);
556      ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(this.connection,
557        getName(), put.getRow(), this.rpcControllerFactory.newController(), put.getPriority()) {
558        @Override
559        protected Void rpcCall() throws Exception {
560          MutateRequest request =
561            RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put);
562          doMutate(request);
563          return null;
564        }
565      };
566      rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
567        this.operationTimeoutMs);
568    }, supplier);
569  }
570
571  @Override
572  public void put(final List<Put> puts) throws IOException {
573    for (Put put : puts) {
574      validatePut(put);
575    }
576    Object[] results = new Object[puts.size()];
577    try {
578      batch(puts, results, writeRpcTimeoutMs);
579    } catch (InterruptedException e) {
580      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
581    }
582  }
583
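  // For illustration only, a sketch of how a caller might compose a mutateRow request (assumes a
  // column family "cf"); all mutations in the RowMutations must target the same row:
  //
  //   byte[] row = Bytes.toBytes("row1");
  //   RowMutations rm = new RowMutations(row);
  //   rm.add(new Put(row).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v")));
  //   rm.add(new Delete(row).addColumns(Bytes.toBytes("cf"), Bytes.toBytes("q2")));
  //   Result result = table.mutateRow(rm);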
584  @Override
585  public Result mutateRow(final RowMutations rm) throws IOException {
586    final Supplier<Span> supplier =
587      new TableOperationSpanBuilder(connection).setTableName(tableName)
588        .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(rm);
589    return TraceUtil.trace(() -> {
590      long nonceGroup = getNonceGroup();
591      long nonce = getNonce();
592      CancellableRegionServerCallable<MultiResponse> callable =
593        new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
594          rpcControllerFactory.newController(), writeRpcTimeoutMs,
595          new RetryingTimeTracker().start(), rm.getMaxPriority()) {
596          @Override
597          protected MultiResponse rpcCall() throws Exception {
598            MultiRequest request = RequestConverter.buildMultiRequest(
599              getLocation().getRegionInfo().getRegionName(), rm, nonceGroup, nonce);
600            ClientProtos.MultiResponse response = doMulti(request);
601            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
602            if (res.hasException()) {
603              Throwable ex = ProtobufUtil.toException(res.getException());
604              if (ex instanceof IOException) {
605                throw (IOException) ex;
606              }
607              throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()),
608                ex);
609            }
610            return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
611          }
612        };
613      Object[] results = new Object[rm.getMutations().size()];
614      AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
615        .setRowAccess(rm.getMutations()).setCallable(callable).setRpcTimeout(writeRpcTimeoutMs)
616        .setOperationTimeout(operationTimeoutMs)
617        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
618      AsyncRequestFuture ars = multiAp.submit(task);
619      ars.waitUntilDone();
620      if (ars.hasError()) {
621        throw ars.getErrors();
622      }
623      return (Result) results[0];
624    }, supplier);
625  }
626
627  private long getNonceGroup() {
628    return ((ClusterConnection) getConnection()).getNonceGenerator().getNonceGroup();
629  }
630
631  private long getNonce() {
632    return ((ClusterConnection) getConnection()).getNonceGenerator().newNonce();
633  }
634
635  @Override
636  public Result append(final Append append) throws IOException {
637    final Supplier<Span> supplier =
638      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(append);
639    return TraceUtil.trace(() -> {
640      checkHasFamilies(append);
641      NoncedRegionServerCallable<Result> callable =
642        new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
643          this.rpcControllerFactory.newController(), append.getPriority()) {
644          @Override
645          protected Result rpcCall() throws Exception {
646            MutateRequest request =
647              RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
648                append, super.getNonceGroup(), super.getNonce());
649            MutateResponse response = doMutate(request);
650            if (!response.hasResult()) {
651              return null;
652            }
653            return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
654          }
655        };
656      return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
657        this.operationTimeoutMs);
658    }, supplier);
659  }
660
661  @Override
662  public Result increment(final Increment increment) throws IOException {
663    final Supplier<Span> supplier =
664      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(increment);
665    return TraceUtil.trace(() -> {
666      checkHasFamilies(increment);
667      NoncedRegionServerCallable<Result> callable =
668        new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
669          this.rpcControllerFactory.newController(), increment.getPriority()) {
670          @Override
671          protected Result rpcCall() throws Exception {
672            MutateRequest request =
673              RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
674                increment, super.getNonceGroup(), super.getNonce());
675            MutateResponse response = doMutate(request);
676            // Should this check for null like append does?
677            return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
678          }
679        };
680      return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable,
681        this.operationTimeoutMs);
682    }, supplier);
683  }
684
685  @Override
686  public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier,
687    final long amount) throws IOException {
688    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
689  }
690
691  @Override
692  public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier,
693    final long amount, final Durability durability) throws IOException {
694    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
695      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.INCREMENT);
696    return TraceUtil.trace(() -> {
697      NullPointerException npe = null;
698      if (row == null) {
699        npe = new NullPointerException("row is null");
700      } else if (family == null) {
701        npe = new NullPointerException("family is null");
702      }
703      if (npe != null) {
704        throw new IOException("Invalid arguments to incrementColumnValue", npe);
705      }
706
707      NoncedRegionServerCallable<Long> callable =
708        new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
709          this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
710          @Override
711          protected Long rpcCall() throws Exception {
712            MutateRequest request = RequestConverter.buildIncrementRequest(
713              getLocation().getRegionInfo().getRegionName(), row, family, qualifier, amount,
714              durability, super.getNonceGroup(), super.getNonce());
715            MutateResponse response = doMutate(request);
716            Result result =
717              ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
718            return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
719          }
720        };
721      return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
722        this.operationTimeoutMs);
723    }, supplier);
724  }
725
726  @Override
727  @Deprecated
728  public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
729    final byte[] value, final Put put) throws IOException {
730    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
731      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
732      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
733        HBaseSemanticAttributes.Operation.PUT);
734    return TraceUtil.trace(
735      () -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put)
736        .isSuccess(),
737      supplier);
738  }
739
740  @Override
741  @Deprecated
742  public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
743    final CompareOp compareOp, final byte[] value, final Put put) throws IOException {
744    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
745      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
746      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
747        HBaseSemanticAttributes.Operation.PUT);
748    return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier,
749      toCompareOperator(compareOp), value, null, null, put).isSuccess(), supplier);
750  }
751
752  @Override
753  @Deprecated
754  public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
755    final CompareOperator op, final byte[] value, final Put put) throws IOException {
756    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
757      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
758      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
759        HBaseSemanticAttributes.Operation.PUT);
760    return TraceUtil.trace(
761      () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(),
762      supplier);
763  }
764
765  @Override
766  @Deprecated
767  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
768    final byte[] value, final Delete delete) throws IOException {
769    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
770      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
771      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
772        HBaseSemanticAttributes.Operation.DELETE);
773    return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL,
774      value, null, null, delete).isSuccess(), supplier);
775  }
776
777  @Override
778  @Deprecated
779  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
780    final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
781    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
782      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
783      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
784        HBaseSemanticAttributes.Operation.DELETE);
785    return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier,
786      toCompareOperator(compareOp), value, null, null, delete).isSuccess(), supplier);
787  }
788
789  @Override
790  @Deprecated
791  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
792    final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
793    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
794      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
795      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
796        HBaseSemanticAttributes.Operation.DELETE);
797    return TraceUtil.trace(
798      () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(),
799      supplier);
800  }
801
802  @Override
803  @Deprecated
804  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
805    return new CheckAndMutateBuilderImpl(row, family);
806  }
807
808  @Override
809  @Deprecated
810  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
811    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
812  }
813
814  private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
815    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
816    final TimeRange timeRange, final RowMutations rm) throws IOException {
817    long nonceGroup = getNonceGroup();
818    long nonce = getNonce();
819    CancellableRegionServerCallable<MultiResponse> callable =
820      new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
821        rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
822        rm.getMaxPriority()) {
823        @Override
824        protected MultiResponse rpcCall() throws Exception {
825          MultiRequest request =
826            RequestConverter.buildMultiRequest(getLocation().getRegionInfo().getRegionName(), row,
827              family, qualifier, op, value, filter, timeRange, rm, nonceGroup, nonce);
828          ClientProtos.MultiResponse response = doMulti(request);
829          ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
830          if (res.hasException()) {
831            Throwable ex = ProtobufUtil.toException(res.getException());
832            if (ex instanceof IOException) {
833              throw (IOException) ex;
834            }
835            throw new IOException(
836              "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
837          }
838          return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
839        }
840      };
841
    /**
     * Currently, we use one array to store the 'processed' flag that is returned by the server.
     * It is excessive to send such a large array, but that is what the framework requires right
     * now.
     */
846    Object[] results = new Object[rm.getMutations().size()];
847    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
848      .setRowAccess(rm.getMutations()).setResults(results).setCallable(callable)
849      // TODO any better timeout?
850      .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
851      .setOperationTimeout(operationTimeoutMs).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
852      .build();
853    AsyncRequestFuture ars = multiAp.submit(task);
854    ars.waitUntilDone();
855    if (ars.hasError()) {
856      throw ars.getErrors();
857    }
858
859    return (CheckAndMutateResult) results[0];
860  }
861
862  @Override
863  @Deprecated
864  public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
865    final CompareOp compareOp, final byte[] value, final RowMutations rm) throws IOException {
866    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
867      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
868      .setContainerOperations(rm);
869    return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier,
870      toCompareOperator(compareOp), value, null, null, rm).isSuccess(), supplier);
871  }
872
873  @Override
874  @Deprecated
875  public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
876    final CompareOperator op, final byte[] value, final RowMutations rm) throws IOException {
877    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
878      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
879      .setContainerOperations(rm);
880    return TraceUtil.trace(
881      () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(),
882      supplier);
883  }
884
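  // For illustration only, a sketch of building a request for checkAndMutate(CheckAndMutate)
  // below via the public builder; row, expectedValue and put are placeholders and "cf" is a
  // hypothetical column family:
  //
  //   CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
  //     .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("q"), expectedValue)
  //     .build(put);
  //   CheckAndMutateResult result = table.checkAndMutate(checkAndMutate);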
885  @Override
886  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
887    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
888      .setTableName(tableName).setOperation(checkAndMutate).setContainerOperations(checkAndMutate);
889    return TraceUtil.trace(() -> {
890      Row action = checkAndMutate.getAction();
891      if (
892        action instanceof Put || action instanceof Delete || action instanceof Increment
893          || action instanceof Append
894      ) {
895        if (action instanceof Put) {
896          validatePut((Put) action);
897        }
898        return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
899          checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
900          checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action);
901      } else {
902        return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
903          checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
904          checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action);
905      }
906    }, supplier);
907  }
908
909  private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
910    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
911    final TimeRange timeRange, final Mutation mutation) throws IOException {
912    long nonceGroup = getNonceGroup();
913    long nonce = getNonce();
914    ClientServiceCallable<CheckAndMutateResult> callable =
915      new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
916        this.rpcControllerFactory.newController(), mutation.getPriority()) {
917        @Override
918        protected CheckAndMutateResult rpcCall() throws Exception {
919          MutateRequest request =
920            RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row,
921              family, qualifier, op, value, filter, timeRange, mutation, nonceGroup, nonce);
922          MutateResponse response = doMutate(request);
923          if (response.hasResult()) {
924            return new CheckAndMutateResult(response.getProcessed(),
925              ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()));
926          }
927          return new CheckAndMutateResult(response.getProcessed(), null);
928        }
929      };
930    return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs)
931      .callWithRetries(callable, this.operationTimeoutMs);
932  }
933
934  @Override
935  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
936    throws IOException {
937    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
938      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.BATCH)
939      .setContainerOperations(checkAndMutates);
940    return TraceUtil.trace(() -> {
941      if (checkAndMutates.isEmpty()) {
942        return Collections.emptyList();
943      }
944      if (checkAndMutates.size() == 1) {
945        return Collections.singletonList(checkAndMutate(checkAndMutates.get(0)));
946      }
947
948      Object[] results = new Object[checkAndMutates.size()];
949      try {
950        batch(checkAndMutates, results, writeRpcTimeoutMs);
951      } catch (InterruptedException e) {
952        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
953      }
954
955      // translate.
956      List<CheckAndMutateResult> ret = new ArrayList<>(results.length);
957      for (Object r : results) {
958        // Batch ensures if there is a failure we get an exception instead
959        ret.add((CheckAndMutateResult) r);
960      }
961      return ret;
962    }, supplier);
963  }
964
965  private CompareOperator toCompareOperator(CompareOp compareOp) {
966    switch (compareOp) {
967      case LESS:
968        return CompareOperator.LESS;
969
970      case LESS_OR_EQUAL:
971        return CompareOperator.LESS_OR_EQUAL;
972
973      case EQUAL:
974        return CompareOperator.EQUAL;
975
976      case NOT_EQUAL:
977        return CompareOperator.NOT_EQUAL;
978
979      case GREATER_OR_EQUAL:
980        return CompareOperator.GREATER_OR_EQUAL;
981
982      case GREATER:
983        return CompareOperator.GREATER;
984
985      case NO_OP:
986        return CompareOperator.NO_OP;
987
988      default:
989        throw new AssertionError();
990    }
991  }
992
993  @Override
994  public boolean exists(final Get get) throws IOException {
995    final Supplier<Span> supplier =
996      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(get);
997    return TraceUtil.trace(() -> {
998      Result r = get(get, true);
999      assert r.getExists() != null;
1000      return r.getExists();
1001    }, supplier);
1002  }
1003
1004  @Override
1005  public boolean[] exists(List<Get> gets) throws IOException {
1006    final Supplier<Span> supplier =
1007      new TableOperationSpanBuilder(connection).setTableName(tableName)
1008        .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(gets);
1009    return TraceUtil.trace(() -> {
1010      if (gets.isEmpty()) {
1011        return new boolean[] {};
1012      }
1013      if (gets.size() == 1) {
1014        return new boolean[] { exists(gets.get(0)) };
1015      }
1016
1017      ArrayList<Get> exists = new ArrayList<>(gets.size());
1018      for (Get g : gets) {
1019        Get ge = new Get(g);
1020        ge.setCheckExistenceOnly(true);
1021        exists.add(ge);
1022      }
1023
1024      Object[] r1 = new Object[exists.size()];
1025      try {
1026        batch(exists, r1, readRpcTimeoutMs);
1027      } catch (InterruptedException e) {
1028        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1029      }
1030
1031      // translate.
1032      boolean[] results = new boolean[r1.length];
1033      int i = 0;
1034      for (Object o : r1) {
1035        // batch ensures if there is a failure we get an exception instead
1036        results[i++] = ((Result) o).getExists();
1037      }
1038
1039      return results;
1040    }, supplier);
1041  }
1042
1043  /**
1044   * Process a mixed batch of Get, Put and Delete actions. All actions for a RegionServer are
1045   * forwarded in one RPC call. Queries are executed in parallel.
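   * <p>
   * For illustration only, a sketch of a mixed batch with a callback, assuming a column family
   * "cf" exists; per-row results are also reported through the callback as they complete:
   *
   * <pre>{@code
   * List<Row> actions = new ArrayList<>();
   * actions.add(new Put(Bytes.toBytes("row1"))
   *   .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
   * actions.add(new Get(Bytes.toBytes("row2")));
   * Object[] results = new Object[actions.size()];
   * table.processBatchCallback(actions, results,
   *   (region, row, result) -> LOG.debug("Processed row {}", Bytes.toStringBinary(row)));
   * }</pre>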
1046   * @param list    The collection of actions.
   * @param results An empty array, same size as list. If an exception is thrown, you can examine
   *                the array for partial results and determine which actions processed
   *                successfully.
1049   * @throws IOException if there are problems talking to META. Per-item exceptions are stored in
1050   *                     the results array.
1051   */
1052  public <R> void processBatchCallback(final List<? extends Row> list, final Object[] results,
1053    final Batch.Callback<R> callback) throws IOException, InterruptedException {
1054    this.batchCallback(list, results, callback);
1055  }
1056
1057  @Override
1058  public void close() throws IOException {
1059    final Supplier<Span> supplier =
1060      new TableSpanBuilder(connection).setName("HTable.close").setTableName(tableName);
1061    TraceUtil.trace(() -> {
1062      if (this.closed) {
1063        return;
1064      }
1065      if (cleanupPoolOnClose) {
1066        this.pool.shutdown();
1067        try {
1068          boolean terminated = false;
1069          do {
1070            // wait until the pool has terminated
1071            terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1072          } while (!terminated);
1073        } catch (InterruptedException e) {
1074          this.pool.shutdownNow();
1075          LOG.warn("waitForTermination interrupted");
1076        }
1077      }
1078      this.closed = true;
1079    }, supplier);
1080  }
1081
1082  // validate for well-formedness
1083  private void validatePut(final Put put) throws IllegalArgumentException {
1084    ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize());
1085  }
1086
1087  /**
   * The pool is used for multi requests for this HTable.
   * @return the pool used for multi requests
1090   */
1091  ExecutorService getPool() {
1092    return this.pool;
1093  }
1094
1095  /**
1096   * Explicitly clears the region cache to fetch the latest value from META. This is a power user
1097   * function: avoid unless you know the ramifications.
1098   */
1099  public void clearRegionCache() {
1100    this.connection.clearRegionLocationCache();
1101  }
1102
1103  @Override
1104  public CoprocessorRpcChannel coprocessorService(byte[] row) {
1105    return new RegionCoprocessorRpcChannel(connection, tableName, row);
1106  }
1107
1108  @Override
1109  public <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service,
1110    byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable)
1111    throws ServiceException, Throwable {
1112    final Map<byte[], R> results =
1113      Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR));
1114    coprocessorService(service, startKey, endKey, callable, (region, row, value) -> {
1115      if (region != null) {
1116        results.put(region, value);
1117      }
1118    });
1119    return results;
1120  }
1121
1122  @Override
1123  public <T extends Service, R> void coprocessorService(final Class<T> service, byte[] startKey,
1124    byte[] endKey, final Batch.Call<T, R> callable, final Batch.Callback<R> callback)
1125    throws ServiceException, Throwable {
1126    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
1127      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
1128    TraceUtil.trace(() -> {
1129      final Context context = Context.current();
1130      final ExecutorService wrappedPool = context.wrap(pool);
1131      // get regions covered by the row range
1132      List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1133      Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
1134      for (final byte[] r : keys) {
1135        final RegionCoprocessorRpcChannel channel =
1136          new RegionCoprocessorRpcChannel(connection, tableName, r);
1137        Future<R> future = wrappedPool.submit(() -> {
1138          T instance =
1139            org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
1140          R result = callable.call(instance);
1141          byte[] region = channel.getLastRegion();
1142          if (callback != null) {
1143            callback.update(region, r, result);
1144          }
1145          return result;
1146        });
1147        futures.put(r, future);
1148      }
1149      for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
1150        try {
1151          e.getValue().get();
1152        } catch (ExecutionException ee) {
1153          LOG.warn("Error calling coprocessor service {} for row {}", service.getName(),
1154            Bytes.toStringBinary(e.getKey()), ee);
1155          throw ee.getCause();
1156        } catch (InterruptedException ie) {
1157          throw new InterruptedIOException("Interrupted calling coprocessor service "
1158            + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
1159        }
1160      }
1161    }, supplier);
1162  }
1163
1164  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException {
1165    if (start == null) {
1166      start = HConstants.EMPTY_START_ROW;
1167    }
1168    if (end == null) {
1169      end = HConstants.EMPTY_END_ROW;
1170    }
1171    return getKeysAndRegionsInRange(start, end, true).getFirst();
1172  }
1173
1174  @Override
1175  public long getRpcTimeout(TimeUnit unit) {
1176    return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS);
1177  }
1178
1179  @Override
1180  @Deprecated
1181  public int getRpcTimeout() {
1182    return rpcTimeoutMs;
1183  }
1184
1185  @Override
1186  @Deprecated
1187  public void setRpcTimeout(int rpcTimeout) {
1188    setReadRpcTimeout(rpcTimeout);
1189    setWriteRpcTimeout(rpcTimeout);
1190  }
1191
1192  @Override
1193  public long getReadRpcTimeout(TimeUnit unit) {
1194    return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS);
1195  }
1196
1197  @Override
1198  @Deprecated
1199  public int getReadRpcTimeout() {
1200    return readRpcTimeoutMs;
1201  }
1202
1203  @Override
1204  @Deprecated
1205  public void setReadRpcTimeout(int readRpcTimeout) {
1206    this.readRpcTimeoutMs = readRpcTimeout;
1207  }
1208
1209  @Override
1210  public long getWriteRpcTimeout(TimeUnit unit) {
1211    return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS);
1212  }
1213
1214  @Override
1215  @Deprecated
1216  public int getWriteRpcTimeout() {
1217    return writeRpcTimeoutMs;
1218  }
1219
1220  @Override
1221  @Deprecated
1222  public void setWriteRpcTimeout(int writeRpcTimeout) {
1223    this.writeRpcTimeoutMs = writeRpcTimeout;
1224  }
1225
1226  @Override
1227  public long getOperationTimeout(TimeUnit unit) {
1228    return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS);
1229  }
1230
1231  @Override
1232  @Deprecated
1233  public int getOperationTimeout() {
1234    return operationTimeoutMs;
1235  }
1236
1237  @Override
1238  @Deprecated
1239  public void setOperationTimeout(int operationTimeout) {
1240    this.operationTimeoutMs = operationTimeout;
1241  }
1242
1243  @Override
1244  public String toString() {
1245    return tableName + ";" + connection;
1246  }
1247
1248  @Override
1249  public <R extends Message> Map<byte[], R> batchCoprocessorService(
1250    Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
1251    R responsePrototype) throws ServiceException, Throwable {
1252    final Map<byte[], R> results =
1253      Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR));
1254    batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1255      (region, row, result) -> {
1256        if (region != null) {
1257          results.put(region, result);
1258        }
1259      });
1260    return results;
1261  }
1262
  @Override
  public <R extends Message> void batchCoprocessorService(
    final Descriptors.MethodDescriptor methodDescriptor, final Message request, byte[] startKey,
    byte[] endKey, final R responsePrototype, final Batch.Callback<R> callback)
    throws ServiceException, Throwable {
    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
    TraceUtil.trace(() -> {
      final Context context = Context.current();
      final byte[] sanitizedStartKey =
        Optional.ofNullable(startKey).orElse(HConstants.EMPTY_START_ROW);
      final byte[] sanitizedEndKey = Optional.ofNullable(endKey).orElse(HConstants.EMPTY_END_ROW);

      // get regions covered by the row range
      Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
        getKeysAndRegionsInRange(sanitizedStartKey, sanitizedEndKey, true);
      List<byte[]> keys = keysAndRegions.getFirst();
      List<HRegionLocation> regions = keysAndRegions.getSecond();

      // check if we have any calls to make
      if (keys.isEmpty()) {
        LOG.info("No regions were selected by key range start={}, end={}",
          Bytes.toStringBinary(sanitizedStartKey), Bytes.toStringBinary(sanitizedEndKey));
        return;
      }

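      // build one coprocessor exec per selected region, keyed by the row used to locate it, so a
      // deserialization failure can later be mapped back to its originating action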
      List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size());
      final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
        new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (int i = 0; i < keys.size(); i++) {
        final byte[] rowKey = keys.get(i);
        final byte[] region = regions.get(i).getRegionInfo().getRegionName();
        RegionCoprocessorServiceExec exec =
          new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
        execs.add(exec);
        execsByRow.put(rowKey, exec);
      }

      // tracking for any possible deserialization errors on success callback
      // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
      final List<Throwable> callbackErrorExceptions = new ArrayList<>();
      final List<Row> callbackErrorActions = new ArrayList<>();
      final List<String> callbackErrorServers = new ArrayList<>();
      Object[] results = new Object[execs.size()];

      AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
        RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
        RpcControllerFactory.instantiate(configuration));

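      // per-region callback: merge the raw CoprocessorServiceResult bytes into a copy of the
      // response prototype and hand the typed message to the caller's callback; deserialization
      // failures are recorded and surfaced after the batch completes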
      Batch.Callback<ClientProtos.CoprocessorServiceResult> resultsCallback =
        (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Received result for endpoint {}: region={}, row={}, value={}",
              methodDescriptor.getFullName(), Bytes.toStringBinary(region),
              Bytes.toStringBinary(row), serviceResult.getValue().getValue());
          }
          try {
            Message.Builder builder = responsePrototype.newBuilderForType();
            org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
              serviceResult.getValue().getValue().toByteArray());
            callback.update(region, row, (R) builder.build());
          } catch (IOException e) {
            LOG.error("Unexpected response type from endpoint {}", methodDescriptor.getFullName(),
              e);
            callbackErrorExceptions.add(e);
            callbackErrorActions.add(execsByRow.get(row));
            // the server name is not tracked at this point, so record a placeholder
            callbackErrorServers.add("null");
          }
        };
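      // submit every per-region execution as one batch and block until all regions have responded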
      AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task =
        AsyncProcessTask.newBuilder(resultsCallback).setPool(context.wrap(pool))
          .setTableName(tableName).setRowAccess(execs).setResults(results)
          .setRpcTimeout(readRpcTimeoutMs).setOperationTimeout(operationTimeoutMs)
          .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build();
      AsyncRequestFuture future = asyncProcess.submit(task);
      future.waitUntilDone();

      if (future.hasError()) {
        throw future.getErrors();
      } else if (!callbackErrorExceptions.isEmpty()) {
        throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions,
          callbackErrorActions, callbackErrorServers);
      }
    }, supplier);
  }

  @Override
  public RegionLocator getRegionLocator() {
    return this.locator;
  }

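  /**
   * Builder returned by {@link Table#checkAndMutate(byte[], byte[])} to describe an atomic
   * check-and-mutate: the check is defined via {@code qualifier}/{@code timeRange} plus one of
   * {@code ifNotExists}/{@code ifMatches}, and the mutation is supplied to a {@code thenXxx}
   * method, which executes the request.
   * <p>
   * Illustrative usage sketch only; {@code row}, {@code family}, {@code qualifier},
   * {@code expected} and {@code put} are assumed to exist in the caller's code:
   *
   * <pre>
   * boolean success = table.checkAndMutate(row, family).qualifier(qualifier)
   *   .ifMatches(CompareOperator.EQUAL, expected).thenPut(put);
   * </pre>
   */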
  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {

    private final byte[] row;
    private final byte[] family;
    private byte[] qualifier;
    private TimeRange timeRange;
    private CompareOperator op;
    private byte[] value;

    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
      this.row = Preconditions.checkNotNull(row, "row is null");
      this.family = Preconditions.checkNotNull(family, "family is null");
    }

    @Override
    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
        + " an empty byte array, or just do not call this method if you want a null qualifier");
      return this;
    }

    @Override
    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
      this.timeRange = timeRange;
      return this;
    }

    @Override
    public CheckAndMutateBuilder ifNotExists() {
      this.op = CompareOperator.EQUAL;
      this.value = null;
      return this;
    }

    @Override
    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
      this.value = Preconditions.checkNotNull(value, "value is null");
      return this;
    }

    private void preCheck() {
      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by"
        + " calling ifNotExists/ifEquals/ifMatches before executing the request");
    }

    @Override
    public boolean thenPut(Put put) throws IOException {
      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
      return TraceUtil.trace(() -> {
        validatePut(put);
        preCheck();
        return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put)
          .isSuccess();
      }, supplier);
    }

    @Override
    public boolean thenDelete(Delete delete) throws IOException {
      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
      return TraceUtil.trace(() -> {
        preCheck();
        return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete)
          .isSuccess();
      }, supplier);
    }

    @Override
    public boolean thenMutate(RowMutations mutation) throws IOException {
      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
      return TraceUtil.trace(() -> {
        preCheck();
        return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, mutation)
          .isSuccess();
      }, supplier);
    }
  }

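  /**
   * Filter-based variant of the builder above, returned by
   * {@link Table#checkAndMutate(byte[], Filter)}: the check is expressed as an arbitrary
   * {@link Filter} applied to the row rather than a single column comparison.
   * <p>
   * Illustrative usage sketch only; {@code row}, {@code filter} and {@code delete} are assumed to
   * exist in the caller's code:
   *
   * <pre>
   * boolean success = table.checkAndMutate(row, filter).thenDelete(delete);
   * </pre>
   */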
  private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder {

    private final byte[] row;
    private final Filter filter;
    private TimeRange timeRange;

    CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
      this.row = Preconditions.checkNotNull(row, "row is null");
      this.filter = Preconditions.checkNotNull(filter, "filter is null");
    }

    @Override
    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
      this.timeRange = timeRange;
      return this;
    }

    @Override
    public boolean thenPut(Put put) throws IOException {
      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
      return TraceUtil.trace(() -> {
        validatePut(put);
        return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess();
      }, supplier);
    }

    @Override
    public boolean thenDelete(Delete delete) throws IOException {
      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
      return TraceUtil.trace(
        () -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess(),
        supplier);
    }

    @Override
    public boolean thenMutate(RowMutations mutation) throws IOException {
      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
      return TraceUtil
        .trace(() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation)
          .isSuccess(), supplier);
    }
  }
}