/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.google.protobuf.RpcChannel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;

/**
 * The implementation of RawAsyncTable.
 * <p>
 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
 * be finished inside the rpc framework thread, which means that the callbacks registered to the
 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
 * this class should not try to do time consuming tasks in the callbacks.
 * @since 2.0.0
 * @see AsyncTableImpl
 */
@InterfaceAudience.Private
class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {

  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);

  private final AsyncConnectionImpl conn;

  private final Timer retryTimer;

  private final TableName tableName;

  private final int defaultScannerCaching;

  private final long defaultScannerMaxResultSize;

  private final long rpcTimeoutNs;

  private final long readRpcTimeoutNs;

  private final long writeRpcTimeoutNs;

  private final long operationTimeoutNs;

  private final long scanTimeoutNs;

  private final long pauseNs;

  private final long pauseForCQTBENs;

  private final int maxAttempts;

  private final int startLogErrorsCnt;

  /**
   * Copies the per-table settings out of the given builder and derives the scanner defaults from
   * the connection configuration.
   * @param conn the connection this table sends its requests through
   * @param retryTimer the timer handed to timeline-consistent reads and to scans
   * @param builder the source of the table name, timeouts, pauses and retry limits
   */
  RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
    this.conn = conn;
    this.retryTimer = retryTimer;
    this.tableName = builder.tableName;
    this.rpcTimeoutNs = builder.rpcTimeoutNs;
    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
    this.operationTimeoutNs = builder.operationTimeoutNs;
    this.scanTimeoutNs = builder.scanTimeoutNs;
    this.pauseNs = builder.pauseNs;
    // pauseForCQTBENs is never allowed to be smaller than the normal pause.
    if (builder.pauseForCQTBENs < builder.pauseNs) {
      LOG.warn(
        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
          " the normal pause value {} ms, use the greater one instead",
        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
      this.pauseForCQTBENs = builder.pauseNs;
    } else {
      this.pauseForCQTBENs = builder.pauseForCQTBENs;
    }
    this.maxAttempts = builder.maxAttempts;
    this.startLogErrorsCnt = builder.startLogErrorsCnt;
    // System tables use the meta scanner caching value, all other tables the normal one.
    this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
      : conn.connConf.getScannerCaching();
    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
  }

  @Override
  public TableName getName() {
    return tableName;
  }

  @Override
  public Configuration getConfiguration() {
    return conn.getConfiguration();
  }

  @Override
  public CompletableFuture<TableDescriptor> getDescriptor() {
    return conn.getAdmin().getDescriptor(tableName);
  }

  @Override
  public AsyncTableRegionLocator getRegionLocator() {
    return conn.getRegionLocator(tableName);
  }

  /**
   * Converts {@code src} into {@code D} with the help of some extra {@code info}. In this class it
   * is used both for building protobuf requests (info is the region name) and for decoding
   * protobuf responses (info is the rpc controller).
   */
  @FunctionalInterface
  private interface Converter<D, I, S> {
    D convert(I info, S src) throws IOException;
  }

  /**
   * The actual rpc invocation on the client service stub.
   */
  @FunctionalInterface
  private interface RpcCall<RESP, REQ> {
    void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
        RpcCallback<RESP> done);
  }

  /**
   * Builds the protobuf request for the region in {@code loc}, issues it through {@code rpcCall}
   * and exposes the outcome as a {@link CompletableFuture}.
   * @return a future which is completed with the converted response, or completed exceptionally
   *         with the controller failure or with the {@link IOException} thrown by a converter
   */
  private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
      Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
      Converter<RESP, HBaseRpcController, PRESP> respConverter) {
    CompletableFuture<RESP> future = new CompletableFuture<>();
    try {
      rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
        new RpcCallback<PRESP>() {

          @Override
          public void run(PRESP resp) {
            if (controller.failed()) {
              future.completeExceptionally(controller.getFailed());
            } else {
              try {
                future.complete(respConverter.convert(controller, resp));
              } catch (IOException e) {
                future.completeExceptionally(e);
              }
            }
          }
        });
    } catch (IOException e) {
      // building the request failed, nothing has been sent
      future.completeExceptionally(e);
    }
    return future;
  }

  /**
   * Specialization of {@link #call} which goes through the {@code mutate} rpc of the stub.
   */
  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
      HRegionLocation loc, ClientService.Interface stub, REQ req,
      Converter<MutateRequest, byte[], REQ> reqConvert,
      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
    return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
      respConverter);
  }

  /**
   * Same as {@link #mutate} but the response content is discarded, the future is completed with
   * {@code null}.
   */
  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
      HRegionLocation loc, ClientService.Interface stub, REQ req,
      Converter<MutateRequest, byte[], REQ> reqConvert) {
    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
      return null;
    });
  }

  /**
   * @return the {@link Result} carried by the response, or {@code null} if the response has no
   *         result
   */
  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
      throws IOException {
    if (!resp.hasResult()) {
      return null;
    }
    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
  }

  /**
   * Like {@link Converter} but also receives the nonce group and the nonce of the operation.
   */
  @FunctionalInterface
  private interface NoncedConverter<D, I, S> {
    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
  }

  /**
   * Same as {@link #mutate} but binds the given nonce group and nonce into the request converter.
   */
  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
      HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
      NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
    return mutate(controller, loc, stub, req,
      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
  }

  /**
   * Creates a single request caller builder for the given row which is pre-populated with the
   * operation timeout, pauses and retry settings of this table.
   */
  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
    return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
  }

  /**
   * Convenience overload which takes the row key and the priority from the operation itself.
   */
  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
      R row, long rpcTimeoutNs) {
    return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
  }

  /**
   * Issues the get against the given replica, using the read rpc timeout.
   */
  private CompletableFuture<Result> get(Get get, int replicaId) {
    return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
      .action((controller, loc, stub) -> RawAsyncTableImpl
        .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
          RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
      .replicaId(replicaId).call();
  }

  @Override
  public CompletableFuture<Result> get(Get get) {
    return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
      RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
      conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics());
  }

  @Override
  public CompletableFuture<Void> put(Put put) {
    validatePut(put, conn.connConf.getMaxKeyValueSize());
    return this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
        put, RequestConverter::buildMutateRequest))
      .call();
  }

  @Override
  public CompletableFuture<Void> delete(Delete delete) {
    return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
        stub, delete, RequestConverter::buildMutateRequest))
      .call();
  }

  @Override
  public CompletableFuture<Result> append(Append append) {
    checkHasFamilies(append);
    // The nonce is generated once, outside the action, so every attempt carries the same one.
    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
    long nonce = conn.getNonceGenerator().newNonce();
    return this.<Result, Append> newCaller(append, rpcTimeoutNs)
      .action(
        (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller,
          loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
      .call();
  }

  @Override
  public CompletableFuture<Result> increment(Increment increment) {
    checkHasFamilies(increment);
    // The nonce is generated once, outside the action, so every attempt carries the same one.
    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
    long nonce = conn.getNonceGenerator().newNonce();
    return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
      .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
        controller, loc, stub, increment, RequestConverter::buildMutateRequest,
        RawAsyncTableImpl::toResult))
      .call();
  }

  /**
   * Builder for a check-and-mutate guarded by a comparison on a single column. A condition must
   * be set through {@link #ifNotExists()} or {@link #ifMatches(CompareOperator, byte[])} before
   * any of the {@code thenXXX} methods is called.
   */
  private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {

    private final byte[] row;

    private final byte[] family;

    private byte[] qualifier;

    private TimeRange timeRange;

    private CompareOperator op;

    private byte[] value;

    public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
      this.row = Preconditions.checkNotNull(row, "row is null");
      this.family = Preconditions.checkNotNull(family, "family is null");
    }

    @Override
    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
        " an empty byte array, or just do not call this method if you want a null qualifier");
      return this;
    }

    @Override
    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
      this.timeRange = timeRange;
      return this;
    }

    @Override
    public CheckAndMutateBuilder ifNotExists() {
      // "not exists" is expressed as an EQUAL comparison against a null value
      this.op = CompareOperator.EQUAL;
      this.value = null;
      return this;
    }

    @Override
    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
      this.value = Preconditions.checkNotNull(value, "value is null");
      return this;
    }

    /**
     * Fails with a {@link NullPointerException} if no condition has been specified yet.
     */
    private void preCheck() {
      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
        " calling ifNotExists/ifEquals/ifMatches before executing the request");
    }

    @Override
    public CompletableFuture<Boolean> thenPut(Put put) {
      validatePut(put, conn.connConf.getMaxKeyValueSize());
      preCheck();
      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
          stub, put,
          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
            null, timeRange, p),
          (c, r) -> r.getProcessed()))
        .call();
    }

    @Override
    public CompletableFuture<Boolean> thenDelete(Delete delete) {
      preCheck();
      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
          loc, stub, delete,
          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
            null, timeRange, d),
          (c, r) -> r.getProcessed()))
        .call();
    }

    @Override
    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
      preCheck();
      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
        rpcTimeoutNs)
        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
          loc, stub, mutation,
          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
            null, timeRange, rm),
          resp -> resp.getExists()))
        .call();
    }
  }

  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    return new CheckAndMutateBuilderImpl(row, family);
  }

  /**
   * Builder for a check-and-mutate guarded by a {@link Filter} instead of a single column
   * comparison. The column related arguments of the request are therefore always passed as
   * {@code null}.
   */
  private final class CheckAndMutateWithFilterBuilderImpl
      implements CheckAndMutateWithFilterBuilder {

    private final byte[] row;

    private final Filter filter;

    private TimeRange timeRange;

    public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
      this.row = Preconditions.checkNotNull(row, "row is null");
      this.filter = Preconditions.checkNotNull(filter, "filter is null");
    }

    @Override
    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
      this.timeRange = timeRange;
      return this;
    }

    @Override
    public CompletableFuture<Boolean> thenPut(Put put) {
      validatePut(put, conn.connConf.getMaxKeyValueSize());
      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
          stub, put,
          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
            filter, timeRange, p),
          (c, r) -> r.getProcessed()))
        .call();
    }

    @Override
    public CompletableFuture<Boolean> thenDelete(Delete delete) {
      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
          loc, stub, delete,
          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
            filter, timeRange, d),
          (c, r) -> r.getProcessed()))
        .call();
    }

    @Override
    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
        rpcTimeoutNs)
        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
          loc, stub, mutation,
          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
            filter, timeRange, rm),
          resp -> resp.getExists()))
        .call();
    }
  }

  @Override
  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
  }

  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
  // so this is a separate method rather than a change to the abstraction of the call method.
  /**
   * Sends the row mutations through the {@code multi} rpc of the stub.
   * @return a future which is completed with the converted first result of the region, or
   *         completed exceptionally with the controller failure, the per-region exception
   *         (wrapped in an {@link IOException} if it is not one already) or a conversion error
   */
  private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
      HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
      Converter<MultiRequest, byte[], RowMutations> reqConvert,
      Function<Result, RESP> respConverter) {
    CompletableFuture<RESP> future = new CompletableFuture<>();
    try {
      byte[] regionName = loc.getRegion().getRegionName();
      MultiRequest req = reqConvert.convert(regionName, mutation);
      stub.multi(controller, req, new RpcCallback<MultiResponse>() {

        @Override
        public void run(MultiResponse resp) {
          if (controller.failed()) {
            future.completeExceptionally(controller.getFailed());
          } else {
            try {
              org.apache.hadoop.hbase.client.MultiResponse multiResp =
                ResponseConverter.getResults(req, resp, controller.cellScanner());
              ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
                loc.getServerName(), multiResp);
              Throwable ex = multiResp.getException(regionName);
              if (ex != null) {
                future.completeExceptionally(ex instanceof IOException ? ex
                  : new IOException(
                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
              } else {
                future.complete(respConverter
                  .apply((Result) multiResp.getResults().get(regionName).result.get(0)));
              }
            } catch (IOException e) {
              future.completeExceptionally(e);
            }
          }
        }
      });
    } catch (IOException e) {
      // building the request failed, nothing has been sent
      future.completeExceptionally(e);
    }
    return future;
  }

  @Override
  public CompletableFuture<Void> mutateRow(RowMutations mutation) {
    return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
      .action((controller, loc, stub) -> this.<Void> mutateRow(controller, loc, stub, mutation,
        (rn, rm) -> {
          RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
          regionMutationBuilder.setAtomic(true);
          return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
        }, resp -> null))
      .call();
  }

  /**
   * @return a copy of the given scan where a non-positive caching or max result size has been
   *         replaced with the default of this table
   */
  private Scan setDefaultScanConfig(Scan scan) {
    // always create a new scan object as we may reset the start row later.
    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
    if (newScan.getCaching() <= 0) {
      newScan.setCaching(defaultScannerCaching);
    }
    if (newScan.getMaxResultSize() <= 0) {
      newScan.setMaxResultSize(defaultScannerMaxResultSize);
    }
    return newScan;
  }

  @Override
  public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
      pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
        .start();
  }

  /**
   * @return twice the given size, or the size itself if doubling it would overflow a long
   */
  private long resultSize2CacheSize(long maxResultSize) {
    // * 2 if possible
    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
  }

  @Override
  public ResultScanner getScanner(Scan scan) {
    return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
      resultSize2CacheSize(
        scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
  }

  @Override
  public CompletableFuture<List<Result>> scanAll(Scan scan) {
    CompletableFuture<List<Result>> future = new CompletableFuture<>();
    List<Result> scanResults = new ArrayList<>();
    scan(scan, new AdvancedScanResultConsumer() {

      @Override
      public void onNext(Result[] results, ScanController controller) {
        scanResults.addAll(Arrays.asList(results));
      }

      @Override
      public void onError(Throwable error) {
        future.completeExceptionally(error);
      }

      @Override
      public void onComplete() {
        future.complete(scanResults);
      }
    });
    return future;
  }

  @Override
  public List<CompletableFuture<Result>> get(List<Get> gets) {
    return batch(gets, readRpcTimeoutNs);
  }

  @Override
  public List<CompletableFuture<Void>> put(List<Put> puts) {
    return voidMutate(puts);
  }

  @Override
  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
    return voidMutate(deletes);
  }

  @Override
  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
    return batch(actions, rpcTimeoutNs);
  }

  /**
   * Runs the actions as a batch with the write rpc timeout and maps every result to {@code null}.
   */
  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
  }

  /**
   * Validates every {@link Put} among the actions and then submits all actions as one batch with
   * the given rpc timeout.
   */
  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
    actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action)
      .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize()));
    return conn.callerFactory.batch().table(tableName).actions(actions)
      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
      .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
      .startLogErrorsCnt(startLogErrorsCnt).call();
  }

  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
  }

  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
  }

  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
  }

  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
  }

  @Override
  public long getScanTimeout(TimeUnit unit) {
    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
  }

  /**
   * Creates a stub on top of a region coprocessor channel for the given region/row and invokes
   * the callable on it.
   * @param region the target region, may be {@code null} when only the row is known
   * @return a future which is completed with the response, or completed exceptionally with the
   *         failure recorded in the controller
   */
  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
      ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
      region, row, rpcTimeoutNs, operationTimeoutNs);
    S stub = stubMaker.apply(channel);
    CompletableFuture<R> future = new CompletableFuture<>();
    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
    callable.call(stub, controller, resp -> {
      if (controller.failed()) {
        future.completeExceptionally(controller.getFailed());
      } else {
        future.complete(resp);
      }
    });
    return future;
  }

  @Override
  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
      ServiceCaller<S, R> callable, byte[] row) {
    return coprocessorService(stubMaker, callable, null, row);
  }

  /**
   * Decides whether the given region is the last one which needs to be located for a coprocessor
   * call that selects rows up to {@code endKey}.
   */
  private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) {
    if (isEmptyStopRow(region.getEndKey())) {
      // the region has an empty end key, we are done no matter what endKey is
      return true;
    }
    if (isEmptyStopRow(endKey)) {
      // unbounded selection, but the region has a non-empty end key
      return false;
    }
    int c = Bytes.compareTo(endKey, region.getEndKey());
    // 1. if the region contains endKey
    // 2. endKey is equal to the region's endKey and we do not want to include endKey.
    return c < 0 || (c == 0 && !endKeyInclusive);
  }

  /**
   * Called once a region has been located. Unless this is the last region, it kicks off the
   * locating of the next region (starting at this region's end key), and in any case it executes
   * the coprocessor call against the located region.
   * <p>
   * {@code unfinishedRequest} counts the coprocessor calls which are in flight, and
   * {@code locateFinished} tells whether all regions have been located. The callback's
   * {@code onComplete} is invoked by the request which brings the counter down to zero after
   * locating has finished.
   */
  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, byte[] endKey,
      boolean endKeyInclusive, AtomicBoolean locateFinished, AtomicInteger unfinishedRequest,
      HRegionLocation loc, Throwable error) {
    if (error != null) {
      callback.onError(error);
      return;
    }
    // Count this request before locating the next region or sending the call.
    unfinishedRequest.incrementAndGet();
    RegionInfo region = loc.getRegion();
    if (locateFinished(region, endKey, endKeyInclusive)) {
      locateFinished.set(true);
    } else {
      addListener(
        conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
          operationTimeoutNs),
        (l, e) -> onLocateComplete(stubMaker, callable, callback, endKey, endKeyInclusive,
          locateFinished, unfinishedRequest, l, e));
    }
    addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
      if (e != null) {
        callback.onRegionError(region, e);
      } else {
        callback.onRegionComplete(region, r);
      }
      if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
        callback.onComplete();
      }
    });
  }

  /**
   * Builder for a coprocessor call over a range of rows. The range defaults to the empty start
   * row and the empty end row, with both bounds marked as non-inclusive.
   */
  private final class CoprocessorServiceBuilderImpl<S, R>
      implements CoprocessorServiceBuilder<S, R> {

    private final Function<RpcChannel, S> stubMaker;

    private final ServiceCaller<S, R> callable;

    private final CoprocessorCallback<R> callback;

    private byte[] startKey = HConstants.EMPTY_START_ROW;

    private boolean startKeyInclusive;

    private byte[] endKey = HConstants.EMPTY_END_ROW;

    private boolean endKeyInclusive;

    public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
        ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
      this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
      this.callable = Preconditions.checkNotNull(callable, "callable is null");
      this.callback = Preconditions.checkNotNull(callback, "callback is null");
    }

    @Override
    public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
      this.startKey = Preconditions.checkNotNull(startKey,
        "startKey is null. Consider using" +
          " an empty byte array, or just do not call this method if you want to start selection" +
          " from the first region");
      this.startKeyInclusive = inclusive;
      return this;
    }

    @Override
    public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
      this.endKey = Preconditions.checkNotNull(endKey,
        "endKey is null. Consider using" +
          " an empty byte array, or just do not call this method if you want to continue" +
          " selection to the last region");
      this.endKeyInclusive = inclusive;
      return this;
    }

    @Override
    public void execute() {
      // Locate the first region and let onLocateComplete walk through the following ones.
      addListener(conn.getLocator().getRegionLocation(tableName, startKey,
        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
        (loc, error) -> onLocateComplete(stubMaker, callable, callback, endKey, endKeyInclusive,
          new AtomicBoolean(false), new AtomicInteger(0), loc, error));
    }
  }

  @Override
  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
      Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
      CoprocessorCallback<R> callback) {
    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
  }
}