001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 024import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 025 026import com.google.protobuf.RpcChannel; 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.List; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicBoolean; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.function.Function; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.CompareOperator; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.HRegionLocation; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; 042import org.apache.hadoop.hbase.filter.BinaryComparator; 043import org.apache.hadoop.hbase.io.TimeRange; 044import org.apache.hadoop.hbase.ipc.HBaseRpcController; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.ReflectionUtils; 047import org.apache.yetus.audience.InterfaceAudience; 048 049import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 050import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 051import org.apache.hbase.thirdparty.io.netty.util.Timer; 052 053import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 054import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 055import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; 065 066/** 067 * The implementation of RawAsyncTable. 068 * <p/> 069 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 070 * be finished inside the rpc framework thread, which means that the callbacks registered to the 071 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 072 * this class should not try to do time consuming tasks in the callbacks. 073 * @since 2.0.0 074 * @see AsyncTableImpl 075 */ 076@InterfaceAudience.Private 077class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { 078 079 private final AsyncConnectionImpl conn; 080 081 private final Timer retryTimer; 082 083 private final TableName tableName; 084 085 private final int defaultScannerCaching; 086 087 private final long defaultScannerMaxResultSize; 088 089 private final long rpcTimeoutNs; 090 091 private final long readRpcTimeoutNs; 092 093 private final long writeRpcTimeoutNs; 094 095 private final long operationTimeoutNs; 096 097 private final long scanTimeoutNs; 098 099 private final long pauseNs; 100 101 private final int maxAttempts; 102 103 private final int startLogErrorsCnt; 104 105 RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) { 106 this.conn = conn; 107 this.retryTimer = retryTimer; 108 this.tableName = builder.tableName; 109 this.rpcTimeoutNs = builder.rpcTimeoutNs; 110 this.readRpcTimeoutNs = builder.readRpcTimeoutNs; 111 this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs; 112 this.operationTimeoutNs = builder.operationTimeoutNs; 113 this.scanTimeoutNs = builder.scanTimeoutNs; 114 this.pauseNs = builder.pauseNs; 115 this.maxAttempts = builder.maxAttempts; 116 this.startLogErrorsCnt = builder.startLogErrorsCnt; 117 this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() 118 : conn.connConf.getScannerCaching(); 119 this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); 120 } 121 122 @Override 123 public TableName getName() { 124 return tableName; 125 } 126 127 @Override 128 public Configuration getConfiguration() { 129 return conn.getConfiguration(); 130 } 131 132 @FunctionalInterface 133 private interface Converter<D, I, S> { 134 D convert(I info, S src) throws IOException; 135 } 136 137 @FunctionalInterface 138 private interface RpcCall<RESP, REQ> { 139 void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, 140 RpcCallback<RESP> done); 141 } 142 143 private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call( 144 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 145 Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, 146 Converter<RESP, HBaseRpcController, PRESP> respConverter) { 147 CompletableFuture<RESP> future = new CompletableFuture<>(); 148 try { 149 rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), 150 new RpcCallback<PRESP>() { 151 152 @Override 153 public void run(PRESP resp) { 154 if (controller.failed()) { 155 future.completeExceptionally(controller.getFailed()); 156 } else { 157 try { 158 future.complete(respConverter.convert(controller, resp)); 159 } catch (IOException e) { 160 future.completeExceptionally(e); 161 } 162 } 163 } 164 }); 165 } catch (IOException e) { 166 future.completeExceptionally(e); 167 } 168 return future; 169 } 170 171 private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller, 172 HRegionLocation loc, ClientService.Interface stub, REQ req, 173 Converter<MutateRequest, byte[], REQ> reqConvert, 174 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 175 return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done), 176 respConverter); 177 } 178 179 private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller, 180 HRegionLocation loc, ClientService.Interface stub, REQ req, 181 Converter<MutateRequest, byte[], REQ> reqConvert) { 182 return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { 183 return null; 184 }); 185 } 186 187 private static Result toResult(HBaseRpcController controller, MutateResponse resp) 188 throws IOException { 189 if (!resp.hasResult()) { 190 return null; 191 } 192 return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); 193 } 194 195 @FunctionalInterface 196 private interface NoncedConverter<D, I, S> { 197 D convert(I info, S src, long nonceGroup, long nonce) throws IOException; 198 } 199 200 private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, 201 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 202 NoncedConverter<MutateRequest, byte[], REQ> reqConvert, 203 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 204 return mutate(controller, loc, stub, req, 205 (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); 206 } 207 208 private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) { 209 return conn.callerFactory.<T> single().table(tableName).row(row) 210 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 211 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 212 .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) 213 .startLogErrorsCnt(startLogErrorsCnt); 214 } 215 216 private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) { 217 return newCaller(row.getRow(), rpcTimeoutNs); 218 } 219 220 private CompletableFuture<Result> get(Get get, int replicaId) { 221 return this.<Result> newCaller(get, readRpcTimeoutNs) 222 .action((controller, loc, stub) -> RawAsyncTableImpl 223 .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get, 224 RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), 225 (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) 226 .replicaId(replicaId).call(); 227 } 228 229 @Override 230 public CompletableFuture<Result> get(Get get) { 231 return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), 232 RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, 233 conn.connConf.getPrimaryCallTimeoutNs(), retryTimer); 234 } 235 236 @Override 237 public CompletableFuture<Void> put(Put put) { 238 return this.<Void> newCaller(put, writeRpcTimeoutNs) 239 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, 240 put, RequestConverter::buildMutateRequest)) 241 .call(); 242 } 243 244 @Override 245 public CompletableFuture<Void> delete(Delete delete) { 246 return this.<Void> newCaller(delete, writeRpcTimeoutNs) 247 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, 248 stub, delete, RequestConverter::buildMutateRequest)) 249 .call(); 250 } 251 252 @Override 253 public CompletableFuture<Result> append(Append append) { 254 checkHasFamilies(append); 255 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 256 long nonce = conn.getNonceGenerator().newNonce(); 257 return this.<Result> newCaller(append, rpcTimeoutNs) 258 .action( 259 (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller, 260 loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) 261 .call(); 262 } 263 264 @Override 265 public CompletableFuture<Result> increment(Increment increment) { 266 checkHasFamilies(increment); 267 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 268 long nonce = conn.getNonceGenerator().newNonce(); 269 return this.<Result> newCaller(increment, rpcTimeoutNs) 270 .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, 271 controller, loc, stub, increment, RequestConverter::buildMutateRequest, 272 RawAsyncTableImpl::toResult)) 273 .call(); 274 } 275 276 private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 277 278 private final byte[] row; 279 280 private final byte[] family; 281 282 private byte[] qualifier; 283 284 private TimeRange timeRange; 285 286 private CompareOperator op; 287 288 private byte[] value; 289 290 public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 291 this.row = Preconditions.checkNotNull(row, "row is null"); 292 this.family = Preconditions.checkNotNull(family, "family is null"); 293 } 294 295 @Override 296 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 297 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + 298 " an empty byte array, or just do not call this method if you want a null qualifier"); 299 return this; 300 } 301 302 @Override 303 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 304 this.timeRange = timeRange; 305 return this; 306 } 307 308 @Override 309 public CheckAndMutateBuilder ifNotExists() { 310 this.op = CompareOperator.EQUAL; 311 this.value = null; 312 return this; 313 } 314 315 @Override 316 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 317 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 318 this.value = Preconditions.checkNotNull(value, "value is null"); 319 return this; 320 } 321 322 private void preCheck() { 323 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + 324 " calling ifNotExists/ifEquals/ifMatches before executing the request"); 325 } 326 327 @Override 328 public CompletableFuture<Boolean> thenPut(Put put) { 329 preCheck(); 330 return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs) 331 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc, 332 stub, put, 333 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, 334 new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p), 335 (c, r) -> r.getProcessed())) 336 .call(); 337 } 338 339 @Override 340 public CompletableFuture<Boolean> thenDelete(Delete delete) { 341 preCheck(); 342 return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs) 343 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller, 344 loc, stub, delete, 345 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, 346 new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d), 347 (c, r) -> r.getProcessed())) 348 .call(); 349 } 350 351 @Override 352 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 353 preCheck(); 354 return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs) 355 .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc, 356 stub, mutation, 357 (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, 358 new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm), 359 resp -> resp.getExists())) 360 .call(); 361 } 362 } 363 364 @Override 365 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 366 return new CheckAndMutateBuilderImpl(row, family); 367 } 368 369 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, 370 // so here I write a new method as I do not want to change the abstraction of call method. 371 private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, 372 HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, 373 Converter<MultiRequest, byte[], RowMutations> reqConvert, 374 Function<Result, RESP> respConverter) { 375 CompletableFuture<RESP> future = new CompletableFuture<>(); 376 try { 377 byte[] regionName = loc.getRegion().getRegionName(); 378 MultiRequest req = reqConvert.convert(regionName, mutation); 379 stub.multi(controller, req, new RpcCallback<MultiResponse>() { 380 381 @Override 382 public void run(MultiResponse resp) { 383 if (controller.failed()) { 384 future.completeExceptionally(controller.getFailed()); 385 } else { 386 try { 387 org.apache.hadoop.hbase.client.MultiResponse multiResp = 388 ResponseConverter.getResults(req, resp, controller.cellScanner()); 389 Throwable ex = multiResp.getException(regionName); 390 if (ex != null) { 391 future.completeExceptionally(ex instanceof IOException ? ex 392 : new IOException( 393 "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); 394 } else { 395 future.complete(respConverter 396 .apply((Result) multiResp.getResults().get(regionName).result.get(0))); 397 } 398 } catch (IOException e) { 399 future.completeExceptionally(e); 400 } 401 } 402 } 403 }); 404 } catch (IOException e) { 405 future.completeExceptionally(e); 406 } 407 return future; 408 } 409 410 @Override 411 public CompletableFuture<Void> mutateRow(RowMutations mutation) { 412 return this.<Void> newCaller(mutation, writeRpcTimeoutNs).action((controller, loc, 413 stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub, mutation, (rn, rm) -> { 414 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); 415 regionMutationBuilder.setAtomic(true); 416 return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); 417 }, resp -> null)) 418 .call(); 419 } 420 421 private Scan setDefaultScanConfig(Scan scan) { 422 // always create a new scan object as we may reset the start row later. 423 Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); 424 if (newScan.getCaching() <= 0) { 425 newScan.setCaching(defaultScannerCaching); 426 } 427 if (newScan.getMaxResultSize() <= 0) { 428 newScan.setMaxResultSize(defaultScannerMaxResultSize); 429 } 430 return newScan; 431 } 432 433 @Override 434 public void scan(Scan scan, AdvancedScanResultConsumer consumer) { 435 new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, 436 pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); 437 } 438 439 private long resultSize2CacheSize(long maxResultSize) { 440 // * 2 if possible 441 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 442 } 443 444 @Override 445 public ResultScanner getScanner(Scan scan) { 446 return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan), 447 resultSize2CacheSize( 448 scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); 449 } 450 451 @Override 452 public CompletableFuture<List<Result>> scanAll(Scan scan) { 453 CompletableFuture<List<Result>> future = new CompletableFuture<>(); 454 List<Result> scanResults = new ArrayList<>(); 455 scan(scan, new AdvancedScanResultConsumer() { 456 457 @Override 458 public void onNext(Result[] results, ScanController controller) { 459 scanResults.addAll(Arrays.asList(results)); 460 } 461 462 @Override 463 public void onError(Throwable error) { 464 future.completeExceptionally(error); 465 } 466 467 @Override 468 public void onComplete() { 469 future.complete(scanResults); 470 } 471 }); 472 return future; 473 } 474 475 @Override 476 public List<CompletableFuture<Result>> get(List<Get> gets) { 477 return batch(gets, readRpcTimeoutNs); 478 } 479 480 @Override 481 public List<CompletableFuture<Void>> put(List<Put> puts) { 482 return voidMutate(puts); 483 } 484 485 @Override 486 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 487 return voidMutate(deletes); 488 } 489 490 @Override 491 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 492 return batch(actions, rpcTimeoutNs); 493 } 494 495 private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { 496 return this.<Object> batch(actions, writeRpcTimeoutNs).stream() 497 .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); 498 } 499 500 private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) { 501 return conn.callerFactory.batch().table(tableName).actions(actions) 502 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 503 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 504 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); 505 } 506 507 @Override 508 public long getRpcTimeout(TimeUnit unit) { 509 return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); 510 } 511 512 @Override 513 public long getReadRpcTimeout(TimeUnit unit) { 514 return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); 515 } 516 517 @Override 518 public long getWriteRpcTimeout(TimeUnit unit) { 519 return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); 520 } 521 522 @Override 523 public long getOperationTimeout(TimeUnit unit) { 524 return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); 525 } 526 527 @Override 528 public long getScanTimeout(TimeUnit unit) { 529 return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); 530 } 531 532 private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 533 ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { 534 RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, 535 region, row, rpcTimeoutNs, operationTimeoutNs); 536 S stub = stubMaker.apply(channel); 537 CompletableFuture<R> future = new CompletableFuture<>(); 538 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 539 callable.call(stub, controller, resp -> { 540 if (controller.failed()) { 541 future.completeExceptionally(controller.getFailed()); 542 } else { 543 future.complete(resp); 544 } 545 }); 546 return future; 547 } 548 549 @Override 550 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 551 ServiceCaller<S, R> callable, byte[] row) { 552 return coprocessorService(stubMaker, callable, null, row); 553 } 554 555 private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) { 556 if (isEmptyStopRow(endKey)) { 557 if (isEmptyStopRow(region.getEndKey())) { 558 return true; 559 } 560 return false; 561 } else { 562 if (isEmptyStopRow(region.getEndKey())) { 563 return true; 564 } 565 int c = Bytes.compareTo(endKey, region.getEndKey()); 566 // 1. if the region contains endKey 567 // 2. endKey is equal to the region's endKey and we do not want to include endKey. 568 return c < 0 || c == 0 && !endKeyInclusive; 569 } 570 } 571 572 private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, 573 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, 574 byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, 575 AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { 576 if (error != null) { 577 callback.onError(error); 578 return; 579 } 580 unfinishedRequest.incrementAndGet(); 581 RegionInfo region = loc.getRegion(); 582 if (locateFinished(region, endKey, endKeyInclusive)) { 583 locateFinished.set(true); 584 } else { 585 addListener( 586 conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT, 587 operationTimeoutNs), 588 (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, 589 locateFinished, unfinishedRequest, l, e)); 590 } 591 addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { 592 if (e != null) { 593 callback.onRegionError(region, e); 594 } else { 595 callback.onRegionComplete(region, r); 596 } 597 if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { 598 callback.onComplete(); 599 } 600 }); 601 } 602 603 private final class CoprocessorServiceBuilderImpl<S, R> 604 implements CoprocessorServiceBuilder<S, R> { 605 606 private final Function<RpcChannel, S> stubMaker; 607 608 private final ServiceCaller<S, R> callable; 609 610 private final CoprocessorCallback<R> callback; 611 612 private byte[] startKey = HConstants.EMPTY_START_ROW; 613 614 private boolean startKeyInclusive; 615 616 private byte[] endKey = HConstants.EMPTY_END_ROW; 617 618 private boolean endKeyInclusive; 619 620 public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, 621 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { 622 this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); 623 this.callable = Preconditions.checkNotNull(callable, "callable is null"); 624 this.callback = Preconditions.checkNotNull(callback, "callback is null"); 625 } 626 627 @Override 628 public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) { 629 this.startKey = Preconditions.checkNotNull(startKey, 630 "startKey is null. Consider using" + 631 " an empty byte array, or just do not call this method if you want to start selection" + 632 " from the first region"); 633 this.startKeyInclusive = inclusive; 634 return this; 635 } 636 637 @Override 638 public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { 639 this.endKey = Preconditions.checkNotNull(endKey, 640 "endKey is null. Consider using" + 641 " an empty byte array, or just do not call this method if you want to continue" + 642 " selection to the last region"); 643 this.endKeyInclusive = inclusive; 644 return this; 645 } 646 647 @Override 648 public void execute() { 649 addListener(conn.getLocator().getRegionLocation(tableName, startKey, 650 startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs), 651 (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, 652 endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); 653 } 654 } 655 656 @Override 657 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 658 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 659 CoprocessorCallback<R> callback) { 660 return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); 661 } 662}