001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
025import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
026
027import com.google.protobuf.RpcChannel;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.concurrent.CompletableFuture;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.function.Function;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.CompareOperator;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HRegionLocation;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
044import org.apache.hadoop.hbase.filter.Filter;
045import org.apache.hadoop.hbase.io.TimeRange;
046import org.apache.hadoop.hbase.ipc.HBaseRpcController;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.ReflectionUtils;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
055import org.apache.hbase.thirdparty.io.netty.util.Timer;
056
057import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
058import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
059import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
068
/**
 * The 'raw' implementation of {@link AsyncTable}, parameterized with
 * {@link AdvancedScanResultConsumer}.
 * <p>
 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
 * be finished inside the rpc framework thread, which means that the callbacks registered to the
 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users of
 * this class should not do time-consuming work in the callbacks.
 * @since 2.0.0
 * @see AsyncTableImpl
 */
079@InterfaceAudience.Private
080class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
081
082  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
083
084  private final AsyncConnectionImpl conn;
085
086  private final Timer retryTimer;
087
088  private final TableName tableName;
089
090  private final int defaultScannerCaching;
091
092  private final long defaultScannerMaxResultSize;
093
094  private final long rpcTimeoutNs;
095
096  private final long readRpcTimeoutNs;
097
098  private final long writeRpcTimeoutNs;
099
100  private final long operationTimeoutNs;
101
102  private final long scanTimeoutNs;
103
104  private final long pauseNs;
105
106  private final long pauseForCQTBENs;
107
108  private final int maxAttempts;
109
110  private final int startLogErrorsCnt;
111
112  RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
113    this.conn = conn;
114    this.retryTimer = retryTimer;
115    this.tableName = builder.tableName;
116    this.rpcTimeoutNs = builder.rpcTimeoutNs;
117    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
118    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
119    this.operationTimeoutNs = builder.operationTimeoutNs;
120    this.scanTimeoutNs = builder.scanTimeoutNs;
121    this.pauseNs = builder.pauseNs;
122    if (builder.pauseForCQTBENs < builder.pauseNs) {
123      LOG.warn(
124        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
125          " the normal pause value {} ms, use the greater one instead",
126        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
127        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
128      this.pauseForCQTBENs = builder.pauseNs;
129    } else {
130      this.pauseForCQTBENs = builder.pauseForCQTBENs;
131    }
132    this.maxAttempts = builder.maxAttempts;
133    this.startLogErrorsCnt = builder.startLogErrorsCnt;
134    this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
135      : conn.connConf.getScannerCaching();
136    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
137  }
138
139  @Override
140  public TableName getName() {
141    return tableName;
142  }
143
144  @Override
145  public Configuration getConfiguration() {
146    return conn.getConfiguration();
147  }
148
149  @Override
150  public CompletableFuture<TableDescriptor> getDescriptor() {
151    return conn.getAdmin().getDescriptor(tableName);
152  }
153
154  @Override
155  public AsyncTableRegionLocator getRegionLocator() {
156    return conn.getRegionLocator(tableName);
157  }
158
159  @FunctionalInterface
160  private interface Converter<D, I, S> {
161    D convert(I info, S src) throws IOException;
162  }
163
164  @FunctionalInterface
165  private interface RpcCall<RESP, REQ> {
166    void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
167        RpcCallback<RESP> done);
168  }
169
170  private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
171      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
172      Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
173      Converter<RESP, HBaseRpcController, PRESP> respConverter) {
174    CompletableFuture<RESP> future = new CompletableFuture<>();
175    try {
176      rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
177        new RpcCallback<PRESP>() {
178
179          @Override
180          public void run(PRESP resp) {
181            if (controller.failed()) {
182              future.completeExceptionally(controller.getFailed());
183            } else {
184              try {
185                future.complete(respConverter.convert(controller, resp));
186              } catch (IOException e) {
187                future.completeExceptionally(e);
188              }
189            }
190          }
191        });
192    } catch (IOException e) {
193      future.completeExceptionally(e);
194    }
195    return future;
196  }
197
198  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
199      HRegionLocation loc, ClientService.Interface stub, REQ req,
200      Converter<MutateRequest, byte[], REQ> reqConvert,
201      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
202    return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
203      respConverter);
204  }
205
206  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
207      HRegionLocation loc, ClientService.Interface stub, REQ req,
208      Converter<MutateRequest, byte[], REQ> reqConvert) {
209    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
210      return null;
211    });
212  }
213
214  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
215      throws IOException {
216    if (!resp.hasResult()) {
217      return null;
218    }
219    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
220  }
221
222  @FunctionalInterface
223  private interface NoncedConverter<D, I, S> {
224    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
225  }
226
227  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
228      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
229      NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
230      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
231    return mutate(controller, loc, stub, req,
232      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
233  }
234
235  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
236    return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
237      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
238      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
239      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
240      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
241  }
242
243  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
244      R row, long rpcTimeoutNs) {
245    return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
246  }
247
248  private CompletableFuture<Result> get(Get get, int replicaId) {
249    return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
250      .action((controller, loc, stub) -> RawAsyncTableImpl
251        .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
252          RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
253          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
254      .replicaId(replicaId).call();
255  }
256
257  @Override
258  public CompletableFuture<Result> get(Get get) {
259    return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
260      RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
261      conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics());
262  }
263
264  @Override
265  public CompletableFuture<Void> put(Put put) {
266    validatePut(put, conn.connConf.getMaxKeyValueSize());
267    return this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
268      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
269        put, RequestConverter::buildMutateRequest))
270      .call();
271  }
272
273  @Override
274  public CompletableFuture<Void> delete(Delete delete) {
275    return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
276      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
277        stub, delete, RequestConverter::buildMutateRequest))
278      .call();
279  }
280
281  @Override
282  public CompletableFuture<Result> append(Append append) {
283    checkHasFamilies(append);
284    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
285    long nonce = conn.getNonceGenerator().newNonce();
286    return this.<Result, Append> newCaller(append, rpcTimeoutNs)
287      .action(
288        (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
289          loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
290      .call();
291  }
292
293  @Override
294  public CompletableFuture<Result> increment(Increment increment) {
295    checkHasFamilies(increment);
296    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
297    long nonce = conn.getNonceGenerator().newNonce();
298    return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
299      .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
300        controller, loc, stub, increment, RequestConverter::buildMutateRequest,
301        RawAsyncTableImpl::toResult))
302      .call();
303  }
304
305  private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
306
307    private final byte[] row;
308
309    private final byte[] family;
310
311    private byte[] qualifier;
312
313    private TimeRange timeRange;
314
315    private CompareOperator op;
316
317    private byte[] value;
318
319    public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
320      this.row = Preconditions.checkNotNull(row, "row is null");
321      this.family = Preconditions.checkNotNull(family, "family is null");
322    }
323
324    @Override
325    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
326      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
327        " an empty byte array, or just do not call this method if you want a null qualifier");
328      return this;
329    }
330
331    @Override
332    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
333      this.timeRange = timeRange;
334      return this;
335    }
336
337    @Override
338    public CheckAndMutateBuilder ifNotExists() {
339      this.op = CompareOperator.EQUAL;
340      this.value = null;
341      return this;
342    }
343
344    @Override
345    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
346      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
347      this.value = Preconditions.checkNotNull(value, "value is null");
348      return this;
349    }
350
351    private void preCheck() {
352      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
353        " calling ifNotExists/ifEquals/ifMatches before executing the request");
354    }
355
356    @Override
357    public CompletableFuture<Boolean> thenPut(Put put) {
358      validatePut(put, conn.connConf.getMaxKeyValueSize());
359      preCheck();
360      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
361        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
362          stub, put,
363          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
364            null, timeRange, p),
365          (c, r) -> r.getProcessed()))
366        .call();
367    }
368
369    @Override
370    public CompletableFuture<Boolean> thenDelete(Delete delete) {
371      preCheck();
372      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
373        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
374          loc, stub, delete,
375          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
376            null, timeRange, d),
377          (c, r) -> r.getProcessed()))
378        .call();
379    }
380
381    @Override
382    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
383      preCheck();
384      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
385        rpcTimeoutNs)
386        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
387          loc, stub, mutation,
388          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
389            null, timeRange, rm), CheckAndMutateResult::isSuccess))
390        .call();
391    }
392  }
393
394  @Override
395  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
396    return new CheckAndMutateBuilderImpl(row, family);
397  }
398
399  private final class CheckAndMutateWithFilterBuilderImpl
400    implements CheckAndMutateWithFilterBuilder {
401
402    private final byte[] row;
403
404    private final Filter filter;
405
406    private TimeRange timeRange;
407
408    public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
409      this.row = Preconditions.checkNotNull(row, "row is null");
410      this.filter = Preconditions.checkNotNull(filter, "filter is null");
411    }
412
413    @Override
414    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
415      this.timeRange = timeRange;
416      return this;
417    }
418
419    @Override
420    public CompletableFuture<Boolean> thenPut(Put put) {
421      validatePut(put, conn.connConf.getMaxKeyValueSize());
422      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
423        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
424          stub, put,
425          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
426            filter, timeRange, p),
427          (c, r) -> r.getProcessed()))
428        .call();
429    }
430
431    @Override
432    public CompletableFuture<Boolean> thenDelete(Delete delete) {
433      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
434        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
435          loc, stub, delete,
436          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
437            filter, timeRange, d),
438          (c, r) -> r.getProcessed()))
439        .call();
440    }
441
442    @Override
443    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
444      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
445        rpcTimeoutNs)
446        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
447          loc, stub, mutation,
448          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
449            filter, timeRange, rm), CheckAndMutateResult::isSuccess))
450        .call();
451    }
452  }
453
454  @Override
455  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
456    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
457  }
458
459  @Override
460  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
461    if (checkAndMutate.getAction() instanceof Put) {
462      validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
463    }
464    if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
465      || checkAndMutate.getAction() instanceof Increment
466      || checkAndMutate.getAction() instanceof Append) {
467      Mutation mutation = (Mutation) checkAndMutate.getAction();
468      if (mutation instanceof Put) {
469        validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
470      }
471      return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
472        mutation.getPriority(), rpcTimeoutNs)
473        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
474          loc, stub, mutation,
475          (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
476            checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
477            checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
478            checkAndMutate.getTimeRange(), m),
479          (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
480        .call();
481    } else if (checkAndMutate.getAction() instanceof RowMutations) {
482      RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
483      return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
484        rowMutations.getMaxPriority(), rpcTimeoutNs)
485        .action((controller, loc, stub) ->
486          RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(
487            controller, loc, stub, rowMutations,
488            (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
489            checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
490            checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
491            checkAndMutate.getTimeRange(), rm),
492            resp -> resp))
493        .call();
494    } else {
495      CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
496      future.completeExceptionally(new DoNotRetryIOException(
497        "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
498      return future;
499    }
500  }
501
502  @Override
503  public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
504    List<CheckAndMutate> checkAndMutates) {
505    return batch(checkAndMutates, rpcTimeoutNs).stream()
506      .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList());
507  }
508
509  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
510  // so here I write a new method as I do not want to change the abstraction of call method.
511  @SuppressWarnings("unchecked")
512  private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
513      HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
514      Converter<MultiRequest, byte[], RowMutations> reqConvert,
515      Function<RES, RESP> respConverter) {
516    CompletableFuture<RESP> future = new CompletableFuture<>();
517    try {
518      byte[] regionName = loc.getRegion().getRegionName();
519      MultiRequest req = reqConvert.convert(regionName, mutation);
520      stub.multi(controller, req, new RpcCallback<MultiResponse>() {
521
522        @Override
523        public void run(MultiResponse resp) {
524          if (controller.failed()) {
525            future.completeExceptionally(controller.getFailed());
526          } else {
527            try {
528              org.apache.hadoop.hbase.client.MultiResponse multiResp =
529                ResponseConverter.getResults(req, resp, controller.cellScanner());
530              ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
531                loc.getServerName(), multiResp);
532              Throwable ex = multiResp.getException(regionName);
533              if (ex != null) {
534                future.completeExceptionally(ex instanceof IOException ? ex
535                  : new IOException(
536                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
537              } else {
538                future.complete(respConverter
539                  .apply((RES) multiResp.getResults().get(regionName).result.get(0)));
540              }
541            } catch (IOException e) {
542              future.completeExceptionally(e);
543            }
544          }
545        }
546      });
547    } catch (IOException e) {
548      future.completeExceptionally(e);
549    }
550    return future;
551  }
552
553  @Override
554  public CompletableFuture<Result> mutateRow(RowMutations mutations) {
555    return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
556      writeRpcTimeoutNs).action((controller, loc, stub) ->
557        this.<Result, Result> mutateRow(controller, loc, stub, mutations,
558          (rn, rm) -> {
559            RegionAction.Builder regionMutationBuilder = RequestConverter
560              .buildRegionAction(rn, rm);
561            regionMutationBuilder.setAtomic(true);
562            return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
563              .build();
564          }, resp -> resp))
565      .call();
566  }
567
568  private Scan setDefaultScanConfig(Scan scan) {
569    // always create a new scan object as we may reset the start row later.
570    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
571    if (newScan.getCaching() <= 0) {
572      newScan.setCaching(defaultScannerCaching);
573    }
574    if (newScan.getMaxResultSize() <= 0) {
575      newScan.setMaxResultSize(defaultScannerMaxResultSize);
576    }
577    return newScan;
578  }
579
580  @Override
581  public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
582    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
583      pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
584        .start();
585  }
586
587  private long resultSize2CacheSize(long maxResultSize) {
588    // * 2 if possible
589    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
590  }
591
592  @Override
593  public ResultScanner getScanner(Scan scan) {
594    return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
595      resultSize2CacheSize(
596        scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
597  }
598
599  @Override
600  public CompletableFuture<List<Result>> scanAll(Scan scan) {
601    CompletableFuture<List<Result>> future = new CompletableFuture<>();
602    List<Result> scanResults = new ArrayList<>();
603    scan(scan, new AdvancedScanResultConsumer() {
604
605      @Override
606      public void onNext(Result[] results, ScanController controller) {
607        scanResults.addAll(Arrays.asList(results));
608      }
609
610      @Override
611      public void onError(Throwable error) {
612        future.completeExceptionally(error);
613      }
614
615      @Override
616      public void onComplete() {
617        future.complete(scanResults);
618      }
619    });
620    return future;
621  }
622
623  @Override
624  public List<CompletableFuture<Result>> get(List<Get> gets) {
625    return batch(gets, readRpcTimeoutNs);
626  }
627
628  @Override
629  public List<CompletableFuture<Void>> put(List<Put> puts) {
630    return voidMutate(puts);
631  }
632
633  @Override
634  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
635    return voidMutate(deletes);
636  }
637
638  @Override
639  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
640    return batch(actions, rpcTimeoutNs);
641  }
642
643  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
644    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
645      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
646  }
647
648  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
649    for (Row action : actions) {
650      if (action instanceof Put) {
651        validatePut((Put) action, conn.connConf.getMaxKeyValueSize());
652      } else if (action instanceof CheckAndMutate) {
653        CheckAndMutate checkAndMutate = (CheckAndMutate) action;
654        if (checkAndMutate.getAction() instanceof Put) {
655          validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
656        }
657      }
658    }
659    return conn.callerFactory.batch().table(tableName).actions(actions)
660      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
661      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
662      .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
663      .startLogErrorsCnt(startLogErrorsCnt).call();
664  }
665
666  @Override
667  public long getRpcTimeout(TimeUnit unit) {
668    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
669  }
670
671  @Override
672  public long getReadRpcTimeout(TimeUnit unit) {
673    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
674  }
675
676  @Override
677  public long getWriteRpcTimeout(TimeUnit unit) {
678    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
679  }
680
681  @Override
682  public long getOperationTimeout(TimeUnit unit) {
683    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
684  }
685
686  @Override
687  public long getScanTimeout(TimeUnit unit) {
688    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
689  }
690
691  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
692      ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
693    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
694      region, row, rpcTimeoutNs, operationTimeoutNs);
695    S stub = stubMaker.apply(channel);
696    CompletableFuture<R> future = new CompletableFuture<>();
697    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
698    callable.call(stub, controller, resp -> {
699      if (controller.failed()) {
700        future.completeExceptionally(controller.getFailed());
701      } else {
702        future.complete(resp);
703      }
704    });
705    return future;
706  }
707
708  @Override
709  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
710      ServiceCaller<S, R> callable, byte[] row) {
711    return coprocessorService(stubMaker, callable, null, row);
712  }
713
714  private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) {
715    if (isEmptyStopRow(endKey)) {
716      if (isEmptyStopRow(region.getEndKey())) {
717        return true;
718      }
719      return false;
720    } else {
721      if (isEmptyStopRow(region.getEndKey())) {
722        return true;
723      }
724      int c = Bytes.compareTo(endKey, region.getEndKey());
725      // 1. if the region contains endKey
726      // 2. endKey is equal to the region's endKey and we do not want to include endKey.
727      return c < 0 || c == 0 && !endKeyInclusive;
728    }
729  }
730
731  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
732      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
733      byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
734      AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
735    if (error != null) {
736      callback.onError(error);
737      return;
738    }
739    unfinishedRequest.incrementAndGet();
740    RegionInfo region = loc.getRegion();
741    if (locateFinished(region, endKey, endKeyInclusive)) {
742      locateFinished.set(true);
743    } else {
744      addListener(
745        conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
746          operationTimeoutNs),
747        (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
748          locateFinished, unfinishedRequest, l, e));
749    }
750    addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
751      if (e != null) {
752        callback.onRegionError(region, e);
753      } else {
754        callback.onRegionComplete(region, r);
755      }
756      if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
757        callback.onComplete();
758      }
759    });
760  }
761
762  private final class CoprocessorServiceBuilderImpl<S, R>
763      implements CoprocessorServiceBuilder<S, R> {
764
765    private final Function<RpcChannel, S> stubMaker;
766
767    private final ServiceCaller<S, R> callable;
768
769    private final CoprocessorCallback<R> callback;
770
771    private byte[] startKey = HConstants.EMPTY_START_ROW;
772
773    private boolean startKeyInclusive;
774
775    private byte[] endKey = HConstants.EMPTY_END_ROW;
776
777    private boolean endKeyInclusive;
778
779    public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
780        ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
781      this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
782      this.callable = Preconditions.checkNotNull(callable, "callable is null");
783      this.callback = Preconditions.checkNotNull(callback, "callback is null");
784    }
785
786    @Override
787    public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
788      this.startKey = Preconditions.checkNotNull(startKey,
789        "startKey is null. Consider using" +
790          " an empty byte array, or just do not call this method if you want to start selection" +
791          " from the first region");
792      this.startKeyInclusive = inclusive;
793      return this;
794    }
795
796    @Override
797    public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
798      this.endKey = Preconditions.checkNotNull(endKey,
799        "endKey is null. Consider using" +
800          " an empty byte array, or just do not call this method if you want to continue" +
801          " selection to the last region");
802      this.endKeyInclusive = inclusive;
803      return this;
804    }
805
806    @Override
807    public void execute() {
808      addListener(conn.getLocator().getRegionLocation(tableName, startKey,
809        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
810        (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
811          endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
812    }
813  }
814
815  @Override
816  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
817      Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
818      CoprocessorCallback<R> callback) {
819    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
820  }
821}