001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; 025import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 026 027import com.google.protobuf.RpcChannel; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.List; 032import java.util.concurrent.CompletableFuture; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.function.Function; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.CompareOperator; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; 043import org.apache.hadoop.hbase.filter.BinaryComparator; 044import org.apache.hadoop.hbase.io.TimeRange; 045import org.apache.hadoop.hbase.ipc.HBaseRpcController; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.ReflectionUtils; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 053import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 054import org.apache.hbase.thirdparty.io.netty.util.Timer; 055 056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 058import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; 068 069/** 070 * The implementation of RawAsyncTable. 071 * <p/> 072 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 073 * be finished inside the rpc framework thread, which means that the callbacks registered to the 074 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 075 * this class should not try to do time consuming tasks in the callbacks. 076 * @since 2.0.0 077 * @see AsyncTableImpl 078 */ 079@InterfaceAudience.Private 080class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { 081 082 private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); 083 084 private final AsyncConnectionImpl conn; 085 086 private final Timer retryTimer; 087 088 private final TableName tableName; 089 090 private final int defaultScannerCaching; 091 092 private final long defaultScannerMaxResultSize; 093 094 private final long rpcTimeoutNs; 095 096 private final long readRpcTimeoutNs; 097 098 private final long writeRpcTimeoutNs; 099 100 private final long operationTimeoutNs; 101 102 private final long scanTimeoutNs; 103 104 private final long pauseNs; 105 106 private final long pauseForCQTBENs; 107 108 private final int maxAttempts; 109 110 private final int startLogErrorsCnt; 111 112 RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) { 113 this.conn = conn; 114 this.retryTimer = retryTimer; 115 this.tableName = builder.tableName; 116 this.rpcTimeoutNs = builder.rpcTimeoutNs; 117 this.readRpcTimeoutNs = builder.readRpcTimeoutNs; 118 this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs; 119 this.operationTimeoutNs = builder.operationTimeoutNs; 120 this.scanTimeoutNs = builder.scanTimeoutNs; 121 this.pauseNs = builder.pauseNs; 122 if (builder.pauseForCQTBENs < builder.pauseNs) { 123 LOG.warn( 124 "Configured value of pauseForCQTBENs is {} ms, which is less than" + 125 " the normal pause value {} ms, use the greater one instead", 126 TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), 127 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 128 this.pauseForCQTBENs = builder.pauseNs; 129 } else { 130 this.pauseForCQTBENs = builder.pauseForCQTBENs; 131 } 132 this.maxAttempts = builder.maxAttempts; 133 this.startLogErrorsCnt = builder.startLogErrorsCnt; 134 this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() 135 : conn.connConf.getScannerCaching(); 136 this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); 137 } 138 139 @Override 140 public TableName getName() { 141 return tableName; 142 } 143 144 @Override 145 public Configuration getConfiguration() { 146 return conn.getConfiguration(); 147 } 148 149 @FunctionalInterface 150 private interface Converter<D, I, S> { 151 D convert(I info, S src) throws IOException; 152 } 153 154 @FunctionalInterface 155 private interface RpcCall<RESP, REQ> { 156 void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, 157 RpcCallback<RESP> done); 158 } 159 160 private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call( 161 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 162 Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, 163 Converter<RESP, HBaseRpcController, PRESP> respConverter) { 164 CompletableFuture<RESP> future = new CompletableFuture<>(); 165 try { 166 rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), 167 new RpcCallback<PRESP>() { 168 169 @Override 170 public void run(PRESP resp) { 171 if (controller.failed()) { 172 future.completeExceptionally(controller.getFailed()); 173 } else { 174 try { 175 future.complete(respConverter.convert(controller, resp)); 176 } catch (IOException e) { 177 future.completeExceptionally(e); 178 } 179 } 180 } 181 }); 182 } catch (IOException e) { 183 future.completeExceptionally(e); 184 } 185 return future; 186 } 187 188 private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller, 189 HRegionLocation loc, ClientService.Interface stub, REQ req, 190 Converter<MutateRequest, byte[], REQ> reqConvert, 191 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 192 return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done), 193 respConverter); 194 } 195 196 private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller, 197 HRegionLocation loc, ClientService.Interface stub, REQ req, 198 Converter<MutateRequest, byte[], REQ> reqConvert) { 199 return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { 200 return null; 201 }); 202 } 203 204 private static Result toResult(HBaseRpcController controller, MutateResponse resp) 205 throws IOException { 206 if (!resp.hasResult()) { 207 return null; 208 } 209 return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); 210 } 211 212 @FunctionalInterface 213 private interface NoncedConverter<D, I, S> { 214 D convert(I info, S src, long nonceGroup, long nonce) throws IOException; 215 } 216 217 private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, 218 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 219 NoncedConverter<MutateRequest, byte[], REQ> reqConvert, 220 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 221 return mutate(controller, loc, stub, req, 222 (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); 223 } 224 225 private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) { 226 return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority) 227 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 228 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 229 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) 230 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 231 } 232 233 private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller( 234 R row, long rpcTimeoutNs) { 235 return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); 236 } 237 238 private CompletableFuture<Result> get(Get get, int replicaId) { 239 return this.<Result, Get> newCaller(get, readRpcTimeoutNs) 240 .action((controller, loc, stub) -> RawAsyncTableImpl 241 .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get, 242 RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), 243 (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) 244 .replicaId(replicaId).call(); 245 } 246 247 @Override 248 public CompletableFuture<Result> get(Get get) { 249 return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), 250 RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, 251 conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()); 252 } 253 254 @Override 255 public CompletableFuture<Void> put(Put put) { 256 validatePut(put, conn.connConf.getMaxKeyValueSize()); 257 return this.<Void, Put> newCaller(put, writeRpcTimeoutNs) 258 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, 259 put, RequestConverter::buildMutateRequest)) 260 .call(); 261 } 262 263 @Override 264 public CompletableFuture<Void> delete(Delete delete) { 265 return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs) 266 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, 267 stub, delete, RequestConverter::buildMutateRequest)) 268 .call(); 269 } 270 271 @Override 272 public CompletableFuture<Result> append(Append append) { 273 checkHasFamilies(append); 274 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 275 long nonce = conn.getNonceGenerator().newNonce(); 276 return this.<Result, Append> newCaller(append, rpcTimeoutNs) 277 .action( 278 (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller, 279 loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) 280 .call(); 281 } 282 283 @Override 284 public CompletableFuture<Result> increment(Increment increment) { 285 checkHasFamilies(increment); 286 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 287 long nonce = conn.getNonceGenerator().newNonce(); 288 return this.<Result, Increment> newCaller(increment, rpcTimeoutNs) 289 .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, 290 controller, loc, stub, increment, RequestConverter::buildMutateRequest, 291 RawAsyncTableImpl::toResult)) 292 .call(); 293 } 294 295 private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 296 297 private final byte[] row; 298 299 private final byte[] family; 300 301 private byte[] qualifier; 302 303 private TimeRange timeRange; 304 305 private CompareOperator op; 306 307 private byte[] value; 308 309 public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 310 this.row = Preconditions.checkNotNull(row, "row is null"); 311 this.family = Preconditions.checkNotNull(family, "family is null"); 312 } 313 314 @Override 315 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 316 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + 317 " an empty byte array, or just do not call this method if you want a null qualifier"); 318 return this; 319 } 320 321 @Override 322 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 323 this.timeRange = timeRange; 324 return this; 325 } 326 327 @Override 328 public CheckAndMutateBuilder ifNotExists() { 329 this.op = CompareOperator.EQUAL; 330 this.value = null; 331 return this; 332 } 333 334 @Override 335 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 336 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 337 this.value = Preconditions.checkNotNull(value, "value is null"); 338 return this; 339 } 340 341 private void preCheck() { 342 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + 343 " calling ifNotExists/ifEquals/ifMatches before executing the request"); 344 } 345 346 @Override 347 public CompletableFuture<Boolean> thenPut(Put put) { 348 validatePut(put, conn.connConf.getMaxKeyValueSize()); 349 preCheck(); 350 return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 351 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc, 352 stub, put, 353 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, 354 new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p), 355 (c, r) -> r.getProcessed())) 356 .call(); 357 } 358 359 @Override 360 public CompletableFuture<Boolean> thenDelete(Delete delete) { 361 preCheck(); 362 return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 363 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller, 364 loc, stub, delete, 365 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, 366 new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d), 367 (c, r) -> r.getProcessed())) 368 .call(); 369 } 370 371 @Override 372 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 373 preCheck(); 374 return RawAsyncTableImpl.this 375 .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) 376 .action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> mutateRow(controller, 377 loc, stub, mutation, 378 (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, 379 new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm), 380 resp -> resp.getExists())) 381 .call(); 382 } 383 } 384 385 @Override 386 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 387 return new CheckAndMutateBuilderImpl(row, family); 388 } 389 390 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, 391 // so here I write a new method as I do not want to change the abstraction of call method. 392 private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, 393 HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, 394 Converter<MultiRequest, byte[], RowMutations> reqConvert, 395 Function<Result, RESP> respConverter) { 396 CompletableFuture<RESP> future = new CompletableFuture<>(); 397 try { 398 byte[] regionName = loc.getRegion().getRegionName(); 399 MultiRequest req = reqConvert.convert(regionName, mutation); 400 stub.multi(controller, req, new RpcCallback<MultiResponse>() { 401 402 @Override 403 public void run(MultiResponse resp) { 404 if (controller.failed()) { 405 future.completeExceptionally(controller.getFailed()); 406 } else { 407 try { 408 org.apache.hadoop.hbase.client.MultiResponse multiResp = 409 ResponseConverter.getResults(req, resp, controller.cellScanner()); 410 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 411 loc.getServerName(), multiResp); 412 Throwable ex = multiResp.getException(regionName); 413 if (ex != null) { 414 future.completeExceptionally(ex instanceof IOException ? ex 415 : new IOException( 416 "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); 417 } else { 418 future.complete(respConverter 419 .apply((Result) multiResp.getResults().get(regionName).result.get(0))); 420 } 421 } catch (IOException e) { 422 future.completeExceptionally(e); 423 } 424 } 425 } 426 }); 427 } catch (IOException e) { 428 future.completeExceptionally(e); 429 } 430 return future; 431 } 432 433 @Override 434 public CompletableFuture<Void> mutateRow(RowMutations mutation) { 435 return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs) 436 .action((controller, loc, stub) -> this.<Void> mutateRow(controller, loc, stub, mutation, 437 (rn, rm) -> { 438 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); 439 regionMutationBuilder.setAtomic(true); 440 return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); 441 }, resp -> null)) 442 .call(); 443 } 444 445 private Scan setDefaultScanConfig(Scan scan) { 446 // always create a new scan object as we may reset the start row later. 447 Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); 448 if (newScan.getCaching() <= 0) { 449 newScan.setCaching(defaultScannerCaching); 450 } 451 if (newScan.getMaxResultSize() <= 0) { 452 newScan.setMaxResultSize(defaultScannerMaxResultSize); 453 } 454 return newScan; 455 } 456 457 @Override 458 public void scan(Scan scan, AdvancedScanResultConsumer consumer) { 459 new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, 460 pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt) 461 .start(); 462 } 463 464 private long resultSize2CacheSize(long maxResultSize) { 465 // * 2 if possible 466 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 467 } 468 469 @Override 470 public ResultScanner getScanner(Scan scan) { 471 return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan), 472 resultSize2CacheSize( 473 scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); 474 } 475 476 @Override 477 public CompletableFuture<List<Result>> scanAll(Scan scan) { 478 CompletableFuture<List<Result>> future = new CompletableFuture<>(); 479 List<Result> scanResults = new ArrayList<>(); 480 scan(scan, new AdvancedScanResultConsumer() { 481 482 @Override 483 public void onNext(Result[] results, ScanController controller) { 484 scanResults.addAll(Arrays.asList(results)); 485 } 486 487 @Override 488 public void onError(Throwable error) { 489 future.completeExceptionally(error); 490 } 491 492 @Override 493 public void onComplete() { 494 future.complete(scanResults); 495 } 496 }); 497 return future; 498 } 499 500 @Override 501 public List<CompletableFuture<Result>> get(List<Get> gets) { 502 return batch(gets, readRpcTimeoutNs); 503 } 504 505 @Override 506 public List<CompletableFuture<Void>> put(List<Put> puts) { 507 return voidMutate(puts); 508 } 509 510 @Override 511 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 512 return voidMutate(deletes); 513 } 514 515 @Override 516 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 517 return batch(actions, rpcTimeoutNs); 518 } 519 520 private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { 521 return this.<Object> batch(actions, writeRpcTimeoutNs).stream() 522 .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); 523 } 524 525 private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) { 526 actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action) 527 .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize())); 528 return conn.callerFactory.batch().table(tableName).actions(actions) 529 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 530 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 531 .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) 532 .startLogErrorsCnt(startLogErrorsCnt).call(); 533 } 534 535 @Override 536 public long getRpcTimeout(TimeUnit unit) { 537 return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); 538 } 539 540 @Override 541 public long getReadRpcTimeout(TimeUnit unit) { 542 return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); 543 } 544 545 @Override 546 public long getWriteRpcTimeout(TimeUnit unit) { 547 return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); 548 } 549 550 @Override 551 public long getOperationTimeout(TimeUnit unit) { 552 return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); 553 } 554 555 @Override 556 public long getScanTimeout(TimeUnit unit) { 557 return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); 558 } 559 560 private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 561 ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { 562 RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, 563 region, row, rpcTimeoutNs, operationTimeoutNs); 564 S stub = stubMaker.apply(channel); 565 CompletableFuture<R> future = new CompletableFuture<>(); 566 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 567 callable.call(stub, controller, resp -> { 568 if (controller.failed()) { 569 future.completeExceptionally(controller.getFailed()); 570 } else { 571 future.complete(resp); 572 } 573 }); 574 return future; 575 } 576 577 @Override 578 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 579 ServiceCaller<S, R> callable, byte[] row) { 580 return coprocessorService(stubMaker, callable, null, row); 581 } 582 583 private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) { 584 if (isEmptyStopRow(endKey)) { 585 if (isEmptyStopRow(region.getEndKey())) { 586 return true; 587 } 588 return false; 589 } else { 590 if (isEmptyStopRow(region.getEndKey())) { 591 return true; 592 } 593 int c = Bytes.compareTo(endKey, region.getEndKey()); 594 // 1. if the region contains endKey 595 // 2. endKey is equal to the region's endKey and we do not want to include endKey. 596 return c < 0 || c == 0 && !endKeyInclusive; 597 } 598 } 599 600 private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, 601 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, 602 byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, 603 AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { 604 if (error != null) { 605 callback.onError(error); 606 return; 607 } 608 unfinishedRequest.incrementAndGet(); 609 RegionInfo region = loc.getRegion(); 610 if (locateFinished(region, endKey, endKeyInclusive)) { 611 locateFinished.set(true); 612 } else { 613 addListener( 614 conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT, 615 operationTimeoutNs), 616 (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, 617 locateFinished, unfinishedRequest, l, e)); 618 } 619 addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { 620 if (e != null) { 621 callback.onRegionError(region, e); 622 } else { 623 callback.onRegionComplete(region, r); 624 } 625 if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { 626 callback.onComplete(); 627 } 628 }); 629 } 630 631 private final class CoprocessorServiceBuilderImpl<S, R> 632 implements CoprocessorServiceBuilder<S, R> { 633 634 private final Function<RpcChannel, S> stubMaker; 635 636 private final ServiceCaller<S, R> callable; 637 638 private final CoprocessorCallback<R> callback; 639 640 private byte[] startKey = HConstants.EMPTY_START_ROW; 641 642 private boolean startKeyInclusive; 643 644 private byte[] endKey = HConstants.EMPTY_END_ROW; 645 646 private boolean endKeyInclusive; 647 648 public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, 649 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { 650 this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); 651 this.callable = Preconditions.checkNotNull(callable, "callable is null"); 652 this.callback = Preconditions.checkNotNull(callback, "callback is null"); 653 } 654 655 @Override 656 public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) { 657 this.startKey = Preconditions.checkNotNull(startKey, 658 "startKey is null. Consider using" + 659 " an empty byte array, or just do not call this method if you want to start selection" + 660 " from the first region"); 661 this.startKeyInclusive = inclusive; 662 return this; 663 } 664 665 @Override 666 public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { 667 this.endKey = Preconditions.checkNotNull(endKey, 668 "endKey is null. Consider using" + 669 " an empty byte array, or just do not call this method if you want to continue" + 670 " selection to the last region"); 671 this.endKeyInclusive = inclusive; 672 return this; 673 } 674 675 @Override 676 public void execute() { 677 addListener(conn.getLocator().getRegionLocation(tableName, startKey, 678 startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs), 679 (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, 680 endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); 681 } 682 } 683 684 @Override 685 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 686 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 687 CoprocessorCallback<R> callback) { 688 return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); 689 } 690}