001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
025import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
026
027import com.google.protobuf.RpcChannel;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.concurrent.CompletableFuture;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.function.Function;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.CompareOperator;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
043import org.apache.hadoop.hbase.filter.Filter;
044import org.apache.hadoop.hbase.io.TimeRange;
045import org.apache.hadoop.hbase.ipc.HBaseRpcController;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.ReflectionUtils;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
054import org.apache.hbase.thirdparty.io.netty.util.Timer;
055
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
058import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
067
/**
 * The 'raw' implementation of {@link AsyncTable}, using {@link AdvancedScanResultConsumer} as the
 * scan result consumer.
 * <p>
 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
 * be finished inside the rpc framework thread, which means that the callbacks registered to the
 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users of
 * this class should not do time-consuming tasks in the callbacks.
 * @since 2.0.0
 * @see AsyncTableImpl
 */
078@InterfaceAudience.Private
079class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
080
081  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
082
083  private final AsyncConnectionImpl conn;
084
085  private final Timer retryTimer;
086
087  private final TableName tableName;
088
089  private final int defaultScannerCaching;
090
091  private final long defaultScannerMaxResultSize;
092
093  private final long rpcTimeoutNs;
094
095  private final long readRpcTimeoutNs;
096
097  private final long writeRpcTimeoutNs;
098
099  private final long operationTimeoutNs;
100
101  private final long scanTimeoutNs;
102
103  private final long pauseNs;
104
105  private final long pauseForCQTBENs;
106
107  private final int maxAttempts;
108
109  private final int startLogErrorsCnt;
110
111  RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
112    this.conn = conn;
113    this.retryTimer = retryTimer;
114    this.tableName = builder.tableName;
115    this.rpcTimeoutNs = builder.rpcTimeoutNs;
116    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
117    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
118    this.operationTimeoutNs = builder.operationTimeoutNs;
119    this.scanTimeoutNs = builder.scanTimeoutNs;
120    this.pauseNs = builder.pauseNs;
121    if (builder.pauseForCQTBENs < builder.pauseNs) {
122      LOG.warn(
123        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
124          " the normal pause value {} ms, use the greater one instead",
125        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
126        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
127      this.pauseForCQTBENs = builder.pauseNs;
128    } else {
129      this.pauseForCQTBENs = builder.pauseForCQTBENs;
130    }
131    this.maxAttempts = builder.maxAttempts;
132    this.startLogErrorsCnt = builder.startLogErrorsCnt;
133    this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
134      : conn.connConf.getScannerCaching();
135    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
136  }
137
138  @Override
139  public TableName getName() {
140    return tableName;
141  }
142
143  @Override
144  public Configuration getConfiguration() {
145    return conn.getConfiguration();
146  }
147
148  @Override
149  public CompletableFuture<TableDescriptor> getDescriptor() {
150    return conn.getAdmin().getDescriptor(tableName);
151  }
152
153  @Override
154  public AsyncTableRegionLocator getRegionLocator() {
155    return conn.getRegionLocator(tableName);
156  }
157
158  @FunctionalInterface
159  private interface Converter<D, I, S> {
160    D convert(I info, S src) throws IOException;
161  }
162
163  @FunctionalInterface
164  private interface RpcCall<RESP, REQ> {
165    void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
166        RpcCallback<RESP> done);
167  }
168
169  private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
170      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
171      Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
172      Converter<RESP, HBaseRpcController, PRESP> respConverter) {
173    CompletableFuture<RESP> future = new CompletableFuture<>();
174    try {
175      rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
176        new RpcCallback<PRESP>() {
177
178          @Override
179          public void run(PRESP resp) {
180            if (controller.failed()) {
181              future.completeExceptionally(controller.getFailed());
182            } else {
183              try {
184                future.complete(respConverter.convert(controller, resp));
185              } catch (IOException e) {
186                future.completeExceptionally(e);
187              }
188            }
189          }
190        });
191    } catch (IOException e) {
192      future.completeExceptionally(e);
193    }
194    return future;
195  }
196
197  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
198      HRegionLocation loc, ClientService.Interface stub, REQ req,
199      Converter<MutateRequest, byte[], REQ> reqConvert,
200      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
201    return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
202      respConverter);
203  }
204
205  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
206      HRegionLocation loc, ClientService.Interface stub, REQ req,
207      Converter<MutateRequest, byte[], REQ> reqConvert) {
208    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
209      return null;
210    });
211  }
212
213  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
214      throws IOException {
215    if (!resp.hasResult()) {
216      return null;
217    }
218    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
219  }
220
221  @FunctionalInterface
222  private interface NoncedConverter<D, I, S> {
223    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
224  }
225
226  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
227      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
228      NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
229      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
230    return mutate(controller, loc, stub, req,
231      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
232  }
233
234  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
235    return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
236      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
237      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
238      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
239      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
240  }
241
242  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
243      R row, long rpcTimeoutNs) {
244    return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
245  }
246
247  private CompletableFuture<Result> get(Get get, int replicaId) {
248    return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
249      .action((controller, loc, stub) -> RawAsyncTableImpl
250        .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
251          RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
252          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
253      .replicaId(replicaId).call();
254  }
255
256  @Override
257  public CompletableFuture<Result> get(Get get) {
258    return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
259      RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
260      conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics());
261  }
262
263  @Override
264  public CompletableFuture<Void> put(Put put) {
265    validatePut(put, conn.connConf.getMaxKeyValueSize());
266    return this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
267      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
268        put, RequestConverter::buildMutateRequest))
269      .call();
270  }
271
272  @Override
273  public CompletableFuture<Void> delete(Delete delete) {
274    return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
275      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
276        stub, delete, RequestConverter::buildMutateRequest))
277      .call();
278  }
279
280  @Override
281  public CompletableFuture<Result> append(Append append) {
282    checkHasFamilies(append);
283    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
284    long nonce = conn.getNonceGenerator().newNonce();
285    return this.<Result, Append> newCaller(append, rpcTimeoutNs)
286      .action(
287        (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
288          loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
289      .call();
290  }
291
292  @Override
293  public CompletableFuture<Result> increment(Increment increment) {
294    checkHasFamilies(increment);
295    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
296    long nonce = conn.getNonceGenerator().newNonce();
297    return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
298      .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
299        controller, loc, stub, increment, RequestConverter::buildMutateRequest,
300        RawAsyncTableImpl::toResult))
301      .call();
302  }
303
304  private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
305
306    private final byte[] row;
307
308    private final byte[] family;
309
310    private byte[] qualifier;
311
312    private TimeRange timeRange;
313
314    private CompareOperator op;
315
316    private byte[] value;
317
318    public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
319      this.row = Preconditions.checkNotNull(row, "row is null");
320      this.family = Preconditions.checkNotNull(family, "family is null");
321    }
322
323    @Override
324    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
325      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
326        " an empty byte array, or just do not call this method if you want a null qualifier");
327      return this;
328    }
329
330    @Override
331    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
332      this.timeRange = timeRange;
333      return this;
334    }
335
336    @Override
337    public CheckAndMutateBuilder ifNotExists() {
338      this.op = CompareOperator.EQUAL;
339      this.value = null;
340      return this;
341    }
342
343    @Override
344    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
345      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
346      this.value = Preconditions.checkNotNull(value, "value is null");
347      return this;
348    }
349
350    private void preCheck() {
351      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
352        " calling ifNotExists/ifEquals/ifMatches before executing the request");
353    }
354
355    @Override
356    public CompletableFuture<Boolean> thenPut(Put put) {
357      validatePut(put, conn.connConf.getMaxKeyValueSize());
358      preCheck();
359      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
360        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
361          stub, put,
362          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
363            null, timeRange, p),
364          (c, r) -> r.getProcessed()))
365        .call();
366    }
367
368    @Override
369    public CompletableFuture<Boolean> thenDelete(Delete delete) {
370      preCheck();
371      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
372        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
373          loc, stub, delete,
374          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
375            null, timeRange, d),
376          (c, r) -> r.getProcessed()))
377        .call();
378    }
379
380    @Override
381    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
382      preCheck();
383      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
384        rpcTimeoutNs)
385        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
386          loc, stub, mutation,
387          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
388            null, timeRange, rm),
389          resp -> resp.getExists()))
390        .call();
391    }
392  }
393
394  @Override
395  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
396    return new CheckAndMutateBuilderImpl(row, family);
397  }
398
399
400  private final class CheckAndMutateWithFilterBuilderImpl
401    implements CheckAndMutateWithFilterBuilder {
402
403    private final byte[] row;
404
405    private final Filter filter;
406
407    private TimeRange timeRange;
408
409    public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
410      this.row = Preconditions.checkNotNull(row, "row is null");
411      this.filter = Preconditions.checkNotNull(filter, "filter is null");
412    }
413
414    @Override
415    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
416      this.timeRange = timeRange;
417      return this;
418    }
419
420    @Override
421    public CompletableFuture<Boolean> thenPut(Put put) {
422      validatePut(put, conn.connConf.getMaxKeyValueSize());
423      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
424        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
425          stub, put,
426          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
427            filter, timeRange, p),
428          (c, r) -> r.getProcessed()))
429        .call();
430    }
431
432    @Override
433    public CompletableFuture<Boolean> thenDelete(Delete delete) {
434      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
435        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
436          loc, stub, delete,
437          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
438            filter, timeRange, d),
439          (c, r) -> r.getProcessed()))
440        .call();
441    }
442
443    @Override
444    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
445      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
446        rpcTimeoutNs)
447        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
448          loc, stub, mutation,
449          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
450            filter, timeRange, rm),
451          resp -> resp.getExists()))
452        .call();
453    }
454  }
455
456  @Override
457  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
458    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
459  }
460
461  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
462  // so here I write a new method as I do not want to change the abstraction of call method.
463  private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
464      HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
465      Converter<MultiRequest, byte[], RowMutations> reqConvert,
466      Function<Result, RESP> respConverter) {
467    CompletableFuture<RESP> future = new CompletableFuture<>();
468    try {
469      byte[] regionName = loc.getRegion().getRegionName();
470      MultiRequest req = reqConvert.convert(regionName, mutation);
471      stub.multi(controller, req, new RpcCallback<MultiResponse>() {
472
473        @Override
474        public void run(MultiResponse resp) {
475          if (controller.failed()) {
476            future.completeExceptionally(controller.getFailed());
477          } else {
478            try {
479              org.apache.hadoop.hbase.client.MultiResponse multiResp =
480                ResponseConverter.getResults(req, resp, controller.cellScanner());
481              ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
482                loc.getServerName(), multiResp);
483              Throwable ex = multiResp.getException(regionName);
484              if (ex != null) {
485                future.completeExceptionally(ex instanceof IOException ? ex
486                  : new IOException(
487                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
488              } else {
489                future.complete(respConverter
490                  .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
491              }
492            } catch (IOException e) {
493              future.completeExceptionally(e);
494            }
495          }
496        }
497      });
498    } catch (IOException e) {
499      future.completeExceptionally(e);
500    }
501    return future;
502  }
503
504  @Override
505  public CompletableFuture<Void> mutateRow(RowMutations mutation) {
506    return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
507      .action((controller, loc, stub) -> this.<Void> mutateRow(controller, loc, stub, mutation,
508        (rn, rm) -> {
509          RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
510          regionMutationBuilder.setAtomic(true);
511          return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
512        }, resp -> null))
513      .call();
514  }
515
516  private Scan setDefaultScanConfig(Scan scan) {
517    // always create a new scan object as we may reset the start row later.
518    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
519    if (newScan.getCaching() <= 0) {
520      newScan.setCaching(defaultScannerCaching);
521    }
522    if (newScan.getMaxResultSize() <= 0) {
523      newScan.setMaxResultSize(defaultScannerMaxResultSize);
524    }
525    return newScan;
526  }
527
528  @Override
529  public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
530    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
531      pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
532        .start();
533  }
534
535  private long resultSize2CacheSize(long maxResultSize) {
536    // * 2 if possible
537    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
538  }
539
540  @Override
541  public ResultScanner getScanner(Scan scan) {
542    return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
543      resultSize2CacheSize(
544        scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
545  }
546
547  @Override
548  public CompletableFuture<List<Result>> scanAll(Scan scan) {
549    CompletableFuture<List<Result>> future = new CompletableFuture<>();
550    List<Result> scanResults = new ArrayList<>();
551    scan(scan, new AdvancedScanResultConsumer() {
552
553      @Override
554      public void onNext(Result[] results, ScanController controller) {
555        scanResults.addAll(Arrays.asList(results));
556      }
557
558      @Override
559      public void onError(Throwable error) {
560        future.completeExceptionally(error);
561      }
562
563      @Override
564      public void onComplete() {
565        future.complete(scanResults);
566      }
567    });
568    return future;
569  }
570
571  @Override
572  public List<CompletableFuture<Result>> get(List<Get> gets) {
573    return batch(gets, readRpcTimeoutNs);
574  }
575
576  @Override
577  public List<CompletableFuture<Void>> put(List<Put> puts) {
578    return voidMutate(puts);
579  }
580
581  @Override
582  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
583    return voidMutate(deletes);
584  }
585
586  @Override
587  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
588    return batch(actions, rpcTimeoutNs);
589  }
590
591  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
592    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
593      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
594  }
595
596  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
597    actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action)
598      .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize()));
599    return conn.callerFactory.batch().table(tableName).actions(actions)
600      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
601      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
602      .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
603      .startLogErrorsCnt(startLogErrorsCnt).call();
604  }
605
606  @Override
607  public long getRpcTimeout(TimeUnit unit) {
608    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
609  }
610
611  @Override
612  public long getReadRpcTimeout(TimeUnit unit) {
613    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
614  }
615
616  @Override
617  public long getWriteRpcTimeout(TimeUnit unit) {
618    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
619  }
620
621  @Override
622  public long getOperationTimeout(TimeUnit unit) {
623    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
624  }
625
626  @Override
627  public long getScanTimeout(TimeUnit unit) {
628    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
629  }
630
631  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
632      ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
633    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
634      region, row, rpcTimeoutNs, operationTimeoutNs);
635    S stub = stubMaker.apply(channel);
636    CompletableFuture<R> future = new CompletableFuture<>();
637    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
638    callable.call(stub, controller, resp -> {
639      if (controller.failed()) {
640        future.completeExceptionally(controller.getFailed());
641      } else {
642        future.complete(resp);
643      }
644    });
645    return future;
646  }
647
648  @Override
649  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
650      ServiceCaller<S, R> callable, byte[] row) {
651    return coprocessorService(stubMaker, callable, null, row);
652  }
653
654  private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) {
655    if (isEmptyStopRow(endKey)) {
656      if (isEmptyStopRow(region.getEndKey())) {
657        return true;
658      }
659      return false;
660    } else {
661      if (isEmptyStopRow(region.getEndKey())) {
662        return true;
663      }
664      int c = Bytes.compareTo(endKey, region.getEndKey());
665      // 1. if the region contains endKey
666      // 2. endKey is equal to the region's endKey and we do not want to include endKey.
667      return c < 0 || c == 0 && !endKeyInclusive;
668    }
669  }
670
671  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
672      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
673      byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
674      AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
675    if (error != null) {
676      callback.onError(error);
677      return;
678    }
679    unfinishedRequest.incrementAndGet();
680    RegionInfo region = loc.getRegion();
681    if (locateFinished(region, endKey, endKeyInclusive)) {
682      locateFinished.set(true);
683    } else {
684      addListener(
685        conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
686          operationTimeoutNs),
687        (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
688          locateFinished, unfinishedRequest, l, e));
689    }
690    addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
691      if (e != null) {
692        callback.onRegionError(region, e);
693      } else {
694        callback.onRegionComplete(region, r);
695      }
696      if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
697        callback.onComplete();
698      }
699    });
700  }
701
702  private final class CoprocessorServiceBuilderImpl<S, R>
703      implements CoprocessorServiceBuilder<S, R> {
704
705    private final Function<RpcChannel, S> stubMaker;
706
707    private final ServiceCaller<S, R> callable;
708
709    private final CoprocessorCallback<R> callback;
710
711    private byte[] startKey = HConstants.EMPTY_START_ROW;
712
713    private boolean startKeyInclusive;
714
715    private byte[] endKey = HConstants.EMPTY_END_ROW;
716
717    private boolean endKeyInclusive;
718
719    public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
720        ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
721      this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
722      this.callable = Preconditions.checkNotNull(callable, "callable is null");
723      this.callback = Preconditions.checkNotNull(callback, "callback is null");
724    }
725
726    @Override
727    public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
728      this.startKey = Preconditions.checkNotNull(startKey,
729        "startKey is null. Consider using" +
730          " an empty byte array, or just do not call this method if you want to start selection" +
731          " from the first region");
732      this.startKeyInclusive = inclusive;
733      return this;
734    }
735
736    @Override
737    public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
738      this.endKey = Preconditions.checkNotNull(endKey,
739        "endKey is null. Consider using" +
740          " an empty byte array, or just do not call this method if you want to continue" +
741          " selection to the last region");
742      this.endKeyInclusive = inclusive;
743      return this;
744    }
745
746    @Override
747    public void execute() {
748      addListener(conn.getLocator().getRegionLocation(tableName, startKey,
749        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
750        (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
751          endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
752    }
753  }
754
755  @Override
756  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
757      Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
758      CoprocessorCallback<R> callback) {
759    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
760  }
761}