001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; 025import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 026 027import com.google.protobuf.RpcChannel; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.List; 032import java.util.concurrent.CompletableFuture; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.function.Function; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.CompareOperator; 039import org.apache.hadoop.hbase.DoNotRetryIOException; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HRegionLocation; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; 044import org.apache.hadoop.hbase.filter.Filter; 045import org.apache.hadoop.hbase.io.TimeRange; 046import org.apache.hadoop.hbase.ipc.HBaseRpcController; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.ReflectionUtils; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 054import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 055import org.apache.hbase.thirdparty.io.netty.util.Timer; 056 057import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 058import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 059import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 068 069/** 070 * The implementation of RawAsyncTable. 071 * <p/> 072 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 073 * be finished inside the rpc framework thread, which means that the callbacks registered to the 074 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 075 * this class should not try to do time consuming tasks in the callbacks. 076 * @since 2.0.0 077 * @see AsyncTableImpl 078 */ 079@InterfaceAudience.Private 080class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { 081 082 private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); 083 084 private final AsyncConnectionImpl conn; 085 086 private final Timer retryTimer; 087 088 private final TableName tableName; 089 090 private final int defaultScannerCaching; 091 092 private final long defaultScannerMaxResultSize; 093 094 private final long rpcTimeoutNs; 095 096 private final long readRpcTimeoutNs; 097 098 private final long writeRpcTimeoutNs; 099 100 private final long operationTimeoutNs; 101 102 private final long scanTimeoutNs; 103 104 private final long pauseNs; 105 106 private final long pauseForCQTBENs; 107 108 private final int maxAttempts; 109 110 private final int startLogErrorsCnt; 111 112 RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) { 113 this.conn = conn; 114 this.retryTimer = retryTimer; 115 this.tableName = builder.tableName; 116 this.rpcTimeoutNs = builder.rpcTimeoutNs; 117 this.readRpcTimeoutNs = builder.readRpcTimeoutNs; 118 this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs; 119 this.operationTimeoutNs = builder.operationTimeoutNs; 120 this.scanTimeoutNs = builder.scanTimeoutNs; 121 this.pauseNs = builder.pauseNs; 122 if (builder.pauseForCQTBENs < builder.pauseNs) { 123 LOG.warn( 124 "Configured value of pauseForCQTBENs is {} ms, which is less than" + 125 " the normal pause value {} ms, use the greater one instead", 126 TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), 127 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 128 this.pauseForCQTBENs = builder.pauseNs; 129 } else { 130 this.pauseForCQTBENs = builder.pauseForCQTBENs; 131 } 132 this.maxAttempts = builder.maxAttempts; 133 this.startLogErrorsCnt = builder.startLogErrorsCnt; 134 this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() 135 : conn.connConf.getScannerCaching(); 136 this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); 137 } 138 139 @Override 140 public TableName getName() { 141 return tableName; 142 } 143 144 @Override 145 public Configuration getConfiguration() { 146 return conn.getConfiguration(); 147 } 148 149 @Override 150 public CompletableFuture<TableDescriptor> getDescriptor() { 151 return conn.getAdmin().getDescriptor(tableName); 152 } 153 154 @Override 155 public AsyncTableRegionLocator getRegionLocator() { 156 return conn.getRegionLocator(tableName); 157 } 158 159 @FunctionalInterface 160 private interface Converter<D, I, S> { 161 D convert(I info, S src) throws IOException; 162 } 163 164 @FunctionalInterface 165 private interface RpcCall<RESP, REQ> { 166 void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, 167 RpcCallback<RESP> done); 168 } 169 170 private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call( 171 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 172 Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, 173 Converter<RESP, HBaseRpcController, PRESP> respConverter) { 174 CompletableFuture<RESP> future = new CompletableFuture<>(); 175 try { 176 rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), 177 new RpcCallback<PRESP>() { 178 179 @Override 180 public void run(PRESP resp) { 181 if (controller.failed()) { 182 future.completeExceptionally(controller.getFailed()); 183 } else { 184 try { 185 future.complete(respConverter.convert(controller, resp)); 186 } catch (IOException e) { 187 future.completeExceptionally(e); 188 } 189 } 190 } 191 }); 192 } catch (IOException e) { 193 future.completeExceptionally(e); 194 } 195 return future; 196 } 197 198 private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller, 199 HRegionLocation loc, ClientService.Interface stub, REQ req, 200 Converter<MutateRequest, byte[], REQ> reqConvert, 201 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 202 return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done), 203 respConverter); 204 } 205 206 private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller, 207 HRegionLocation loc, ClientService.Interface stub, REQ req, 208 Converter<MutateRequest, byte[], REQ> reqConvert) { 209 return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { 210 return null; 211 }); 212 } 213 214 private static Result toResult(HBaseRpcController controller, MutateResponse resp) 215 throws IOException { 216 if (!resp.hasResult()) { 217 return null; 218 } 219 return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); 220 } 221 222 @FunctionalInterface 223 private interface NoncedConverter<D, I, S> { 224 D convert(I info, S src, long nonceGroup, long nonce) throws IOException; 225 } 226 227 private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, 228 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 229 NoncedConverter<MutateRequest, byte[], REQ> reqConvert, 230 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 231 return mutate(controller, loc, stub, req, 232 (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); 233 } 234 235 private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) { 236 return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority) 237 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 238 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 239 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) 240 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 241 } 242 243 private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller( 244 R row, long rpcTimeoutNs) { 245 return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); 246 } 247 248 private CompletableFuture<Result> get(Get get, int replicaId) { 249 return this.<Result, Get> newCaller(get, readRpcTimeoutNs) 250 .action((controller, loc, stub) -> RawAsyncTableImpl 251 .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get, 252 RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), 253 (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) 254 .replicaId(replicaId).call(); 255 } 256 257 @Override 258 public CompletableFuture<Result> get(Get get) { 259 return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), 260 RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, 261 conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()); 262 } 263 264 @Override 265 public CompletableFuture<Void> put(Put put) { 266 validatePut(put, conn.connConf.getMaxKeyValueSize()); 267 return this.<Void, Put> newCaller(put, writeRpcTimeoutNs) 268 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, 269 put, RequestConverter::buildMutateRequest)) 270 .call(); 271 } 272 273 @Override 274 public CompletableFuture<Void> delete(Delete delete) { 275 return this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs) 276 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, 277 stub, delete, RequestConverter::buildMutateRequest)) 278 .call(); 279 } 280 281 @Override 282 public CompletableFuture<Result> append(Append append) { 283 checkHasFamilies(append); 284 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 285 long nonce = conn.getNonceGenerator().newNonce(); 286 return this.<Result, Append> newCaller(append, rpcTimeoutNs) 287 .action( 288 (controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, controller, 289 loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) 290 .call(); 291 } 292 293 @Override 294 public CompletableFuture<Result> increment(Increment increment) { 295 checkHasFamilies(increment); 296 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 297 long nonce = conn.getNonceGenerator().newNonce(); 298 return this.<Result, Increment> newCaller(increment, rpcTimeoutNs) 299 .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, 300 controller, loc, stub, increment, RequestConverter::buildMutateRequest, 301 RawAsyncTableImpl::toResult)) 302 .call(); 303 } 304 305 private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 306 307 private final byte[] row; 308 309 private final byte[] family; 310 311 private byte[] qualifier; 312 313 private TimeRange timeRange; 314 315 private CompareOperator op; 316 317 private byte[] value; 318 319 public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 320 this.row = Preconditions.checkNotNull(row, "row is null"); 321 this.family = Preconditions.checkNotNull(family, "family is null"); 322 } 323 324 @Override 325 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 326 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + 327 " an empty byte array, or just do not call this method if you want a null qualifier"); 328 return this; 329 } 330 331 @Override 332 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 333 this.timeRange = timeRange; 334 return this; 335 } 336 337 @Override 338 public CheckAndMutateBuilder ifNotExists() { 339 this.op = CompareOperator.EQUAL; 340 this.value = null; 341 return this; 342 } 343 344 @Override 345 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 346 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 347 this.value = Preconditions.checkNotNull(value, "value is null"); 348 return this; 349 } 350 351 private void preCheck() { 352 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + 353 " calling ifNotExists/ifEquals/ifMatches before executing the request"); 354 } 355 356 @Override 357 public CompletableFuture<Boolean> thenPut(Put put) { 358 validatePut(put, conn.connConf.getMaxKeyValueSize()); 359 preCheck(); 360 return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 361 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, 362 stub, put, 363 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 364 null, timeRange, p), 365 (c, r) -> r.getProcessed())) 366 .call(); 367 } 368 369 @Override 370 public CompletableFuture<Boolean> thenDelete(Delete delete) { 371 preCheck(); 372 return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 373 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, 374 loc, stub, delete, 375 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 376 null, timeRange, d), 377 (c, r) -> r.getProcessed())) 378 .call(); 379 } 380 381 @Override 382 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 383 preCheck(); 384 return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(), 385 rpcTimeoutNs) 386 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, 387 loc, stub, mutation, 388 (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 389 null, timeRange, rm), CheckAndMutateResult::isSuccess)) 390 .call(); 391 } 392 } 393 394 @Override 395 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 396 return new CheckAndMutateBuilderImpl(row, family); 397 } 398 399 private final class CheckAndMutateWithFilterBuilderImpl 400 implements CheckAndMutateWithFilterBuilder { 401 402 private final byte[] row; 403 404 private final Filter filter; 405 406 private TimeRange timeRange; 407 408 public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { 409 this.row = Preconditions.checkNotNull(row, "row is null"); 410 this.filter = Preconditions.checkNotNull(filter, "filter is null"); 411 } 412 413 @Override 414 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 415 this.timeRange = timeRange; 416 return this; 417 } 418 419 @Override 420 public CompletableFuture<Boolean> thenPut(Put put) { 421 validatePut(put, conn.connConf.getMaxKeyValueSize()); 422 return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 423 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, 424 stub, put, 425 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, 426 filter, timeRange, p), 427 (c, r) -> r.getProcessed())) 428 .call(); 429 } 430 431 @Override 432 public CompletableFuture<Boolean> thenDelete(Delete delete) { 433 return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 434 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, 435 loc, stub, delete, 436 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, 437 filter, timeRange, d), 438 (c, r) -> r.getProcessed())) 439 .call(); 440 } 441 442 @Override 443 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 444 return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(), 445 rpcTimeoutNs) 446 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, 447 loc, stub, mutation, 448 (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, 449 filter, timeRange, rm), CheckAndMutateResult::isSuccess)) 450 .call(); 451 } 452 } 453 454 @Override 455 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 456 return new CheckAndMutateWithFilterBuilderImpl(row, filter); 457 } 458 459 @Override 460 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 461 if (checkAndMutate.getAction() instanceof Put) { 462 validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); 463 } 464 if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete 465 || checkAndMutate.getAction() instanceof Increment 466 || checkAndMutate.getAction() instanceof Append) { 467 Mutation mutation = (Mutation) checkAndMutate.getAction(); 468 if (mutation instanceof Put) { 469 validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); 470 } 471 return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), 472 mutation.getPriority(), rpcTimeoutNs) 473 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, 474 loc, stub, mutation, 475 (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), 476 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 477 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), 478 checkAndMutate.getTimeRange(), m), 479 (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner()))) 480 .call(); 481 } else if (checkAndMutate.getAction() instanceof RowMutations) { 482 RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); 483 return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), 484 rowMutations.getMaxPriority(), rpcTimeoutNs) 485 .action((controller, loc, stub) -> 486 RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow( 487 controller, loc, stub, rowMutations, 488 (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), 489 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 490 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), 491 checkAndMutate.getTimeRange(), rm), 492 resp -> resp)) 493 .call(); 494 } else { 495 CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>(); 496 future.completeExceptionally(new DoNotRetryIOException( 497 "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); 498 return future; 499 } 500 } 501 502 @Override 503 public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate( 504 List<CheckAndMutate> checkAndMutates) { 505 return batch(checkAndMutates, rpcTimeoutNs).stream() 506 .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()); 507 } 508 509 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, 510 // so here I write a new method as I do not want to change the abstraction of call method. 511 @SuppressWarnings("unchecked") 512 private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, 513 HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, 514 Converter<MultiRequest, byte[], RowMutations> reqConvert, 515 Function<RES, RESP> respConverter) { 516 CompletableFuture<RESP> future = new CompletableFuture<>(); 517 try { 518 byte[] regionName = loc.getRegion().getRegionName(); 519 MultiRequest req = reqConvert.convert(regionName, mutation); 520 stub.multi(controller, req, new RpcCallback<MultiResponse>() { 521 522 @Override 523 public void run(MultiResponse resp) { 524 if (controller.failed()) { 525 future.completeExceptionally(controller.getFailed()); 526 } else { 527 try { 528 org.apache.hadoop.hbase.client.MultiResponse multiResp = 529 ResponseConverter.getResults(req, resp, controller.cellScanner()); 530 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 531 loc.getServerName(), multiResp); 532 Throwable ex = multiResp.getException(regionName); 533 if (ex != null) { 534 future.completeExceptionally(ex instanceof IOException ? ex 535 : new IOException( 536 "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); 537 } else { 538 future.complete(respConverter 539 .apply((RES) multiResp.getResults().get(regionName).result.get(0))); 540 } 541 } catch (IOException e) { 542 future.completeExceptionally(e); 543 } 544 } 545 } 546 }); 547 } catch (IOException e) { 548 future.completeExceptionally(e); 549 } 550 return future; 551 } 552 553 @Override 554 public CompletableFuture<Result> mutateRow(RowMutations mutations) { 555 return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), 556 writeRpcTimeoutNs).action((controller, loc, stub) -> 557 this.<Result, Result> mutateRow(controller, loc, stub, mutations, 558 (rn, rm) -> { 559 RegionAction.Builder regionMutationBuilder = RequestConverter 560 .buildRegionAction(rn, rm); 561 regionMutationBuilder.setAtomic(true); 562 return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()) 563 .build(); 564 }, resp -> resp)) 565 .call(); 566 } 567 568 private Scan setDefaultScanConfig(Scan scan) { 569 // always create a new scan object as we may reset the start row later. 570 Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); 571 if (newScan.getCaching() <= 0) { 572 newScan.setCaching(defaultScannerCaching); 573 } 574 if (newScan.getMaxResultSize() <= 0) { 575 newScan.setMaxResultSize(defaultScannerMaxResultSize); 576 } 577 return newScan; 578 } 579 580 @Override 581 public void scan(Scan scan, AdvancedScanResultConsumer consumer) { 582 new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, 583 pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt) 584 .start(); 585 } 586 587 private long resultSize2CacheSize(long maxResultSize) { 588 // * 2 if possible 589 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 590 } 591 592 @Override 593 public ResultScanner getScanner(Scan scan) { 594 return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan), 595 resultSize2CacheSize( 596 scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); 597 } 598 599 @Override 600 public CompletableFuture<List<Result>> scanAll(Scan scan) { 601 CompletableFuture<List<Result>> future = new CompletableFuture<>(); 602 List<Result> scanResults = new ArrayList<>(); 603 scan(scan, new AdvancedScanResultConsumer() { 604 605 @Override 606 public void onNext(Result[] results, ScanController controller) { 607 scanResults.addAll(Arrays.asList(results)); 608 } 609 610 @Override 611 public void onError(Throwable error) { 612 future.completeExceptionally(error); 613 } 614 615 @Override 616 public void onComplete() { 617 future.complete(scanResults); 618 } 619 }); 620 return future; 621 } 622 623 @Override 624 public List<CompletableFuture<Result>> get(List<Get> gets) { 625 return batch(gets, readRpcTimeoutNs); 626 } 627 628 @Override 629 public List<CompletableFuture<Void>> put(List<Put> puts) { 630 return voidMutate(puts); 631 } 632 633 @Override 634 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 635 return voidMutate(deletes); 636 } 637 638 @Override 639 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 640 return batch(actions, rpcTimeoutNs); 641 } 642 643 private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { 644 return this.<Object> batch(actions, writeRpcTimeoutNs).stream() 645 .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); 646 } 647 648 private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) { 649 for (Row action : actions) { 650 if (action instanceof Put) { 651 validatePut((Put) action, conn.connConf.getMaxKeyValueSize()); 652 } else if (action instanceof CheckAndMutate) { 653 CheckAndMutate checkAndMutate = (CheckAndMutate) action; 654 if (checkAndMutate.getAction() instanceof Put) { 655 validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); 656 } 657 } 658 } 659 return conn.callerFactory.batch().table(tableName).actions(actions) 660 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 661 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 662 .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) 663 .startLogErrorsCnt(startLogErrorsCnt).call(); 664 } 665 666 @Override 667 public long getRpcTimeout(TimeUnit unit) { 668 return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); 669 } 670 671 @Override 672 public long getReadRpcTimeout(TimeUnit unit) { 673 return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); 674 } 675 676 @Override 677 public long getWriteRpcTimeout(TimeUnit unit) { 678 return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); 679 } 680 681 @Override 682 public long getOperationTimeout(TimeUnit unit) { 683 return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); 684 } 685 686 @Override 687 public long getScanTimeout(TimeUnit unit) { 688 return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); 689 } 690 691 private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 692 ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { 693 RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, 694 region, row, rpcTimeoutNs, operationTimeoutNs); 695 S stub = stubMaker.apply(channel); 696 CompletableFuture<R> future = new CompletableFuture<>(); 697 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 698 callable.call(stub, controller, resp -> { 699 if (controller.failed()) { 700 future.completeExceptionally(controller.getFailed()); 701 } else { 702 future.complete(resp); 703 } 704 }); 705 return future; 706 } 707 708 @Override 709 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 710 ServiceCaller<S, R> callable, byte[] row) { 711 return coprocessorService(stubMaker, callable, null, row); 712 } 713 714 private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) { 715 if (isEmptyStopRow(endKey)) { 716 if (isEmptyStopRow(region.getEndKey())) { 717 return true; 718 } 719 return false; 720 } else { 721 if (isEmptyStopRow(region.getEndKey())) { 722 return true; 723 } 724 int c = Bytes.compareTo(endKey, region.getEndKey()); 725 // 1. if the region contains endKey 726 // 2. endKey is equal to the region's endKey and we do not want to include endKey. 727 return c < 0 || c == 0 && !endKeyInclusive; 728 } 729 } 730 731 private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, 732 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, 733 byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, 734 AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { 735 if (error != null) { 736 callback.onError(error); 737 return; 738 } 739 unfinishedRequest.incrementAndGet(); 740 RegionInfo region = loc.getRegion(); 741 if (locateFinished(region, endKey, endKeyInclusive)) { 742 locateFinished.set(true); 743 } else { 744 addListener( 745 conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT, 746 operationTimeoutNs), 747 (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, 748 locateFinished, unfinishedRequest, l, e)); 749 } 750 addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { 751 if (e != null) { 752 callback.onRegionError(region, e); 753 } else { 754 callback.onRegionComplete(region, r); 755 } 756 if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { 757 callback.onComplete(); 758 } 759 }); 760 } 761 762 private final class CoprocessorServiceBuilderImpl<S, R> 763 implements CoprocessorServiceBuilder<S, R> { 764 765 private final Function<RpcChannel, S> stubMaker; 766 767 private final ServiceCaller<S, R> callable; 768 769 private final CoprocessorCallback<R> callback; 770 771 private byte[] startKey = HConstants.EMPTY_START_ROW; 772 773 private boolean startKeyInclusive; 774 775 private byte[] endKey = HConstants.EMPTY_END_ROW; 776 777 private boolean endKeyInclusive; 778 779 public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, 780 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { 781 this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); 782 this.callable = Preconditions.checkNotNull(callable, "callable is null"); 783 this.callback = Preconditions.checkNotNull(callback, "callback is null"); 784 } 785 786 @Override 787 public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) { 788 this.startKey = Preconditions.checkNotNull(startKey, 789 "startKey is null. Consider using" + 790 " an empty byte array, or just do not call this method if you want to start selection" + 791 " from the first region"); 792 this.startKeyInclusive = inclusive; 793 return this; 794 } 795 796 @Override 797 public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { 798 this.endKey = Preconditions.checkNotNull(endKey, 799 "endKey is null. Consider using" + 800 " an empty byte array, or just do not call this method if you want to continue" + 801 " selection to the last region"); 802 this.endKeyInclusive = inclusive; 803 return this; 804 } 805 806 @Override 807 public void execute() { 808 addListener(conn.getLocator().getRegionLocation(tableName, startKey, 809 startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs), 810 (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, 811 endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); 812 } 813 } 814 815 @Override 816 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 817 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 818 CoprocessorCallback<R> callback) { 819 return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); 820 } 821}