001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
025import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.List;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.function.Function;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.CompareOperator;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
042import org.apache.hadoop.hbase.client.ConnectionUtils.Converter;
043import org.apache.hadoop.hbase.filter.Filter;
044import org.apache.hadoop.hbase.io.TimeRange;
045import org.apache.hadoop.hbase.ipc.HBaseRpcController;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.ReflectionUtils;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
055import org.apache.hbase.thirdparty.io.netty.util.Timer;
056
057import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
058import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
059import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
068
069/**
070 * The implementation of RawAsyncTable.
 * <p>
072 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
073 * be finished inside the rpc framework thread, which means that the callbacks registered to the
074 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
075 * this class should not try to do time consuming tasks in the callbacks.
076 * @since 2.0.0
077 * @see AsyncTableImpl
078 */
079@InterfaceAudience.Private
080class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
081
082  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
083
084  private final AsyncConnectionImpl conn;
085
086  private final Timer retryTimer;
087
088  private final TableName tableName;
089
090  private final int defaultScannerCaching;
091
092  private final long defaultScannerMaxResultSize;
093
094  private final long rpcTimeoutNs;
095
096  private final long readRpcTimeoutNs;
097
098  private final long writeRpcTimeoutNs;
099
100  private final long operationTimeoutNs;
101
102  private final long scanTimeoutNs;
103
104  private final long pauseNs;
105
106  private final long pauseForCQTBENs;
107
108  private final int maxAttempts;
109
110  private final int startLogErrorsCnt;
111
112  RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
113    this.conn = conn;
114    this.retryTimer = retryTimer;
115    this.tableName = builder.tableName;
116    this.rpcTimeoutNs = builder.rpcTimeoutNs;
117    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
118    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
119    this.operationTimeoutNs = builder.operationTimeoutNs;
120    this.scanTimeoutNs = builder.scanTimeoutNs;
121    this.pauseNs = builder.pauseNs;
122    if (builder.pauseForCQTBENs < builder.pauseNs) {
123      LOG.warn(
124        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
125          " the normal pause value {} ms, use the greater one instead",
126        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
127        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
128      this.pauseForCQTBENs = builder.pauseNs;
129    } else {
130      this.pauseForCQTBENs = builder.pauseForCQTBENs;
131    }
132    this.maxAttempts = builder.maxAttempts;
133    this.startLogErrorsCnt = builder.startLogErrorsCnt;
134    this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
135      : conn.connConf.getScannerCaching();
136    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
137  }
138
139  @Override
140  public TableName getName() {
141    return tableName;
142  }
143
144  @Override
145  public Configuration getConfiguration() {
146    return conn.getConfiguration();
147  }
148
149  @Override
150  public CompletableFuture<TableDescriptor> getDescriptor() {
151    return conn.getAdmin().getDescriptor(tableName);
152  }
153
154  @Override
155  public AsyncTableRegionLocator getRegionLocator() {
156    return conn.getRegionLocator(tableName);
157  }
158
159  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
160      HRegionLocation loc, ClientService.Interface stub, REQ req,
161      Converter<MutateRequest, byte[], REQ> reqConvert,
162      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
163    return ConnectionUtils.call(controller, loc, stub, req, reqConvert,
164      (s, c, r, done) -> s.mutate(c, r, done), respConverter);
165  }
166
167  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
168      HRegionLocation loc, ClientService.Interface stub, REQ req,
169      Converter<MutateRequest, byte[], REQ> reqConvert) {
170    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
171      return null;
172    });
173  }
174
175  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
176      throws IOException {
177    if (!resp.hasResult()) {
178      return null;
179    }
180    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
181  }
182
183  @FunctionalInterface
184  private interface NoncedConverter<D, I, S> {
185    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
186  }
187
188  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
189      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
190      NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
191      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
192    return mutate(controller, loc, stub, req,
193      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
194  }
195
196  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
197    return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
198      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
199      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
200      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
201      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
202  }
203
204  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
205      R row, long rpcTimeoutNs) {
206    return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
207  }
208
209  private CompletableFuture<Result> get(Get get, int replicaId) {
210    return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
211      .action((controller, loc, stub) -> ConnectionUtils
212        .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
213          RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
214          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
215      .replicaId(replicaId).call();
216  }
217
218  @Override
219  public CompletableFuture<Result> get(Get get) {
220    return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
221      RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
222      conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics());
223  }
224
225  @Override
226  public CompletableFuture<Void> put(Put put) {
227    validatePut(put, conn.connConf.getMaxKeyValueSize());
228    return this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
229      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
230        put, RequestConverter::buildMutateRequest))
231      .call();
232  }
233
234  @Override
235  public CompletableFuture<Void> delete(Delete delete) {
236    return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
237      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
238        stub, delete, RequestConverter::buildMutateRequest))
239      .call();
240  }
241
242  @Override
243  public CompletableFuture<Result> append(Append append) {
244    checkHasFamilies(append);
245    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
246    long nonce = conn.getNonceGenerator().newNonce();
247    return this.<Result, Append> newCaller(append, rpcTimeoutNs)
248      .action(
249        (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
250          loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
251      .call();
252  }
253
254  @Override
255  public CompletableFuture<Result> increment(Increment increment) {
256    checkHasFamilies(increment);
257    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
258    long nonce = conn.getNonceGenerator().newNonce();
259    return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
260      .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
261        controller, loc, stub, increment, RequestConverter::buildMutateRequest,
262        RawAsyncTableImpl::toResult))
263      .call();
264  }
265
266  private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
267
268    private final byte[] row;
269
270    private final byte[] family;
271
272    private byte[] qualifier;
273
274    private TimeRange timeRange;
275
276    private CompareOperator op;
277
278    private byte[] value;
279
280    public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
281      this.row = Preconditions.checkNotNull(row, "row is null");
282      this.family = Preconditions.checkNotNull(family, "family is null");
283    }
284
285    @Override
286    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
287      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
288        " an empty byte array, or just do not call this method if you want a null qualifier");
289      return this;
290    }
291
292    @Override
293    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
294      this.timeRange = timeRange;
295      return this;
296    }
297
298    @Override
299    public CheckAndMutateBuilder ifNotExists() {
300      this.op = CompareOperator.EQUAL;
301      this.value = null;
302      return this;
303    }
304
305    @Override
306    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
307      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
308      this.value = Preconditions.checkNotNull(value, "value is null");
309      return this;
310    }
311
312    private void preCheck() {
313      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
314        " calling ifNotExists/ifEquals/ifMatches before executing the request");
315    }
316
317    @Override
318    public CompletableFuture<Boolean> thenPut(Put put) {
319      validatePut(put, conn.connConf.getMaxKeyValueSize());
320      preCheck();
321      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
322        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
323          stub, put,
324          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
325            null, timeRange, p),
326          (c, r) -> r.getProcessed()))
327        .call();
328    }
329
330    @Override
331    public CompletableFuture<Boolean> thenDelete(Delete delete) {
332      preCheck();
333      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
334        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
335          loc, stub, delete,
336          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
337            null, timeRange, d),
338          (c, r) -> r.getProcessed()))
339        .call();
340    }
341
342    @Override
343    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
344      preCheck();
345      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
346        rpcTimeoutNs)
347        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
348          loc, stub, mutation,
349          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
350            null, timeRange, rm),
351          resp -> resp.getExists()))
352        .call();
353    }
354  }
355
356  @Override
357  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
358    return new CheckAndMutateBuilderImpl(row, family);
359  }
360
361
362  private final class CheckAndMutateWithFilterBuilderImpl
363    implements CheckAndMutateWithFilterBuilder {
364
365    private final byte[] row;
366
367    private final Filter filter;
368
369    private TimeRange timeRange;
370
371    public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
372      this.row = Preconditions.checkNotNull(row, "row is null");
373      this.filter = Preconditions.checkNotNull(filter, "filter is null");
374    }
375
376    @Override
377    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
378      this.timeRange = timeRange;
379      return this;
380    }
381
382    @Override
383    public CompletableFuture<Boolean> thenPut(Put put) {
384      validatePut(put, conn.connConf.getMaxKeyValueSize());
385      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
386        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
387          stub, put,
388          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
389            filter, timeRange, p),
390          (c, r) -> r.getProcessed()))
391        .call();
392    }
393
394    @Override
395    public CompletableFuture<Boolean> thenDelete(Delete delete) {
396      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
397        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
398          loc, stub, delete,
399          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
400            filter, timeRange, d),
401          (c, r) -> r.getProcessed()))
402        .call();
403    }
404
405    @Override
406    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
407      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
408        rpcTimeoutNs)
409        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
410          loc, stub, mutation,
411          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
412            filter, timeRange, rm),
413          resp -> resp.getExists()))
414        .call();
415    }
416  }
417
418  @Override
419  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
420    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
421  }
422
423  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
424  // so here I write a new method as I do not want to change the abstraction of call method.
425  private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
426      HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
427      Converter<MultiRequest, byte[], RowMutations> reqConvert,
428      Function<Result, RESP> respConverter) {
429    CompletableFuture<RESP> future = new CompletableFuture<>();
430    try {
431      byte[] regionName = loc.getRegion().getRegionName();
432      MultiRequest req = reqConvert.convert(regionName, mutation);
433      stub.multi(controller, req, new RpcCallback<MultiResponse>() {
434
435        @Override
436        public void run(MultiResponse resp) {
437          if (controller.failed()) {
438            future.completeExceptionally(controller.getFailed());
439          } else {
440            try {
441              org.apache.hadoop.hbase.client.MultiResponse multiResp =
442                ResponseConverter.getResults(req, resp, controller.cellScanner());
443              ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
444                loc.getServerName(), multiResp);
445              Throwable ex = multiResp.getException(regionName);
446              if (ex != null) {
447                future.completeExceptionally(ex instanceof IOException ? ex
448                  : new IOException(
449                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
450              } else {
451                future.complete(respConverter
452                  .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
453              }
454            } catch (IOException e) {
455              future.completeExceptionally(e);
456            }
457          }
458        }
459      });
460    } catch (IOException e) {
461      future.completeExceptionally(e);
462    }
463    return future;
464  }
465
466  @Override
467  public CompletableFuture<Void> mutateRow(RowMutations mutation) {
468    return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
469      .action((controller, loc, stub) -> this.<Void> mutateRow(controller, loc, stub, mutation,
470        (rn, rm) -> {
471          RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
472          regionMutationBuilder.setAtomic(true);
473          return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
474        }, resp -> null))
475      .call();
476  }
477
478  private Scan setDefaultScanConfig(Scan scan) {
479    // always create a new scan object as we may reset the start row later.
480    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
481    if (newScan.getCaching() <= 0) {
482      newScan.setCaching(defaultScannerCaching);
483    }
484    if (newScan.getMaxResultSize() <= 0) {
485      newScan.setMaxResultSize(defaultScannerMaxResultSize);
486    }
487    return newScan;
488  }
489
490  @Override
491  public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
492    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
493      pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
494        .start();
495  }
496
497  private long resultSize2CacheSize(long maxResultSize) {
498    // * 2 if possible
499    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
500  }
501
502  @Override
503  public ResultScanner getScanner(Scan scan) {
504    return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
505      resultSize2CacheSize(
506        scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
507  }
508
509  @Override
510  public CompletableFuture<List<Result>> scanAll(Scan scan) {
511    CompletableFuture<List<Result>> future = new CompletableFuture<>();
512    List<Result> scanResults = new ArrayList<>();
513    scan(scan, new AdvancedScanResultConsumer() {
514
515      @Override
516      public void onNext(Result[] results, ScanController controller) {
517        scanResults.addAll(Arrays.asList(results));
518      }
519
520      @Override
521      public void onError(Throwable error) {
522        future.completeExceptionally(error);
523      }
524
525      @Override
526      public void onComplete() {
527        future.complete(scanResults);
528      }
529    });
530    return future;
531  }
532
533  @Override
534  public List<CompletableFuture<Result>> get(List<Get> gets) {
535    return batch(gets, readRpcTimeoutNs);
536  }
537
538  @Override
539  public List<CompletableFuture<Void>> put(List<Put> puts) {
540    return voidMutate(puts);
541  }
542
543  @Override
544  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
545    return voidMutate(deletes);
546  }
547
548  @Override
549  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
550    return batch(actions, rpcTimeoutNs);
551  }
552
553  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
554    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
555      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
556  }
557
558  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
559    actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action)
560      .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize()));
561    return conn.callerFactory.batch().table(tableName).actions(actions)
562      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
563      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
564      .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
565      .startLogErrorsCnt(startLogErrorsCnt).call();
566  }
567
568  @Override
569  public long getRpcTimeout(TimeUnit unit) {
570    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
571  }
572
573  @Override
574  public long getReadRpcTimeout(TimeUnit unit) {
575    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
576  }
577
578  @Override
579  public long getWriteRpcTimeout(TimeUnit unit) {
580    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
581  }
582
583  @Override
584  public long getOperationTimeout(TimeUnit unit) {
585    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
586  }
587
588  @Override
589  public long getScanTimeout(TimeUnit unit) {
590    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
591  }
592
593  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
594      ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
595    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
596      region, row, rpcTimeoutNs, operationTimeoutNs);
597    S stub = stubMaker.apply(channel);
598    CompletableFuture<R> future = new CompletableFuture<>();
599    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
600    callable.call(stub, controller, resp -> {
601      if (controller.failed()) {
602        future.completeExceptionally(controller.getFailed());
603      } else {
604        future.complete(resp);
605      }
606    });
607    return future;
608  }
609
610  @Override
611  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
612      ServiceCaller<S, R> callable, byte[] row) {
613    return coprocessorService(stubMaker, callable, null, row);
614  }
615
616  private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) {
617    if (isEmptyStopRow(endKey)) {
618      if (isEmptyStopRow(region.getEndKey())) {
619        return true;
620      }
621      return false;
622    } else {
623      if (isEmptyStopRow(region.getEndKey())) {
624        return true;
625      }
626      int c = Bytes.compareTo(endKey, region.getEndKey());
627      // 1. if the region contains endKey
628      // 2. endKey is equal to the region's endKey and we do not want to include endKey.
629      return c < 0 || c == 0 && !endKeyInclusive;
630    }
631  }
632
633  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
634      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
635      byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
636      AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
637    if (error != null) {
638      callback.onError(error);
639      return;
640    }
641    unfinishedRequest.incrementAndGet();
642    RegionInfo region = loc.getRegion();
643    if (locateFinished(region, endKey, endKeyInclusive)) {
644      locateFinished.set(true);
645    } else {
646      addListener(
647        conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
648          operationTimeoutNs),
649        (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
650          locateFinished, unfinishedRequest, l, e));
651    }
652    addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
653      if (e != null) {
654        callback.onRegionError(region, e);
655      } else {
656        callback.onRegionComplete(region, r);
657      }
658      if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
659        callback.onComplete();
660      }
661    });
662  }
663
664  private final class CoprocessorServiceBuilderImpl<S, R>
665      implements CoprocessorServiceBuilder<S, R> {
666
667    private final Function<RpcChannel, S> stubMaker;
668
669    private final ServiceCaller<S, R> callable;
670
671    private final CoprocessorCallback<R> callback;
672
673    private byte[] startKey = HConstants.EMPTY_START_ROW;
674
675    private boolean startKeyInclusive;
676
677    private byte[] endKey = HConstants.EMPTY_END_ROW;
678
679    private boolean endKeyInclusive;
680
681    public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
682        ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
683      this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
684      this.callable = Preconditions.checkNotNull(callable, "callable is null");
685      this.callback = Preconditions.checkNotNull(callback, "callback is null");
686    }
687
688    @Override
689    public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
690      this.startKey = Preconditions.checkNotNull(startKey,
691        "startKey is null. Consider using" +
692          " an empty byte array, or just do not call this method if you want to start selection" +
693          " from the first region");
694      this.startKeyInclusive = inclusive;
695      return this;
696    }
697
698    @Override
699    public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
700      this.endKey = Preconditions.checkNotNull(endKey,
701        "endKey is null. Consider using" +
702          " an empty byte array, or just do not call this method if you want to continue" +
703          " selection to the last region");
704      this.endKeyInclusive = inclusive;
705      return this;
706    }
707
708    @Override
709    public void execute() {
710      addListener(conn.getLocator().getRegionLocation(tableName, startKey,
711        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
712        (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
713          endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
714    }
715  }
716
717  @Override
718  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
719      Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
720      CoprocessorCallback<R> callback) {
721    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
722  }
723}