001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
026import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
027import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
028import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
029
030import io.opentelemetry.api.trace.Span;
031import io.opentelemetry.api.trace.StatusCode;
032import io.opentelemetry.context.Scope;
033import java.io.IOException;
034import java.time.Duration;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.List;
038import java.util.Map;
039import java.util.concurrent.CompletableFuture;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicInteger;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.hbase.CompareOperator;
047import org.apache.hadoop.hbase.DoNotRetryIOException;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.HRegionLocation;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
052import org.apache.hadoop.hbase.client.ConnectionUtils.Converter;
053import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
054import org.apache.hadoop.hbase.filter.Filter;
055import org.apache.hadoop.hbase.io.TimeRange;
056import org.apache.hadoop.hbase.ipc.HBaseRpcController;
057import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
058import org.apache.hadoop.hbase.trace.TraceUtil;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.ReflectionUtils;
061import org.apache.yetus.audience.InterfaceAudience;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
066import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
067import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
068import org.apache.hbase.thirdparty.io.netty.util.Timer;
069
070import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
071import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
072import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
080
081/**
082 * The implementation of RawAsyncTable.
083 * <p/>
084 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
085 * be finished inside the rpc framework thread, which means that the callbacks registered to the
086 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
087 * this class should not try to do time consuming tasks in the callbacks.
088 * @since 2.0.0
089 * @see AsyncTableImpl
090 */
091@InterfaceAudience.Private
092class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
093
094  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
095
096  private final AsyncConnectionImpl conn;
097
098  private final Timer retryTimer;
099
100  private final TableName tableName;
101
102  private final int defaultScannerCaching;
103
104  private final long defaultScannerMaxResultSize;
105
106  private final long rpcTimeoutNs;
107
108  private final long readRpcTimeoutNs;
109
110  private final long writeRpcTimeoutNs;
111
112  private final long operationTimeoutNs;
113
114  private final long scanTimeoutNs;
115
116  private final long pauseNs;
117
118  private final long pauseNsForServerOverloaded;
119
120  private final int maxAttempts;
121
122  private final int startLogErrorsCnt;
123
124  private final Map<String, byte[]> requestAttributes;
125
126  RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
127    this.conn = conn;
128    this.retryTimer = retryTimer;
129    this.tableName = builder.tableName;
130    this.rpcTimeoutNs = builder.rpcTimeoutNs;
131    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
132    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
133    this.operationTimeoutNs = builder.operationTimeoutNs;
134    this.scanTimeoutNs = builder.scanTimeoutNs;
135    this.pauseNs = builder.pauseNs;
136    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
137      LOG.warn(
138        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
139          + " the normal pause value {} ms, use the greater one instead",
140        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
141        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
142      this.pauseNsForServerOverloaded = builder.pauseNs;
143    } else {
144      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
145    }
146    this.maxAttempts = builder.maxAttempts;
147    this.startLogErrorsCnt = builder.startLogErrorsCnt;
148    this.defaultScannerCaching = tableName.isSystemTable()
149      ? conn.connConf.getMetaScannerCaching()
150      : conn.connConf.getScannerCaching();
151    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
152    this.requestAttributes = builder.requestAttributes;
153  }
154
155  @Override
156  public TableName getName() {
157    return tableName;
158  }
159
160  @Override
161  public Configuration getConfiguration() {
162    return conn.getConfiguration();
163  }
164
165  @Override
166  public CompletableFuture<TableDescriptor> getDescriptor() {
167    return conn.getAdmin().getDescriptor(tableName);
168  }
169
170  @Override
171  public AsyncTableRegionLocator getRegionLocator() {
172    return conn.getRegionLocator(tableName);
173  }
174
175  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
176    HRegionLocation loc, ClientService.Interface stub, REQ req,
177    Converter<MutateRequest, byte[], REQ> reqConvert,
178    Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
179    return ConnectionUtils.call(controller, loc, stub, req, reqConvert,
180      (s, c, r, done) -> s.mutate(c, r, done), respConverter);
181  }
182
183  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
184    HRegionLocation loc, ClientService.Interface stub, REQ req,
185    Converter<MutateRequest, byte[], REQ> reqConvert) {
186    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
187      return null;
188    });
189  }
190
191  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
192    throws IOException {
193    if (!resp.hasResult()) {
194      return null;
195    }
196    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
197  }
198
199  @FunctionalInterface
200  private interface NoncedConverter<D, I, S> {
201    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
202  }
203
204  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
205    HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
206    NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
207    Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
208    return mutate(controller, loc, stub, req,
209      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
210  }
211
212  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
213    return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
214      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
215      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
216      .pause(pauseNs, TimeUnit.NANOSECONDS)
217      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
218      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
219      .setRequestAttributes(requestAttributes);
220  }
221
222  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
223    newCaller(R row, long rpcTimeoutNs) {
224    return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
225  }
226
227  private CompletableFuture<Result> get(Get get, int replicaId) {
228    return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
229      .action((controller, loc, stub) -> ConnectionUtils.<Get, GetRequest, GetResponse,
230        Result> call(controller, loc, stub, get, RequestConverter::buildGetRequest,
231          (s, c, req, done) -> s.get(c, req, done),
232          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
233      .replicaId(replicaId).call();
234  }
235
236  private TableOperationSpanBuilder newTableOperationSpanBuilder() {
237    return new TableOperationSpanBuilder(conn).setTableName(tableName);
238  }
239
240  @Override
241  public CompletableFuture<Result> get(Get get) {
242    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get);
243    return tracedFuture(
244      () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
245        RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
246        conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
247      supplier);
248  }
249
250  @Override
251  public CompletableFuture<Void> put(Put put) {
252    validatePut(put, conn.connConf.getMaxKeyValueSize());
253    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put);
254    return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
255      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
256        put, RequestConverter::buildMutateRequest))
257      .call(), supplier);
258  }
259
260  @Override
261  public CompletableFuture<Void> delete(Delete delete) {
262    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete);
263    return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
264      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
265        stub, delete, RequestConverter::buildMutateRequest))
266      .call(), supplier);
267  }
268
269  @Override
270  public CompletableFuture<Result> append(Append append) {
271    checkHasFamilies(append);
272    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append);
273    return tracedFuture(() -> {
274      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
275      long nonce = conn.getNonceGenerator().newNonce();
276      return this.<Result, Append> newCaller(append, rpcTimeoutNs)
277        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce,
278          controller, loc, stub, append, RequestConverter::buildMutateRequest,
279          RawAsyncTableImpl::toResult))
280        .call();
281    }, supplier);
282  }
283
284  @Override
285  public CompletableFuture<Result> increment(Increment increment) {
286    checkHasFamilies(increment);
287    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment);
288    return tracedFuture(() -> {
289      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
290      long nonce = conn.getNonceGenerator().newNonce();
291      return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
292        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
293          controller, loc, stub, increment, RequestConverter::buildMutateRequest,
294          RawAsyncTableImpl::toResult))
295        .call();
296    }, supplier);
297  }
298
299  private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
300
301    private final byte[] row;
302
303    private final byte[] family;
304
305    private byte[] qualifier;
306
307    private TimeRange timeRange;
308
309    private CompareOperator op;
310
311    private byte[] value;
312
313    public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
314      this.row = Preconditions.checkNotNull(row, "row is null");
315      this.family = Preconditions.checkNotNull(family, "family is null");
316    }
317
318    @Override
319    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
320      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
321        + " an empty byte array, or just do not call this method if you want a null qualifier");
322      return this;
323    }
324
325    @Override
326    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
327      this.timeRange = timeRange;
328      return this;
329    }
330
331    @Override
332    public CheckAndMutateBuilder ifNotExists() {
333      this.op = CompareOperator.EQUAL;
334      this.value = null;
335      return this;
336    }
337
338    @Override
339    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
340      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
341      this.value = Preconditions.checkNotNull(value, "value is null");
342      return this;
343    }
344
345    private void preCheck() {
346      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by"
347        + " calling ifNotExists/ifEquals/ifMatches before executing the request");
348    }
349
350    @Override
351    public CompletableFuture<Boolean> thenPut(Put put) {
352      validatePut(put, conn.connConf.getMaxKeyValueSize());
353      preCheck();
354      final Supplier<Span> supplier = newTableOperationSpanBuilder()
355        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
356        .setContainerOperations(put);
357      return tracedFuture(
358        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
359          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
360            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
361              null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE, false),
362            (c, r) -> r.getProcessed()))
363          .call(),
364        supplier);
365    }
366
367    @Override
368    public CompletableFuture<Boolean> thenDelete(Delete delete) {
369      preCheck();
370      final Supplier<Span> supplier = newTableOperationSpanBuilder()
371        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
372        .setContainerOperations(delete);
373      return tracedFuture(
374        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
375          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
376            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
377              null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE, false),
378            (c, r) -> r.getProcessed()))
379          .call(),
380        supplier);
381    }
382
383    @Override
384    public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
385      preCheck();
386      validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
387      final Supplier<Span> supplier = newTableOperationSpanBuilder()
388        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
389        .setContainerOperations(mutations);
390      return tracedFuture(() -> RawAsyncTableImpl.this
391        .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
392        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
393          mutations,
394          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
395            null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE, false),
396          CheckAndMutateResult::isSuccess))
397        .call(), supplier);
398    }
399  }
400
401  @Override
402  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
403    return new CheckAndMutateBuilderImpl(row, family);
404  }
405
406  private final class CheckAndMutateWithFilterBuilderImpl
407    implements CheckAndMutateWithFilterBuilder {
408
409    private final byte[] row;
410
411    private final Filter filter;
412
413    private TimeRange timeRange;
414
415    public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
416      this.row = Preconditions.checkNotNull(row, "row is null");
417      this.filter = Preconditions.checkNotNull(filter, "filter is null");
418    }
419
420    @Override
421    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
422      this.timeRange = timeRange;
423      return this;
424    }
425
426    @Override
427    public CompletableFuture<Boolean> thenPut(Put put) {
428      validatePut(put, conn.connConf.getMaxKeyValueSize());
429      final Supplier<Span> supplier = newTableOperationSpanBuilder()
430        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
431        .setContainerOperations(put);
432      return tracedFuture(
433        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
434          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
435            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
436              timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE, false),
437            (c, r) -> r.getProcessed()))
438          .call(),
439        supplier);
440    }
441
442    @Override
443    public CompletableFuture<Boolean> thenDelete(Delete delete) {
444      final Supplier<Span> supplier = newTableOperationSpanBuilder()
445        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
446        .setContainerOperations(delete);
447      return tracedFuture(
448        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
449          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
450            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
451              timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE, false),
452            (c, r) -> r.getProcessed()))
453          .call(),
454        supplier);
455    }
456
457    @Override
458    public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
459      validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
460      final Supplier<Span> supplier = newTableOperationSpanBuilder()
461        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
462        .setContainerOperations(mutations);
463      return tracedFuture(() -> RawAsyncTableImpl.this
464        .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
465        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
466          mutations,
467          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
468            timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE, false),
469          CheckAndMutateResult::isSuccess))
470        .call(), supplier);
471    }
472  }
473
474  @Override
475  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
476    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
477  }
478
479  @Override
480  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
481    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate)
482      .setContainerOperations(checkAndMutate.getAction());
483    return tracedFuture(() -> {
484      if (
485        checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
486          || checkAndMutate.getAction() instanceof Increment
487          || checkAndMutate.getAction() instanceof Append
488      ) {
489        Mutation mutation = (Mutation) checkAndMutate.getAction();
490        if (mutation instanceof Put) {
491          validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
492        }
493        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
494        long nonce = conn.getNonceGenerator().newNonce();
495        return RawAsyncTableImpl.this
496          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(),
497            rpcTimeoutNs)
498          .action(
499            (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation,
500              (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
501                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
502                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
503                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce,
504                checkAndMutate.isQueryMetricsEnabled()),
505              (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
506          .call();
507      } else if (checkAndMutate.getAction() instanceof RowMutations) {
508        RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
509        validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
510        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
511        long nonce = conn.getNonceGenerator().newNonce();
512        return RawAsyncTableImpl.this
513          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(),
514            rpcTimeoutNs)
515          .action((controller, loc, stub) -> RawAsyncTableImpl.this.<CheckAndMutateResult,
516            CheckAndMutateResult> mutateRow(controller, loc, stub, rowMutations,
517              (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
518                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
519                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
520                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce,
521                checkAndMutate.isQueryMetricsEnabled()),
522              resp -> resp))
523          .call();
524      } else {
525        CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
526        future.completeExceptionally(new DoNotRetryIOException(
527          "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
528        return future;
529      }
530    }, supplier);
531  }
532
533  @Override
534  public List<CompletableFuture<CheckAndMutateResult>>
535    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
536    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates)
537      .setContainerOperations(checkAndMutates);
538    return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream()
539      .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), supplier);
540  }
541
542  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
543  // so here I write a new method as I do not want to change the abstraction of call method.
544  @SuppressWarnings("unchecked")
545  private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
546    HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
547    Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) {
548    CompletableFuture<RESP> future = new CompletableFuture<>();
549    try {
550      byte[] regionName = loc.getRegion().getRegionName();
551      MultiRequest req = reqConvert.convert(regionName, mutation);
552      stub.multi(controller, req, new RpcCallback<MultiResponse>() {
553
554        @Override
555        public void run(MultiResponse resp) {
556          if (controller.failed()) {
557            future.completeExceptionally(controller.getFailed());
558          } else {
559            try {
560              org.apache.hadoop.hbase.client.MultiResponse multiResp =
561                ResponseConverter.getResults(req, resp, controller.cellScanner());
562              ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
563                loc.getServerName(), multiResp);
564              Throwable ex = multiResp.getException(regionName);
565              if (ex != null) {
566                future.completeExceptionally(ex instanceof IOException
567                  ? ex
568                  : new IOException(
569                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
570              } else {
571                future.complete(
572                  respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0)));
573              }
574            } catch (IOException e) {
575              future.completeExceptionally(e);
576            }
577          }
578        }
579      });
580    } catch (IOException e) {
581      future.completeExceptionally(e);
582    }
583    return future;
584  }
585
586  @Override
587  public CompletableFuture<Result> mutateRow(RowMutations mutations) {
588    validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
589    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
590    long nonce = conn.getNonceGenerator().newNonce();
591    final Supplier<Span> supplier =
592      newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations);
593    return tracedFuture(
594      () -> this
595        .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
596        .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub,
597          mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
598          resp -> resp))
599        .call(),
600      supplier);
601  }
602
603  private Scan setDefaultScanConfig(Scan scan) {
604    // always create a new scan object as we may reset the start row later.
605    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
606    if (newScan.getCaching() <= 0) {
607      newScan.setCaching(defaultScannerCaching);
608    }
609    if (newScan.getMaxResultSize() <= 0) {
610      newScan.setMaxResultSize(defaultScannerMaxResultSize);
611    }
612    return newScan;
613  }
614
615  @Override
616  public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
617    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
618      pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs,
619      startLogErrorsCnt, requestAttributes).start();
620  }
621
622  private long resultSize2CacheSize(long maxResultSize) {
623    // * 2 if possible
624    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
625  }
626
627  @Override
628  public AsyncTableResultScanner getScanner(Scan scan) {
629    final long maxCacheSize = resultSize2CacheSize(
630      scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize);
631    final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan);
632    final AsyncTableResultScanner scanner =
633      new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
634    scan(scan, scanner);
635    return scanner;
636  }
637
638  @Override
639  public CompletableFuture<List<Result>> scanAll(Scan scan) {
640    CompletableFuture<List<Result>> future = new CompletableFuture<>();
641    List<Result> scanResults = new ArrayList<>();
642    scan(scan, new AdvancedScanResultConsumer() {
643
644      @Override
645      public void onNext(Result[] results, ScanController controller) {
646        scanResults.addAll(Arrays.asList(results));
647      }
648
649      @Override
650      public void onError(Throwable error) {
651        future.completeExceptionally(error);
652      }
653
654      @Override
655      public void onComplete() {
656        future.complete(scanResults);
657      }
658    });
659    return future;
660  }
661
662  @Override
663  public List<CompletableFuture<Result>> get(List<Get> gets) {
664    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets)
665      .setContainerOperations(HBaseSemanticAttributes.Operation.GET);
666    return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
667  }
668
669  @Override
670  public List<CompletableFuture<Void>> put(List<Put> puts) {
671    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts)
672      .setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
673    return tracedFutures(() -> voidMutate(puts), supplier);
674  }
675
676  @Override
677  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
678    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes)
679      .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
680    return tracedFutures(() -> voidMutate(deletes), supplier);
681  }
682
683  @Override
684  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
685    final Supplier<Span> supplier =
686      newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions);
687    return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
688  }
689
690  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
691    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
692      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
693  }
694
695  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
696    for (Row action : actions) {
697      if (action instanceof Put) {
698        validatePut((Put) action, conn.connConf.getMaxKeyValueSize());
699      } else if (action instanceof CheckAndMutate) {
700        CheckAndMutate checkAndMutate = (CheckAndMutate) action;
701        if (checkAndMutate.getAction() instanceof Put) {
702          validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
703        } else if (checkAndMutate.getAction() instanceof RowMutations) {
704          validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(),
705            conn.connConf.getMaxKeyValueSize());
706        }
707      } else if (action instanceof RowMutations) {
708        validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize());
709      }
710    }
711    return conn.callerFactory.batch().table(tableName).actions(actions)
712      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
713      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
714      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
715      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
716      .setRequestAttributes(requestAttributes).call();
717  }
718
719  @Override
720  public long getRpcTimeout(TimeUnit unit) {
721    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
722  }
723
724  @Override
725  public long getReadRpcTimeout(TimeUnit unit) {
726    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
727  }
728
729  @Override
730  public long getWriteRpcTimeout(TimeUnit unit) {
731    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
732  }
733
734  @Override
735  public long getOperationTimeout(TimeUnit unit) {
736    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
737  }
738
739  @Override
740  public long getScanTimeout(TimeUnit unit) {
741    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
742  }
743
744  @Override
745  public Map<String, byte[]> getRequestAttributes() {
746    return requestAttributes;
747  }
748
749  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
750    ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
751    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
752      region, row, rpcTimeoutNs, operationTimeoutNs);
753    final Span span = Span.current();
754    S stub = stubMaker.apply(channel);
755    CompletableFuture<R> future = new CompletableFuture<>();
756    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
757    callable.call(stub, controller, resp -> {
758      try (Scope ignored = span.makeCurrent()) {
759        if (controller.failed()) {
760          final Throwable failure = controller.getFailed();
761          future.completeExceptionally(failure);
762          TraceUtil.setError(span, failure);
763        } else {
764          future.complete(resp);
765          span.setStatus(StatusCode.OK);
766        }
767      } finally {
768        span.end();
769      }
770    });
771    return future;
772  }
773
774  @Override
775  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
776    ServiceCaller<S, R> callable, byte[] row) {
777    return coprocessorService(stubMaker, callable, null, row);
778  }
779
780  private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) {
781    if (isEmptyStopRow(endKey)) {
782      if (isEmptyStopRow(region.getEndKey())) {
783        return true;
784      }
785      return false;
786    } else {
787      if (isEmptyStopRow(region.getEndKey())) {
788        return true;
789      }
790      int c = Bytes.compareTo(endKey, region.getEndKey());
791      // 1. if the region contains endKey
792      // 2. endKey is equal to the region's endKey and we do not want to include endKey.
793      return c < 0 || (c == 0 && !endKeyInclusive);
794    }
795  }
796
797  private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S> stubMaker,
798    ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> callback,
799    AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo region, Span span) {
800    addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
801      try (Scope ignored = span.makeCurrent()) {
802        if (e != null) {
803          callback.onRegionError(region, e);
804        } else {
805          callback.onRegionComplete(region, r);
806        }
807
808        ServiceCaller<S, R> updatedCallable;
809        if (e == null && r != null) {
810          updatedCallable = callback.getNextCallable(r, region);
811        } else {
812          updatedCallable = null;
813        }
814
815        // If updatedCallable is non-null, we will be sending another request, so no need to
816        // decrement unfinishedRequest (recall that && short-circuits).
817        // If updatedCallable is null, and unfinishedRequest decrements to 0, we're done with the
818        // requests for this coprocessor call.
819        if (
820          updatedCallable == null && unfinishedRequest.decrementAndGet() == 0
821            && locateFinished.get()
822        ) {
823          callback.onComplete();
824        } else if (updatedCallable != null) {
825          Duration waitInterval = callback.getWaitInterval(r, region);
826          LOG.trace("Coprocessor returned incomplete result. "
827            + "Sleeping for {} before making follow-up request.", waitInterval);
828          if (waitInterval.isZero()) {
829            AsyncConnectionImpl.RETRY_TIMER.newTimeout(
830              (timeout) -> coprocessorServiceUntilComplete(stubMaker, updatedCallable, callback,
831                locateFinished, unfinishedRequest, region, span),
832              waitInterval.toMillis(), TimeUnit.MILLISECONDS);
833          } else {
834            coprocessorServiceUntilComplete(stubMaker, updatedCallable, callback, locateFinished,
835              unfinishedRequest, region, span);
836          }
837        }
838      }
839    });
840  }
841
842  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
843    ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> callback, byte[] endKey,
844    boolean endKeyInclusive, AtomicBoolean locateFinished, AtomicInteger unfinishedRequest,
845    HRegionLocation loc, Throwable error) {
846    final Span span = Span.current();
847    if (error != null) {
848      callback.onError(error);
849      TraceUtil.setError(span, error);
850      span.end();
851      return;
852    }
853    unfinishedRequest.incrementAndGet();
854    RegionInfo region = loc.getRegion();
855    if (locateFinished(region, endKey, endKeyInclusive)) {
856      locateFinished.set(true);
857    } else {
858      addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(),
859        RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> {
860          try (Scope ignored = span.makeCurrent()) {
861            onLocateComplete(stubMaker, callable, callback, endKey, endKeyInclusive, locateFinished,
862              unfinishedRequest, l, e);
863          }
864        });
865    }
866    coprocessorServiceUntilComplete(stubMaker, callable, callback, locateFinished,
867      unfinishedRequest, region, span);
868  }
869
870  private final class CoprocessorServiceBuilderImpl<S, R>
871    implements CoprocessorServiceBuilder<S, R> {
872
873    private final Function<RpcChannel, S> stubMaker;
874
875    private final ServiceCaller<S, R> callable;
876
877    private final PartialResultCoprocessorCallback<S, R> callback;
878
879    private byte[] startKey = HConstants.EMPTY_START_ROW;
880
881    private boolean startKeyInclusive;
882
883    private byte[] endKey = HConstants.EMPTY_END_ROW;
884
885    private boolean endKeyInclusive;
886
887    public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
888      ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> callback) {
889      this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
890      this.callable = Preconditions.checkNotNull(callable, "callable is null");
891      this.callback = Preconditions.checkNotNull(callback, "callback is null");
892    }
893
894    @Override
895    public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
896      this.startKey = Preconditions.checkNotNull(startKey,
897        "startKey is null. Consider using"
898          + " an empty byte array, or just do not call this method if you want to start selection"
899          + " from the first region");
900      this.startKeyInclusive = inclusive;
901      return this;
902    }
903
904    @Override
905    public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
906      this.endKey = Preconditions.checkNotNull(endKey,
907        "endKey is null. Consider using"
908          + " an empty byte array, or just do not call this method if you want to continue"
909          + " selection to the last region");
910      this.endKeyInclusive = inclusive;
911      return this;
912    }
913
914    @Override
915    public void execute() {
916      final Span span = newTableOperationSpanBuilder()
917        .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build();
918      try (Scope ignored = span.makeCurrent()) {
919        final RegionLocateType regionLocateType =
920          startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER;
921        final CompletableFuture<HRegionLocation> future = conn.getLocator()
922          .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs);
923        addListener(future, (loc, error) -> {
924          try (Scope ignored1 = span.makeCurrent()) {
925            onLocateComplete(stubMaker, callable, callback, endKey, endKeyInclusive,
926              new AtomicBoolean(false), new AtomicInteger(0), loc, error);
927          }
928        });
929      }
930    }
931  }
932
933  @Override
934  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
935    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
936    CoprocessorCallback<R> callback) {
937    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable,
938      new NoopPartialResultCoprocessorCallback<>(callback));
939  }
940
941  @Override
942  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
943    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
944    PartialResultCoprocessorCallback<S, R> callback) {
945    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
946  }
947}