001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
026import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
027import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
028import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
029
030import io.opentelemetry.api.trace.Span;
031import io.opentelemetry.api.trace.StatusCode;
032import io.opentelemetry.context.Scope;
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.List;
037import java.util.Map;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.function.Function;
043import java.util.function.Supplier;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.CompareOperator;
046import org.apache.hadoop.hbase.DoNotRetryIOException;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
051import org.apache.hadoop.hbase.client.ConnectionUtils.Converter;
052import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
053import org.apache.hadoop.hbase.filter.Filter;
054import org.apache.hadoop.hbase.io.TimeRange;
055import org.apache.hadoop.hbase.ipc.HBaseRpcController;
056import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
057import org.apache.hadoop.hbase.trace.TraceUtil;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.ReflectionUtils;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
065import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
066import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
067import org.apache.hbase.thirdparty.io.netty.util.Timer;
068
069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
070import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
079
080/**
081 * The implementation of RawAsyncTable.
082 * <p/>
083 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
084 * be finished inside the rpc framework thread, which means that the callbacks registered to the
085 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
086 * this class should not try to do time consuming tasks in the callbacks.
087 * @since 2.0.0
088 * @see AsyncTableImpl
089 */
090@InterfaceAudience.Private
091class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
092
093  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
094
095  private final AsyncConnectionImpl conn;
096
097  private final Timer retryTimer;
098
099  private final TableName tableName;
100
101  private final int defaultScannerCaching;
102
103  private final long defaultScannerMaxResultSize;
104
105  private final long rpcTimeoutNs;
106
107  private final long readRpcTimeoutNs;
108
109  private final long writeRpcTimeoutNs;
110
111  private final long operationTimeoutNs;
112
113  private final long scanTimeoutNs;
114
115  private final long pauseNs;
116
117  private final long pauseNsForServerOverloaded;
118
119  private final int maxAttempts;
120
121  private final int startLogErrorsCnt;
122
123  private final Map<String, byte[]> requestAttributes;
124
125  RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
126    this.conn = conn;
127    this.retryTimer = retryTimer;
128    this.tableName = builder.tableName;
129    this.rpcTimeoutNs = builder.rpcTimeoutNs;
130    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
131    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
132    this.operationTimeoutNs = builder.operationTimeoutNs;
133    this.scanTimeoutNs = builder.scanTimeoutNs;
134    this.pauseNs = builder.pauseNs;
135    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
136      LOG.warn(
137        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
138          + " the normal pause value {} ms, use the greater one instead",
139        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
140        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
141      this.pauseNsForServerOverloaded = builder.pauseNs;
142    } else {
143      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
144    }
145    this.maxAttempts = builder.maxAttempts;
146    this.startLogErrorsCnt = builder.startLogErrorsCnt;
147    this.defaultScannerCaching = tableName.isSystemTable()
148      ? conn.connConf.getMetaScannerCaching()
149      : conn.connConf.getScannerCaching();
150    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
151    this.requestAttributes = builder.requestAttributes;
152  }
153
154  @Override
155  public TableName getName() {
156    return tableName;
157  }
158
159  @Override
160  public Configuration getConfiguration() {
161    return conn.getConfiguration();
162  }
163
164  @Override
165  public CompletableFuture<TableDescriptor> getDescriptor() {
166    return conn.getAdmin().getDescriptor(tableName);
167  }
168
169  @Override
170  public AsyncTableRegionLocator getRegionLocator() {
171    return conn.getRegionLocator(tableName);
172  }
173
174  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
175    HRegionLocation loc, ClientService.Interface stub, REQ req,
176    Converter<MutateRequest, byte[], REQ> reqConvert,
177    Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
178    return ConnectionUtils.call(controller, loc, stub, req, reqConvert,
179      (s, c, r, done) -> s.mutate(c, r, done), respConverter);
180  }
181
182  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
183    HRegionLocation loc, ClientService.Interface stub, REQ req,
184    Converter<MutateRequest, byte[], REQ> reqConvert) {
185    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
186      return null;
187    });
188  }
189
190  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
191    throws IOException {
192    if (!resp.hasResult()) {
193      return null;
194    }
195    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
196  }
197
198  @FunctionalInterface
199  private interface NoncedConverter<D, I, S> {
200    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
201  }
202
203  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
204    HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
205    NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
206    Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
207    return mutate(controller, loc, stub, req,
208      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
209  }
210
211  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
212    return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
213      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
214      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
215      .pause(pauseNs, TimeUnit.NANOSECONDS)
216      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
217      .maxAttempts(maxAttempts).setRequestAttributes(requestAttributes)
218      .startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes);
219  }
220
221  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
222    newCaller(R row, long rpcTimeoutNs) {
223    return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
224  }
225
226  private CompletableFuture<Result> get(Get get, int replicaId) {
227    return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
228      .action((controller, loc, stub) -> ConnectionUtils.<Get, GetRequest, GetResponse,
229        Result> call(controller, loc, stub, get, RequestConverter::buildGetRequest,
230          (s, c, req, done) -> s.get(c, req, done),
231          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
232      .replicaId(replicaId).call();
233  }
234
235  private TableOperationSpanBuilder newTableOperationSpanBuilder() {
236    return new TableOperationSpanBuilder(conn).setTableName(tableName);
237  }
238
239  @Override
240  public CompletableFuture<Result> get(Get get) {
241    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get);
242    return tracedFuture(
243      () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
244        RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
245        conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
246      supplier);
247  }
248
249  @Override
250  public CompletableFuture<Void> put(Put put) {
251    validatePut(put, conn.connConf.getMaxKeyValueSize());
252    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put);
253    return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
254      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
255        put, RequestConverter::buildMutateRequest))
256      .call(), supplier);
257  }
258
259  @Override
260  public CompletableFuture<Void> delete(Delete delete) {
261    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete);
262    return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
263      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
264        stub, delete, RequestConverter::buildMutateRequest))
265      .call(), supplier);
266  }
267
268  @Override
269  public CompletableFuture<Result> append(Append append) {
270    checkHasFamilies(append);
271    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append);
272    return tracedFuture(() -> {
273      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
274      long nonce = conn.getNonceGenerator().newNonce();
275      return this.<Result, Append> newCaller(append, rpcTimeoutNs)
276        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce,
277          controller, loc, stub, append, RequestConverter::buildMutateRequest,
278          RawAsyncTableImpl::toResult))
279        .call();
280    }, supplier);
281  }
282
283  @Override
284  public CompletableFuture<Result> increment(Increment increment) {
285    checkHasFamilies(increment);
286    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment);
287    return tracedFuture(() -> {
288      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
289      long nonce = conn.getNonceGenerator().newNonce();
290      return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
291        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
292          controller, loc, stub, increment, RequestConverter::buildMutateRequest,
293          RawAsyncTableImpl::toResult))
294        .call();
295    }, supplier);
296  }
297
298  private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
299
300    private final byte[] row;
301
302    private final byte[] family;
303
304    private byte[] qualifier;
305
306    private TimeRange timeRange;
307
308    private CompareOperator op;
309
310    private byte[] value;
311
312    public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
313      this.row = Preconditions.checkNotNull(row, "row is null");
314      this.family = Preconditions.checkNotNull(family, "family is null");
315    }
316
317    @Override
318    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
319      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
320        + " an empty byte array, or just do not call this method if you want a null qualifier");
321      return this;
322    }
323
324    @Override
325    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
326      this.timeRange = timeRange;
327      return this;
328    }
329
330    @Override
331    public CheckAndMutateBuilder ifNotExists() {
332      this.op = CompareOperator.EQUAL;
333      this.value = null;
334      return this;
335    }
336
337    @Override
338    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
339      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
340      this.value = Preconditions.checkNotNull(value, "value is null");
341      return this;
342    }
343
344    private void preCheck() {
345      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by"
346        + " calling ifNotExists/ifEquals/ifMatches before executing the request");
347    }
348
349    @Override
350    public CompletableFuture<Boolean> thenPut(Put put) {
351      validatePut(put, conn.connConf.getMaxKeyValueSize());
352      preCheck();
353      final Supplier<Span> supplier = newTableOperationSpanBuilder()
354        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
355        .setContainerOperations(put);
356      return tracedFuture(
357        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
358          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
359            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
360              null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
361            (c, r) -> r.getProcessed()))
362          .call(),
363        supplier);
364    }
365
366    @Override
367    public CompletableFuture<Boolean> thenDelete(Delete delete) {
368      preCheck();
369      final Supplier<Span> supplier = newTableOperationSpanBuilder()
370        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
371        .setContainerOperations(delete);
372      return tracedFuture(
373        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
374          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
375            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
376              null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
377            (c, r) -> r.getProcessed()))
378          .call(),
379        supplier);
380    }
381
382    @Override
383    public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
384      preCheck();
385      validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
386      final Supplier<Span> supplier = newTableOperationSpanBuilder()
387        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
388        .setContainerOperations(mutations);
389      return tracedFuture(() -> RawAsyncTableImpl.this
390        .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
391        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
392          mutations,
393          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
394            null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
395          CheckAndMutateResult::isSuccess))
396        .call(), supplier);
397    }
398  }
399
400  @Override
401  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
402    return new CheckAndMutateBuilderImpl(row, family);
403  }
404
405  private final class CheckAndMutateWithFilterBuilderImpl
406    implements CheckAndMutateWithFilterBuilder {
407
408    private final byte[] row;
409
410    private final Filter filter;
411
412    private TimeRange timeRange;
413
414    public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
415      this.row = Preconditions.checkNotNull(row, "row is null");
416      this.filter = Preconditions.checkNotNull(filter, "filter is null");
417    }
418
419    @Override
420    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
421      this.timeRange = timeRange;
422      return this;
423    }
424
425    @Override
426    public CompletableFuture<Boolean> thenPut(Put put) {
427      validatePut(put, conn.connConf.getMaxKeyValueSize());
428      final Supplier<Span> supplier = newTableOperationSpanBuilder()
429        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
430        .setContainerOperations(put);
431      return tracedFuture(
432        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
433          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
434            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
435              timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
436            (c, r) -> r.getProcessed()))
437          .call(),
438        supplier);
439    }
440
441    @Override
442    public CompletableFuture<Boolean> thenDelete(Delete delete) {
443      final Supplier<Span> supplier = newTableOperationSpanBuilder()
444        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
445        .setContainerOperations(delete);
446      return tracedFuture(
447        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
448          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
449            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
450              timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
451            (c, r) -> r.getProcessed()))
452          .call(),
453        supplier);
454    }
455
456    @Override
457    public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
458      validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
459      final Supplier<Span> supplier = newTableOperationSpanBuilder()
460        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
461        .setContainerOperations(mutations);
462      return tracedFuture(() -> RawAsyncTableImpl.this
463        .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
464        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
465          mutations,
466          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
467            timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
468          CheckAndMutateResult::isSuccess))
469        .call(), supplier);
470    }
471  }
472
473  @Override
474  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
475    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
476  }
477
478  @Override
479  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
480    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate)
481      .setContainerOperations(checkAndMutate.getAction());
482    return tracedFuture(() -> {
483      if (
484        checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
485          || checkAndMutate.getAction() instanceof Increment
486          || checkAndMutate.getAction() instanceof Append
487      ) {
488        Mutation mutation = (Mutation) checkAndMutate.getAction();
489        if (mutation instanceof Put) {
490          validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
491        }
492        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
493        long nonce = conn.getNonceGenerator().newNonce();
494        return RawAsyncTableImpl.this
495          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(),
496            rpcTimeoutNs)
497          .action(
498            (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation,
499              (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
500                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
501                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
502                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
503              (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
504          .call();
505      } else if (checkAndMutate.getAction() instanceof RowMutations) {
506        RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
507        validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
508        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
509        long nonce = conn.getNonceGenerator().newNonce();
510        return RawAsyncTableImpl.this
511          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(),
512            rpcTimeoutNs)
513          .action((controller, loc, stub) -> RawAsyncTableImpl.this.<CheckAndMutateResult,
514            CheckAndMutateResult> mutateRow(controller, loc, stub, rowMutations,
515              (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
516                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
517                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
518                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
519              resp -> resp))
520          .call();
521      } else {
522        CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
523        future.completeExceptionally(new DoNotRetryIOException(
524          "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
525        return future;
526      }
527    }, supplier);
528  }
529
530  @Override
531  public List<CompletableFuture<CheckAndMutateResult>>
532    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
533    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates)
534      .setContainerOperations(checkAndMutates);
535    return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream()
536      .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), supplier);
537  }
538
539  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
540  // so here I write a new method as I do not want to change the abstraction of call method.
541  @SuppressWarnings("unchecked")
542  private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
543    HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
544    Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) {
545    CompletableFuture<RESP> future = new CompletableFuture<>();
546    try {
547      byte[] regionName = loc.getRegion().getRegionName();
548      MultiRequest req = reqConvert.convert(regionName, mutation);
549      stub.multi(controller, req, new RpcCallback<MultiResponse>() {
550
551        @Override
552        public void run(MultiResponse resp) {
553          if (controller.failed()) {
554            future.completeExceptionally(controller.getFailed());
555          } else {
556            try {
557              org.apache.hadoop.hbase.client.MultiResponse multiResp =
558                ResponseConverter.getResults(req, resp, controller.cellScanner());
559              ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
560                loc.getServerName(), multiResp);
561              Throwable ex = multiResp.getException(regionName);
562              if (ex != null) {
563                future.completeExceptionally(ex instanceof IOException
564                  ? ex
565                  : new IOException(
566                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
567              } else {
568                future.complete(
569                  respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0)));
570              }
571            } catch (IOException e) {
572              future.completeExceptionally(e);
573            }
574          }
575        }
576      });
577    } catch (IOException e) {
578      future.completeExceptionally(e);
579    }
580    return future;
581  }
582
583  @Override
584  public CompletableFuture<Result> mutateRow(RowMutations mutations) {
585    validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
586    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
587    long nonce = conn.getNonceGenerator().newNonce();
588    final Supplier<Span> supplier =
589      newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations);
590    return tracedFuture(
591      () -> this
592        .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
593        .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub,
594          mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
595          resp -> resp))
596        .call(),
597      supplier);
598  }
599
600  private Scan setDefaultScanConfig(Scan scan) {
601    // always create a new scan object as we may reset the start row later.
602    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
603    if (newScan.getCaching() <= 0) {
604      newScan.setCaching(defaultScannerCaching);
605    }
606    if (newScan.getMaxResultSize() <= 0) {
607      newScan.setMaxResultSize(defaultScannerMaxResultSize);
608    }
609    return newScan;
610  }
611
612  @Override
613  public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
614    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
615      pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs,
616      startLogErrorsCnt, requestAttributes).start();
617  }
618
619  private long resultSize2CacheSize(long maxResultSize) {
620    // * 2 if possible
621    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
622  }
623
624  @Override
625  public AsyncTableResultScanner getScanner(Scan scan) {
626    final long maxCacheSize = resultSize2CacheSize(
627      scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize);
628    final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan);
629    final AsyncTableResultScanner scanner =
630      new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
631    scan(scan, scanner);
632    return scanner;
633  }
634
635  @Override
636  public CompletableFuture<List<Result>> scanAll(Scan scan) {
637    CompletableFuture<List<Result>> future = new CompletableFuture<>();
638    List<Result> scanResults = new ArrayList<>();
639    scan(scan, new AdvancedScanResultConsumer() {
640
641      @Override
642      public void onNext(Result[] results, ScanController controller) {
643        scanResults.addAll(Arrays.asList(results));
644      }
645
646      @Override
647      public void onError(Throwable error) {
648        future.completeExceptionally(error);
649      }
650
651      @Override
652      public void onComplete() {
653        future.complete(scanResults);
654      }
655    });
656    return future;
657  }
658
659  @Override
660  public List<CompletableFuture<Result>> get(List<Get> gets) {
661    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets)
662      .setContainerOperations(HBaseSemanticAttributes.Operation.GET);
663    return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
664  }
665
666  @Override
667  public List<CompletableFuture<Void>> put(List<Put> puts) {
668    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts)
669      .setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
670    return tracedFutures(() -> voidMutate(puts), supplier);
671  }
672
673  @Override
674  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
675    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes)
676      .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
677    return tracedFutures(() -> voidMutate(deletes), supplier);
678  }
679
680  @Override
681  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
682    final Supplier<Span> supplier =
683      newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions);
684    return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
685  }
686
687  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
688    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
689      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
690  }
691
692  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
693    for (Row action : actions) {
694      if (action instanceof Put) {
695        validatePut((Put) action, conn.connConf.getMaxKeyValueSize());
696      } else if (action instanceof CheckAndMutate) {
697        CheckAndMutate checkAndMutate = (CheckAndMutate) action;
698        if (checkAndMutate.getAction() instanceof Put) {
699          validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
700        } else if (checkAndMutate.getAction() instanceof RowMutations) {
701          validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(),
702            conn.connConf.getMaxKeyValueSize());
703        }
704      } else if (action instanceof RowMutations) {
705        validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize());
706      }
707    }
708    return conn.callerFactory.batch().table(tableName).actions(actions)
709      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
710      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
711      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
712      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
713      .setRequestAttributes(requestAttributes).call();
714  }
715
716  @Override
717  public long getRpcTimeout(TimeUnit unit) {
718    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
719  }
720
721  @Override
722  public long getReadRpcTimeout(TimeUnit unit) {
723    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
724  }
725
726  @Override
727  public long getWriteRpcTimeout(TimeUnit unit) {
728    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
729  }
730
731  @Override
732  public long getOperationTimeout(TimeUnit unit) {
733    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
734  }
735
736  @Override
737  public long getScanTimeout(TimeUnit unit) {
738    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
739  }
740
741  @Override
742  public Map<String, byte[]> getRequestAttributes() {
743    return requestAttributes;
744  }
745
746  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
747    ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
748    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
749      region, row, rpcTimeoutNs, operationTimeoutNs);
750    final Span span = Span.current();
751    S stub = stubMaker.apply(channel);
752    CompletableFuture<R> future = new CompletableFuture<>();
753    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
754    callable.call(stub, controller, resp -> {
755      try (Scope ignored = span.makeCurrent()) {
756        if (controller.failed()) {
757          final Throwable failure = controller.getFailed();
758          future.completeExceptionally(failure);
759          TraceUtil.setError(span, failure);
760        } else {
761          future.complete(resp);
762          span.setStatus(StatusCode.OK);
763        }
764      } finally {
765        span.end();
766      }
767    });
768    return future;
769  }
770
771  @Override
772  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
773    ServiceCaller<S, R> callable, byte[] row) {
774    return coprocessorService(stubMaker, callable, null, row);
775  }
776
777  private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) {
778    if (isEmptyStopRow(endKey)) {
779      if (isEmptyStopRow(region.getEndKey())) {
780        return true;
781      }
782      return false;
783    } else {
784      if (isEmptyStopRow(region.getEndKey())) {
785        return true;
786      }
787      int c = Bytes.compareTo(endKey, region.getEndKey());
788      // 1. if the region contains endKey
789      // 2. endKey is equal to the region's endKey and we do not want to include endKey.
790      return c < 0 || (c == 0 && !endKeyInclusive);
791    }
792  }
793
  /**
   * Handles the completion of one region-location lookup made while fanning a coprocessor call
   * out over a key range.
   * <p>
   * On lookup failure the callback's {@code onError} is invoked, the current span is marked as
   * errored and ended, and nothing else happens. Otherwise the located region is counted as an
   * outstanding request, the next lookup is started from this region's end key unless
   * {@link #locateFinished(RegionInfo, byte[], boolean)} reports that this is the last region,
   * and the coprocessor call for this region is issued. When a region call finishes and it
   * brings the outstanding count to zero while the "locate finished" flag is set,
   * {@code callback.onComplete()} is invoked.
   * @param stubMaker         creates the service stub on top of the rpc channel
   * @param callable          performs the actual endpoint call on the stub
   * @param callback          receives per-region results/errors and the final completion signal
   * @param locs              not read here; only forwarded to the recursive invocation
   * @param endKey            the end of the requested key range
   * @param endKeyInclusive   whether {@code endKey} itself belongs to the requested range
   * @param locateFinished    shared flag, set once the last region of the range has been located
   * @param unfinishedRequest shared count of region calls that have been started but not finished
   * @param loc               the located region; only read when {@code error} is {@code null}
   * @param error             the lookup failure, or {@code null} on success
   */
  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
    byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
    AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
    // Captured so the asynchronous listeners below can make the same span current again.
    final Span span = Span.current();
    if (error != null) {
      callback.onError(error);
      TraceUtil.setError(span, error);
      span.end();
      return;
    }
    // Count this region's request before the flag can be set or the call can complete, so the
    // completion check in the listener below sees it as outstanding.
    unfinishedRequest.incrementAndGet();
    RegionInfo region = loc.getRegion();
    if (locateFinished(region, endKey, endKeyInclusive)) {
      locateFinished.set(true);
    } else {
      // More regions to visit: locate the region that starts at this region's end key and
      // re-enter this method with the result.
      addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(),
        RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> {
          try (Scope ignored = span.makeCurrent()) {
            onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
              locateFinished, unfinishedRequest, l, e);
          }
        });
    }
    // Issue the coprocessor call for the region just located, addressed by its start key.
    addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
      try (Scope ignored = span.makeCurrent()) {
        if (e != null) {
          callback.onRegionError(region, e);
        } else {
          callback.onRegionComplete(region, r);
        }
        // Signal overall completion only when no region call is outstanding and no further
        // region remains to be located.
        if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
          callback.onComplete();
        }
      }
    });
  }
