001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
026import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
027import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
028import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
029
030import io.opentelemetry.api.trace.Span;
031import io.opentelemetry.api.trace.StatusCode;
032import io.opentelemetry.context.Scope;
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.List;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.function.Function;
042import java.util.function.Supplier;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.CompareOperator;
045import org.apache.hadoop.hbase.DoNotRetryIOException;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
050import org.apache.hadoop.hbase.client.ConnectionUtils.Converter;
051import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
052import org.apache.hadoop.hbase.filter.Filter;
053import org.apache.hadoop.hbase.io.TimeRange;
054import org.apache.hadoop.hbase.ipc.HBaseRpcController;
055import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
056import org.apache.hadoop.hbase.trace.TraceUtil;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.ReflectionUtils;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
064import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
065import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
066import org.apache.hbase.thirdparty.io.netty.util.Timer;
067
068import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
070import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
078
079/**
080 * The implementation of RawAsyncTable.
081 * <p/>
082 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
083 * be finished inside the rpc framework thread, which means that the callbacks registered to the
084 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
085 * this class should not try to do time consuming tasks in the callbacks.
086 * @since 2.0.0
087 * @see AsyncTableImpl
088 */
089@InterfaceAudience.Private
090class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
091
092  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
093
094  private final AsyncConnectionImpl conn;
095
096  private final Timer retryTimer;
097
098  private final TableName tableName;
099
100  private final int defaultScannerCaching;
101
102  private final long defaultScannerMaxResultSize;
103
104  private final long rpcTimeoutNs;
105
106  private final long readRpcTimeoutNs;
107
108  private final long writeRpcTimeoutNs;
109
110  private final long operationTimeoutNs;
111
112  private final long scanTimeoutNs;
113
114  private final long pauseNs;
115
116  private final long pauseNsForServerOverloaded;
117
118  private final int maxAttempts;
119
120  private final int startLogErrorsCnt;
121
122  RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
123    this.conn = conn;
124    this.retryTimer = retryTimer;
125    this.tableName = builder.tableName;
126    this.rpcTimeoutNs = builder.rpcTimeoutNs;
127    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
128    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
129    this.operationTimeoutNs = builder.operationTimeoutNs;
130    this.scanTimeoutNs = builder.scanTimeoutNs;
131    this.pauseNs = builder.pauseNs;
132    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
133      LOG.warn(
134        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
135          + " the normal pause value {} ms, use the greater one instead",
136        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
137        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
138      this.pauseNsForServerOverloaded = builder.pauseNs;
139    } else {
140      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
141    }
142    this.maxAttempts = builder.maxAttempts;
143    this.startLogErrorsCnt = builder.startLogErrorsCnt;
144    this.defaultScannerCaching = tableName.isSystemTable()
145      ? conn.connConf.getMetaScannerCaching()
146      : conn.connConf.getScannerCaching();
147    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
148  }
149
150  @Override
151  public TableName getName() {
152    return tableName;
153  }
154
155  @Override
156  public Configuration getConfiguration() {
157    return conn.getConfiguration();
158  }
159
160  @Override
161  public CompletableFuture<TableDescriptor> getDescriptor() {
162    return conn.getAdmin().getDescriptor(tableName);
163  }
164
165  @Override
166  public AsyncTableRegionLocator getRegionLocator() {
167    return conn.getRegionLocator(tableName);
168  }
169
170  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
171    HRegionLocation loc, ClientService.Interface stub, REQ req,
172    Converter<MutateRequest, byte[], REQ> reqConvert,
173    Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
174    return ConnectionUtils.call(controller, loc, stub, req, reqConvert,
175      (s, c, r, done) -> s.mutate(c, r, done), respConverter);
176  }
177
178  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
179    HRegionLocation loc, ClientService.Interface stub, REQ req,
180    Converter<MutateRequest, byte[], REQ> reqConvert) {
181    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
182      return null;
183    });
184  }
185
186  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
187    throws IOException {
188    if (!resp.hasResult()) {
189      return null;
190    }
191    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
192  }
193
194  @FunctionalInterface
195  private interface NoncedConverter<D, I, S> {
196    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
197  }
198
199  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
200    HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
201    NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
202    Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
203    return mutate(controller, loc, stub, req,
204      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
205  }
206
207  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
208    return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
209      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
210      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
211      .pause(pauseNs, TimeUnit.NANOSECONDS)
212      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
213      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
214  }
215
216  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
217    newCaller(R row, long rpcTimeoutNs) {
218    return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
219  }
220
221  private CompletableFuture<Result> get(Get get, int replicaId) {
222    return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
223      .action((controller, loc, stub) -> ConnectionUtils.<Get, GetRequest, GetResponse,
224        Result> call(controller, loc, stub, get, RequestConverter::buildGetRequest,
225          (s, c, req, done) -> s.get(c, req, done),
226          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
227      .replicaId(replicaId).call();
228  }
229
230  private TableOperationSpanBuilder newTableOperationSpanBuilder() {
231    return new TableOperationSpanBuilder(conn).setTableName(tableName);
232  }
233
234  @Override
235  public CompletableFuture<Result> get(Get get) {
236    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get);
237    return tracedFuture(
238      () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
239        RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
240        conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
241      supplier);
242  }
243
244  @Override
245  public CompletableFuture<Void> put(Put put) {
246    validatePut(put, conn.connConf.getMaxKeyValueSize());
247    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put);
248    return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
249      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
250        put, RequestConverter::buildMutateRequest))
251      .call(), supplier);
252  }
253
254  @Override
255  public CompletableFuture<Void> delete(Delete delete) {
256    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete);
257    return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
258      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
259        stub, delete, RequestConverter::buildMutateRequest))
260      .call(), supplier);
261  }
262
263  @Override
264  public CompletableFuture<Result> append(Append append) {
265    checkHasFamilies(append);
266    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append);
267    return tracedFuture(() -> {
268      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
269      long nonce = conn.getNonceGenerator().newNonce();
270      return this.<Result, Append> newCaller(append, rpcTimeoutNs)
271        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce,
272          controller, loc, stub, append, RequestConverter::buildMutateRequest,
273          RawAsyncTableImpl::toResult))
274        .call();
275    }, supplier);
276  }
277
278  @Override
279  public CompletableFuture<Result> increment(Increment increment) {
280    checkHasFamilies(increment);
281    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment);
282    return tracedFuture(() -> {
283      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
284      long nonce = conn.getNonceGenerator().newNonce();
285      return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
286        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
287          controller, loc, stub, increment, RequestConverter::buildMutateRequest,
288          RawAsyncTableImpl::toResult))
289        .call();
290    }, supplier);
291  }
292
293  private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
294
295    private final byte[] row;
296
297    private final byte[] family;
298
299    private byte[] qualifier;
300
301    private TimeRange timeRange;
302
303    private CompareOperator op;
304
305    private byte[] value;
306
307    public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
308      this.row = Preconditions.checkNotNull(row, "row is null");
309      this.family = Preconditions.checkNotNull(family, "family is null");
310    }
311
312    @Override
313    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
314      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
315        + " an empty byte array, or just do not call this method if you want a null qualifier");
316      return this;
317    }
318
319    @Override
320    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
321      this.timeRange = timeRange;
322      return this;
323    }
324
325    @Override
326    public CheckAndMutateBuilder ifNotExists() {
327      this.op = CompareOperator.EQUAL;
328      this.value = null;
329      return this;
330    }
331
332    @Override
333    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
334      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
335      this.value = Preconditions.checkNotNull(value, "value is null");
336      return this;
337    }
338
339    private void preCheck() {
340      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by"
341        + " calling ifNotExists/ifEquals/ifMatches before executing the request");
342    }
343
344    @Override
345    public CompletableFuture<Boolean> thenPut(Put put) {
346      validatePut(put, conn.connConf.getMaxKeyValueSize());
347      preCheck();
348      final Supplier<Span> supplier = newTableOperationSpanBuilder()
349        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
350        .setContainerOperations(put);
351      return tracedFuture(
352        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
353          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
354            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
355              null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
356            (c, r) -> r.getProcessed()))
357          .call(),
358        supplier);
359    }
360
361    @Override
362    public CompletableFuture<Boolean> thenDelete(Delete delete) {
363      preCheck();
364      final Supplier<Span> supplier = newTableOperationSpanBuilder()
365        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
366        .setContainerOperations(delete);
367      return tracedFuture(
368        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
369          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
370            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
371              null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
372            (c, r) -> r.getProcessed()))
373          .call(),
374        supplier);
375    }
376
377    @Override
378    public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
379      preCheck();
380      validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
381      final Supplier<Span> supplier = newTableOperationSpanBuilder()
382        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
383        .setContainerOperations(mutations);
384      return tracedFuture(() -> RawAsyncTableImpl.this
385        .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
386        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
387          mutations,
388          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
389            null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
390          CheckAndMutateResult::isSuccess))
391        .call(), supplier);
392    }
393  }
394
395  @Override
396  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
397    return new CheckAndMutateBuilderImpl(row, family);
398  }
399
400  private final class CheckAndMutateWithFilterBuilderImpl
401    implements CheckAndMutateWithFilterBuilder {
402
403    private final byte[] row;
404
405    private final Filter filter;
406
407    private TimeRange timeRange;
408
409    public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
410      this.row = Preconditions.checkNotNull(row, "row is null");
411      this.filter = Preconditions.checkNotNull(filter, "filter is null");
412    }
413
414    @Override
415    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
416      this.timeRange = timeRange;
417      return this;
418    }
419
420    @Override
421    public CompletableFuture<Boolean> thenPut(Put put) {
422      validatePut(put, conn.connConf.getMaxKeyValueSize());
423      final Supplier<Span> supplier = newTableOperationSpanBuilder()
424        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
425        .setContainerOperations(put);
426      return tracedFuture(
427        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
428          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
429            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
430              timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
431            (c, r) -> r.getProcessed()))
432          .call(),
433        supplier);
434    }
435
436    @Override
437    public CompletableFuture<Boolean> thenDelete(Delete delete) {
438      final Supplier<Span> supplier = newTableOperationSpanBuilder()
439        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
440        .setContainerOperations(delete);
441      return tracedFuture(
442        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
443          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
444            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
445              timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
446            (c, r) -> r.getProcessed()))
447          .call(),
448        supplier);
449    }
450
451    @Override
452    public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
453      validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
454      final Supplier<Span> supplier = newTableOperationSpanBuilder()
455        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
456        .setContainerOperations(mutations);
457      return tracedFuture(() -> RawAsyncTableImpl.this
458        .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
459        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
460          mutations,
461          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
462            timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
463          CheckAndMutateResult::isSuccess))
464        .call(), supplier);
465    }
466  }
467
468  @Override
469  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
470    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
471  }
472
473  @Override
474  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
475    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate)
476      .setContainerOperations(checkAndMutate.getAction());
477    return tracedFuture(() -> {
478      if (
479        checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
480          || checkAndMutate.getAction() instanceof Increment
481          || checkAndMutate.getAction() instanceof Append
482      ) {
483        Mutation mutation = (Mutation) checkAndMutate.getAction();
484        if (mutation instanceof Put) {
485          validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
486        }
487        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
488        long nonce = conn.getNonceGenerator().newNonce();
489        return RawAsyncTableImpl.this
490          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(),
491            rpcTimeoutNs)
492          .action(
493            (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation,
494              (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
495                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
496                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
497                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
498              (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
499          .call();
500      } else if (checkAndMutate.getAction() instanceof RowMutations) {
501        RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
502        validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
503        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
504        long nonce = conn.getNonceGenerator().newNonce();
505        return RawAsyncTableImpl.this
506          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(),
507            rpcTimeoutNs)
508          .action((controller, loc, stub) -> RawAsyncTableImpl.this.<CheckAndMutateResult,
509            CheckAndMutateResult> mutateRow(controller, loc, stub, rowMutations,
510              (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
511                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
512                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
513                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
514              resp -> resp))
515          .call();
516      } else {
517        CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
518        future.completeExceptionally(new DoNotRetryIOException(
519          "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
520        return future;
521      }
522    }, supplier);
523  }
524
525  @Override
526  public List<CompletableFuture<CheckAndMutateResult>>
527    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
528    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates)
529      .setContainerOperations(checkAndMutates);
530    return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream()
531      .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), supplier);
532  }
533
534  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
535  // so here I write a new method as I do not want to change the abstraction of call method.
536  @SuppressWarnings("unchecked")
537  private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
538    HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
539    Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) {
540    CompletableFuture<RESP> future = new CompletableFuture<>();
541    try {
542      byte[] regionName = loc.getRegion().getRegionName();
543      MultiRequest req = reqConvert.convert(regionName, mutation);
544      stub.multi(controller, req, new RpcCallback<MultiResponse>() {
545
546        @Override
547        public void run(MultiResponse resp) {
548          if (controller.failed()) {
549            future.completeExceptionally(controller.getFailed());
550          } else {
551            try {
552              org.apache.hadoop.hbase.client.MultiResponse multiResp =
553                ResponseConverter.getResults(req, resp, controller.cellScanner());
554              ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
555                loc.getServerName(), multiResp);
556              Throwable ex = multiResp.getException(regionName);
557              if (ex != null) {
558                future.completeExceptionally(ex instanceof IOException
559                  ? ex
560                  : new IOException(
561                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
562              } else {
563                future.complete(
564                  respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0)));
565              }
566            } catch (IOException e) {
567              future.completeExceptionally(e);
568            }
569          }
570        }
571      });
572    } catch (IOException e) {
573      future.completeExceptionally(e);
574    }
575    return future;
576  }
577
578  @Override
579  public CompletableFuture<Result> mutateRow(RowMutations mutations) {
580    validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
581    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
582    long nonce = conn.getNonceGenerator().newNonce();
583    final Supplier<Span> supplier =
584      newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations);
585    return tracedFuture(
586      () -> this
587        .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
588        .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub,
589          mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
590          resp -> resp))
591        .call(),
592      supplier);
593  }
594
595  private Scan setDefaultScanConfig(Scan scan) {
596    // always create a new scan object as we may reset the start row later.
597    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
598    if (newScan.getCaching() <= 0) {
599      newScan.setCaching(defaultScannerCaching);
600    }
601    if (newScan.getMaxResultSize() <= 0) {
602      newScan.setMaxResultSize(defaultScannerMaxResultSize);
603    }
604    return newScan;
605  }
606
607  @Override
608  public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
609    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
610      pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs,
611      startLogErrorsCnt).start();
612  }
613
614  private long resultSize2CacheSize(long maxResultSize) {
615    // * 2 if possible
616    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
617  }
618
619  @Override
620  public AsyncTableResultScanner getScanner(Scan scan) {
621    final long maxCacheSize = resultSize2CacheSize(
622      scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize);
623    final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan);
624    final AsyncTableResultScanner scanner =
625      new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
626    scan(scan, scanner);
627    return scanner;
628  }
629
630  @Override
631  public CompletableFuture<List<Result>> scanAll(Scan scan) {
632    CompletableFuture<List<Result>> future = new CompletableFuture<>();
633    List<Result> scanResults = new ArrayList<>();
634    scan(scan, new AdvancedScanResultConsumer() {
635
636      @Override
637      public void onNext(Result[] results, ScanController controller) {
638        scanResults.addAll(Arrays.asList(results));
639      }
640
641      @Override
642      public void onError(Throwable error) {
643        future.completeExceptionally(error);
644      }
645
646      @Override
647      public void onComplete() {
648        future.complete(scanResults);
649      }
650    });
651    return future;
652  }
653
654  @Override
655  public List<CompletableFuture<Result>> get(List<Get> gets) {
656    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets)
657      .setContainerOperations(HBaseSemanticAttributes.Operation.GET);
658    return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
659  }
660
661  @Override
662  public List<CompletableFuture<Void>> put(List<Put> puts) {
663    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts)
664      .setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
665    return tracedFutures(() -> voidMutate(puts), supplier);
666  }
667
668  @Override
669  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
670    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes)
671      .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
672    return tracedFutures(() -> voidMutate(deletes), supplier);
673  }
674
675  @Override
676  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
677    final Supplier<Span> supplier =
678      newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions);
679    return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
680  }
681
682  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
683    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
684      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
685  }
686
687  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
688    for (Row action : actions) {
689      if (action instanceof Put) {
690        validatePut((Put) action, conn.connConf.getMaxKeyValueSize());
691      } else if (action instanceof CheckAndMutate) {
692        CheckAndMutate checkAndMutate = (CheckAndMutate) action;
693        if (checkAndMutate.getAction() instanceof Put) {
694          validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
695        } else if (checkAndMutate.getAction() instanceof RowMutations) {
696          validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(),
697            conn.connConf.getMaxKeyValueSize());
698        }
699      } else if (action instanceof RowMutations) {
700        validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize());
701      }
702    }
703    return conn.callerFactory.batch().table(tableName).actions(actions)
704      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
705      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
706      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
707      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
708  }
709
710  @Override
711  public long getRpcTimeout(TimeUnit unit) {
712    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
713  }
714
715  @Override
716  public long getReadRpcTimeout(TimeUnit unit) {
717    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
718  }
719
720  @Override
721  public long getWriteRpcTimeout(TimeUnit unit) {
722    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
723  }
724
725  @Override
726  public long getOperationTimeout(TimeUnit unit) {
727    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
728  }
729
730  @Override
731  public long getScanTimeout(TimeUnit unit) {
732    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
733  }
734
735  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
736    ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
737    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
738      region, row, rpcTimeoutNs, operationTimeoutNs);
739    final Span span = Span.current();
740    S stub = stubMaker.apply(channel);
741    CompletableFuture<R> future = new CompletableFuture<>();
742    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
743    callable.call(stub, controller, resp -> {
744      try (Scope ignored = span.makeCurrent()) {
745        if (controller.failed()) {
746          final Throwable failure = controller.getFailed();
747          future.completeExceptionally(failure);
748          TraceUtil.setError(span, failure);
749        } else {
750          future.complete(resp);
751          span.setStatus(StatusCode.OK);
752        }
753      } finally {
754        span.end();
755      }
756    });
757    return future;
758  }
759
760  @Override
761  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
762    ServiceCaller<S, R> callable, byte[] row) {
763    return coprocessorService(stubMaker, callable, null, row);
764  }
765
766  private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) {
767    if (isEmptyStopRow(endKey)) {
768      if (isEmptyStopRow(region.getEndKey())) {
769        return true;
770      }
771      return false;
772    } else {
773      if (isEmptyStopRow(region.getEndKey())) {
774        return true;
775      }
776      int c = Bytes.compareTo(endKey, region.getEndKey());
777      // 1. if the region contains endKey
778      // 2. endKey is equal to the region's endKey and we do not want to include endKey.
779      return c < 0 || (c == 0 && !endKeyInclusive);
780    }
781  }
782
783  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
784    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
785    byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
786    AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
787    final Span span = Span.current();
788    if (error != null) {
789      callback.onError(error);
790      TraceUtil.setError(span, error);
791      span.end();
792      return;
793    }
794    unfinishedRequest.incrementAndGet();
795    RegionInfo region = loc.getRegion();
796    if (locateFinished(region, endKey, endKeyInclusive)) {
797      locateFinished.set(true);
798    } else {
799      addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(),
800        RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> {
801          try (Scope ignored = span.makeCurrent()) {
802            onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
803              locateFinished, unfinishedRequest, l, e);
804          }
805        });
806    }
807    addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
808      try (Scope ignored = span.makeCurrent()) {
809        if (e != null) {
810          callback.onRegionError(region, e);
811        } else {
812          callback.onRegionComplete(region, r);
813        }
814        if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
815          callback.onComplete();
816        }
817      }
818    });
819  }
820
821  private final class CoprocessorServiceBuilderImpl<S, R>
822    implements CoprocessorServiceBuilder<S, R> {
823
824    private final Function<RpcChannel, S> stubMaker;
825
826    private final ServiceCaller<S, R> callable;
827
828    private final CoprocessorCallback<R> callback;
829
830    private byte[] startKey = HConstants.EMPTY_START_ROW;
831
832    private boolean startKeyInclusive;
833
834    private byte[] endKey = HConstants.EMPTY_END_ROW;
835
836    private boolean endKeyInclusive;
837
838    public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
839      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
840      this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
841      this.callable = Preconditions.checkNotNull(callable, "callable is null");
842      this.callback = Preconditions.checkNotNull(callback, "callback is null");
843    }
844
845    @Override
846    public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
847      this.startKey = Preconditions.checkNotNull(startKey,
848        "startKey is null. Consider using"
849          + " an empty byte array, or just do not call this method if you want to start selection"
850          + " from the first region");
851      this.startKeyInclusive = inclusive;
852      return this;
853    }
854
855    @Override
856    public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
857      this.endKey = Preconditions.checkNotNull(endKey,
858        "endKey is null. Consider using"
859          + " an empty byte array, or just do not call this method if you want to continue"
860          + " selection to the last region");
861      this.endKeyInclusive = inclusive;
862      return this;
863    }
864
865    @Override
866    public void execute() {
867      final Span span = newTableOperationSpanBuilder()
868        .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build();
869      try (Scope ignored = span.makeCurrent()) {
870        final RegionLocateType regionLocateType =
871          startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER;
872        final CompletableFuture<HRegionLocation> future = conn.getLocator()
873          .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs);
874        addListener(future, (loc, error) -> {
875          try (Scope ignored1 = span.makeCurrent()) {
876            onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
877              endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error);
878          }
879        });
880      }
881    }
882  }
883
884  @Override
885  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
886    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
887    CoprocessorCallback<R> callback) {
888    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
889  }
890}