831
832  private final class CoprocessorServiceBuilderImpl<S, R>
833    implements CoprocessorServiceBuilder<S, R> {
834
835    private final Function<RpcChannel, S> stubMaker;
836
837    private final ServiceCaller<S, R> callable;
838
839    private final CoprocessorCallback<R> callback;
840
841    private byte[] startKey = HConstants.EMPTY_START_ROW;
842
843    private boolean startKeyInclusive;
844
845    private byte[] endKey = HConstants.EMPTY_END_ROW;
846
847    private boolean endKeyInclusive;
848
849    public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
850      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
851      this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
852      this.callable = Preconditions.checkNotNull(callable, "callable is null");
853      this.callback = Preconditions.checkNotNull(callback, "callback is null");
854    }
855
856    @Override
857    public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
858      this.startKey = Preconditions.checkNotNull(startKey,
859        "startKey is null. Consider using"
860          + " an empty byte array, or just do not call this method if you want to start selection"
861          + " from the first region");
862      this.startKeyInclusive = inclusive;
863      return this;
864    }
865
866    @Override
867    public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
868      this.endKey = Preconditions.checkNotNull(endKey,
869        "endKey is null. Consider using"
870          + " an empty byte array, or just do not call this method if you want to continue"
871          + " selection to the last region");
872      this.endKeyInclusive = inclusive;
873      return this;
874    }
875
876    @Override
877    public void execute() {
878      final Span span = newTableOperationSpanBuilder()
879        .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build();
880      try (Scope ignored = span.makeCurrent()) {
881        final RegionLocateType regionLocateType =
882          startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER;
883        final CompletableFuture<HRegionLocation> future = conn.getLocator()
884          .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs);
885        addListener(future, (loc, error) -> {
886          try (Scope ignored1 = span.makeCurrent()) {
887            onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
888              endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error);
889          }
890        });
891      }
892    }
893  }
894
895  @Override
896  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
897    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
898    CoprocessorCallback<R> callback) {
899    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
900  }
901}