001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations; 026import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 027import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; 028import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 029 030import com.google.protobuf.RpcChannel; 031import io.opentelemetry.api.trace.Span; 032import io.opentelemetry.api.trace.StatusCode; 033import io.opentelemetry.context.Scope; 034import java.io.IOException; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.List; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.function.Function; 043import java.util.function.Supplier; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.CompareOperator; 046import org.apache.hadoop.hbase.DoNotRetryIOException; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.HRegionLocation; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; 051import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 052import org.apache.hadoop.hbase.filter.Filter; 053import org.apache.hadoop.hbase.io.TimeRange; 054import org.apache.hadoop.hbase.ipc.HBaseRpcController; 055import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 056import org.apache.hadoop.hbase.trace.TraceUtil; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.ReflectionUtils; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 064import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 065import org.apache.hbase.thirdparty.io.netty.util.Timer; 066 067import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 068import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 069import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 077 078/** 079 * The implementation of RawAsyncTable. 080 * <p/> 081 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 082 * be finished inside the rpc framework thread, which means that the callbacks registered to the 083 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 084 * this class should not try to do time consuming tasks in the callbacks. 085 * @since 2.0.0 086 * @see AsyncTableImpl 087 */ 088@InterfaceAudience.Private 089class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { 090 091 private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); 092 093 private final AsyncConnectionImpl conn; 094 095 private final Timer retryTimer; 096 097 private final TableName tableName; 098 099 private final int defaultScannerCaching; 100 101 private final long defaultScannerMaxResultSize; 102 103 private final long rpcTimeoutNs; 104 105 private final long readRpcTimeoutNs; 106 107 private final long writeRpcTimeoutNs; 108 109 private final long operationTimeoutNs; 110 111 private final long scanTimeoutNs; 112 113 private final long pauseNs; 114 115 private final long pauseNsForServerOverloaded; 116 117 private final int maxAttempts; 118 119 private final int startLogErrorsCnt; 120 121 RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) { 122 this.conn = conn; 123 this.retryTimer = retryTimer; 124 this.tableName = builder.tableName; 125 this.rpcTimeoutNs = builder.rpcTimeoutNs; 126 this.readRpcTimeoutNs = builder.readRpcTimeoutNs; 127 this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs; 128 this.operationTimeoutNs = builder.operationTimeoutNs; 129 this.scanTimeoutNs = builder.scanTimeoutNs; 130 this.pauseNs = builder.pauseNs; 131 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 132 LOG.warn( 133 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 134 + " the normal pause value {} ms, use the greater one instead", 135 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 136 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 137 this.pauseNsForServerOverloaded = builder.pauseNs; 138 } else { 139 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 140 } 141 this.maxAttempts = builder.maxAttempts; 142 this.startLogErrorsCnt = builder.startLogErrorsCnt; 143 this.defaultScannerCaching = tableName.isSystemTable() 144 ? conn.connConf.getMetaScannerCaching() 145 : conn.connConf.getScannerCaching(); 146 this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); 147 } 148 149 @Override 150 public TableName getName() { 151 return tableName; 152 } 153 154 @Override 155 public Configuration getConfiguration() { 156 return conn.getConfiguration(); 157 } 158 159 @Override 160 public CompletableFuture<TableDescriptor> getDescriptor() { 161 return conn.getAdmin().getDescriptor(tableName); 162 } 163 164 @Override 165 public AsyncTableRegionLocator getRegionLocator() { 166 return conn.getRegionLocator(tableName); 167 } 168 169 @FunctionalInterface 170 private interface Converter<D, I, S> { 171 D convert(I info, S src) throws IOException; 172 } 173 174 @FunctionalInterface 175 private interface RpcCall<RESP, REQ> { 176 void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, 177 RpcCallback<RESP> done); 178 } 179 180 private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call( 181 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 182 Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, 183 Converter<RESP, HBaseRpcController, PRESP> respConverter) { 184 CompletableFuture<RESP> future = new CompletableFuture<>(); 185 try { 186 rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), 187 new RpcCallback<PRESP>() { 188 189 @Override 190 public void run(PRESP resp) { 191 if (controller.failed()) { 192 future.completeExceptionally(controller.getFailed()); 193 } else { 194 try { 195 future.complete(respConverter.convert(controller, resp)); 196 } catch (IOException e) { 197 future.completeExceptionally(e); 198 } 199 } 200 } 201 }); 202 } catch (IOException e) { 203 future.completeExceptionally(e); 204 } 205 return future; 206 } 207 208 private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller, 209 HRegionLocation loc, ClientService.Interface stub, REQ req, 210 Converter<MutateRequest, byte[], REQ> reqConvert, 211 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 212 return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done), 213 respConverter); 214 } 215 216 private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller, 217 HRegionLocation loc, ClientService.Interface stub, REQ req, 218 Converter<MutateRequest, byte[], REQ> reqConvert) { 219 return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { 220 return null; 221 }); 222 } 223 224 private static Result toResult(HBaseRpcController controller, MutateResponse resp) 225 throws IOException { 226 if (!resp.hasResult()) { 227 return null; 228 } 229 return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); 230 } 231 232 @FunctionalInterface 233 private interface NoncedConverter<D, I, S> { 234 D convert(I info, S src, long nonceGroup, long nonce) throws IOException; 235 } 236 237 private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, 238 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 239 NoncedConverter<MutateRequest, byte[], REQ> reqConvert, 240 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 241 return mutate(controller, loc, stub, req, 242 (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); 243 } 244 245 private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) { 246 return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority) 247 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 248 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 249 .pause(pauseNs, TimeUnit.NANOSECONDS) 250 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 251 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 252 } 253 254 private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> 255 newCaller(R row, long rpcTimeoutNs) { 256 return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); 257 } 258 259 private CompletableFuture<Result> get(Get get, int replicaId) { 260 return this.<Result, Get> newCaller(get, readRpcTimeoutNs) 261 .action((controller, loc, stub) -> RawAsyncTableImpl.<Get, GetRequest, GetResponse, 262 Result> call(controller, loc, stub, get, RequestConverter::buildGetRequest, 263 (s, c, req, done) -> s.get(c, req, done), 264 (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) 265 .replicaId(replicaId).call(); 266 } 267 268 private TableOperationSpanBuilder newTableOperationSpanBuilder() { 269 return new TableOperationSpanBuilder(conn).setTableName(tableName); 270 } 271 272 @Override 273 public CompletableFuture<Result> get(Get get) { 274 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get); 275 return tracedFuture( 276 () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), 277 RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, 278 conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()), 279 supplier); 280 } 281 282 @Override 283 public CompletableFuture<Void> put(Put put) { 284 validatePut(put, conn.connConf.getMaxKeyValueSize()); 285 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put); 286 return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs) 287 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, 288 put, RequestConverter::buildMutateRequest)) 289 .call(), supplier); 290 } 291 292 @Override 293 public CompletableFuture<Void> delete(Delete delete) { 294 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete); 295 return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs) 296 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, 297 stub, delete, RequestConverter::buildMutateRequest)) 298 .call(), supplier); 299 } 300 301 @Override 302 public CompletableFuture<Result> append(Append append) { 303 checkHasFamilies(append); 304 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append); 305 return tracedFuture(() -> { 306 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 307 long nonce = conn.getNonceGenerator().newNonce(); 308 return this.<Result, Append> newCaller(append, rpcTimeoutNs) 309 .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, 310 controller, loc, stub, append, RequestConverter::buildMutateRequest, 311 RawAsyncTableImpl::toResult)) 312 .call(); 313 }, supplier); 314 } 315 316 @Override 317 public CompletableFuture<Result> increment(Increment increment) { 318 checkHasFamilies(increment); 319 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment); 320 return tracedFuture(() -> { 321 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 322 long nonce = conn.getNonceGenerator().newNonce(); 323 return this.<Result, Increment> newCaller(increment, rpcTimeoutNs) 324 .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, 325 controller, loc, stub, increment, RequestConverter::buildMutateRequest, 326 RawAsyncTableImpl::toResult)) 327 .call(); 328 }, supplier); 329 } 330 331 private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 332 333 private final byte[] row; 334 335 private final byte[] family; 336 337 private byte[] qualifier; 338 339 private TimeRange timeRange; 340 341 private CompareOperator op; 342 343 private byte[] value; 344 345 public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 346 this.row = Preconditions.checkNotNull(row, "row is null"); 347 this.family = Preconditions.checkNotNull(family, "family is null"); 348 } 349 350 @Override 351 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 352 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" 353 + " an empty byte array, or just do not call this method if you want a null qualifier"); 354 return this; 355 } 356 357 @Override 358 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 359 this.timeRange = timeRange; 360 return this; 361 } 362 363 @Override 364 public CheckAndMutateBuilder ifNotExists() { 365 this.op = CompareOperator.EQUAL; 366 this.value = null; 367 return this; 368 } 369 370 @Override 371 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 372 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 373 this.value = Preconditions.checkNotNull(value, "value is null"); 374 return this; 375 } 376 377 private void preCheck() { 378 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" 379 + " calling ifNotExists/ifEquals/ifMatches before executing the request"); 380 } 381 382 @Override 383 public CompletableFuture<Boolean> thenPut(Put put) { 384 validatePut(put, conn.connConf.getMaxKeyValueSize()); 385 preCheck(); 386 final Supplier<Span> supplier = newTableOperationSpanBuilder() 387 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 388 .setContainerOperations(put); 389 return tracedFuture( 390 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 391 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 392 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 393 null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), 394 (c, r) -> r.getProcessed())) 395 .call(), 396 supplier); 397 } 398 399 @Override 400 public CompletableFuture<Boolean> thenDelete(Delete delete) { 401 preCheck(); 402 final Supplier<Span> supplier = newTableOperationSpanBuilder() 403 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 404 .setContainerOperations(delete); 405 return tracedFuture( 406 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 407 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 408 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 409 null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), 410 (c, r) -> r.getProcessed())) 411 .call(), 412 supplier); 413 } 414 415 @Override 416 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 417 preCheck(); 418 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 419 final Supplier<Span> supplier = newTableOperationSpanBuilder() 420 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 421 .setContainerOperations(mutations); 422 return tracedFuture(() -> RawAsyncTableImpl.this 423 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 424 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 425 mutations, 426 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, 427 null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), 428 CheckAndMutateResult::isSuccess)) 429 .call(), supplier); 430 } 431 } 432 433 @Override 434 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 435 return new CheckAndMutateBuilderImpl(row, family); 436 } 437 438 private final class CheckAndMutateWithFilterBuilderImpl 439 implements CheckAndMutateWithFilterBuilder { 440 441 private final byte[] row; 442 443 private final Filter filter; 444 445 private TimeRange timeRange; 446 447 public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { 448 this.row = Preconditions.checkNotNull(row, "row is null"); 449 this.filter = Preconditions.checkNotNull(filter, "filter is null"); 450 } 451 452 @Override 453 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 454 this.timeRange = timeRange; 455 return this; 456 } 457 458 @Override 459 public CompletableFuture<Boolean> thenPut(Put put) { 460 validatePut(put, conn.connConf.getMaxKeyValueSize()); 461 final Supplier<Span> supplier = newTableOperationSpanBuilder() 462 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 463 .setContainerOperations(put); 464 return tracedFuture( 465 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 466 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 467 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 468 timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), 469 (c, r) -> r.getProcessed())) 470 .call(), 471 supplier); 472 } 473 474 @Override 475 public CompletableFuture<Boolean> thenDelete(Delete delete) { 476 final Supplier<Span> supplier = newTableOperationSpanBuilder() 477 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 478 .setContainerOperations(delete); 479 return tracedFuture( 480 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 481 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 482 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 483 timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), 484 (c, r) -> r.getProcessed())) 485 .call(), 486 supplier); 487 } 488 489 @Override 490 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 491 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 492 final Supplier<Span> supplier = newTableOperationSpanBuilder() 493 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 494 .setContainerOperations(mutations); 495 return tracedFuture(() -> RawAsyncTableImpl.this 496 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 497 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 498 mutations, 499 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter, 500 timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), 501 CheckAndMutateResult::isSuccess)) 502 .call(), supplier); 503 } 504 } 505 506 @Override 507 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 508 return new CheckAndMutateWithFilterBuilderImpl(row, filter); 509 } 510 511 @Override 512 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 513 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate) 514 .setContainerOperations(checkAndMutate.getAction()); 515 return tracedFuture(() -> { 516 if ( 517 checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete 518 || checkAndMutate.getAction() instanceof Increment 519 || checkAndMutate.getAction() instanceof Append 520 ) { 521 Mutation mutation = (Mutation) checkAndMutate.getAction(); 522 if (mutation instanceof Put) { 523 validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); 524 } 525 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 526 long nonce = conn.getNonceGenerator().newNonce(); 527 return RawAsyncTableImpl.this 528 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(), 529 rpcTimeoutNs) 530 .action( 531 (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation, 532 (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), 533 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 534 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 535 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce), 536 (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner()))) 537 .call(); 538 } else if (checkAndMutate.getAction() instanceof RowMutations) { 539 RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); 540 validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize()); 541 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 542 long nonce = conn.getNonceGenerator().newNonce(); 543 return RawAsyncTableImpl.this 544 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(), 545 rpcTimeoutNs) 546 .action((controller, loc, stub) -> RawAsyncTableImpl.this.<CheckAndMutateResult, 547 CheckAndMutateResult> mutateRow(controller, loc, stub, rowMutations, 548 (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(), 549 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 550 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 551 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce), 552 resp -> resp)) 553 .call(); 554 } else { 555 CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>(); 556 future.completeExceptionally(new DoNotRetryIOException( 557 "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); 558 return future; 559 } 560 }, supplier); 561 } 562 563 @Override 564 public List<CompletableFuture<CheckAndMutateResult>> 565 checkAndMutate(List<CheckAndMutate> checkAndMutates) { 566 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates) 567 .setContainerOperations(checkAndMutates); 568 return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream() 569 .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), supplier); 570 } 571 572 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, 573 // so here I write a new method as I do not want to change the abstraction of call method. 574 @SuppressWarnings("unchecked") 575 private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, 576 HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, 577 Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) { 578 CompletableFuture<RESP> future = new CompletableFuture<>(); 579 try { 580 byte[] regionName = loc.getRegion().getRegionName(); 581 MultiRequest req = reqConvert.convert(regionName, mutation); 582 stub.multi(controller, req, new RpcCallback<MultiResponse>() { 583 584 @Override 585 public void run(MultiResponse resp) { 586 if (controller.failed()) { 587 future.completeExceptionally(controller.getFailed()); 588 } else { 589 try { 590 org.apache.hadoop.hbase.client.MultiResponse multiResp = 591 ResponseConverter.getResults(req, resp, controller.cellScanner()); 592 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 593 loc.getServerName(), multiResp); 594 Throwable ex = multiResp.getException(regionName); 595 if (ex != null) { 596 future.completeExceptionally(ex instanceof IOException 597 ? ex 598 : new IOException( 599 "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); 600 } else { 601 future.complete( 602 respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0))); 603 } 604 } catch (IOException e) { 605 future.completeExceptionally(e); 606 } 607 } 608 } 609 }); 610 } catch (IOException e) { 611 future.completeExceptionally(e); 612 } 613 return future; 614 } 615 616 @Override 617 public CompletableFuture<Result> mutateRow(RowMutations mutations) { 618 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 619 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 620 long nonce = conn.getNonceGenerator().newNonce(); 621 final Supplier<Span> supplier = 622 newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations); 623 return tracedFuture( 624 () -> this 625 .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) 626 .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub, 627 mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), 628 resp -> resp)) 629 .call(), 630 supplier); 631 } 632 633 private Scan setDefaultScanConfig(Scan scan) { 634 // always create a new scan object as we may reset the start row later. 635 Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); 636 if (newScan.getCaching() <= 0) { 637 newScan.setCaching(defaultScannerCaching); 638 } 639 if (newScan.getMaxResultSize() <= 0) { 640 newScan.setMaxResultSize(defaultScannerMaxResultSize); 641 } 642 return newScan; 643 } 644 645 @Override 646 public void scan(Scan scan, AdvancedScanResultConsumer consumer) { 647 new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, 648 pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, 649 startLogErrorsCnt).start(); 650 } 651 652 private long resultSize2CacheSize(long maxResultSize) { 653 // * 2 if possible 654 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 655 } 656 657 @Override 658 public AsyncTableResultScanner getScanner(Scan scan) { 659 final long maxCacheSize = resultSize2CacheSize( 660 scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize); 661 final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan); 662 final AsyncTableResultScanner scanner = 663 new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize); 664 scan(scan, scanner); 665 return scanner; 666 } 667 668 @Override 669 public CompletableFuture<List<Result>> scanAll(Scan scan) { 670 CompletableFuture<List<Result>> future = new CompletableFuture<>(); 671 List<Result> scanResults = new ArrayList<>(); 672 scan(scan, new AdvancedScanResultConsumer() { 673 674 @Override 675 public void onNext(Result[] results, ScanController controller) { 676 scanResults.addAll(Arrays.asList(results)); 677 } 678 679 @Override 680 public void onError(Throwable error) { 681 future.completeExceptionally(error); 682 } 683 684 @Override 685 public void onComplete() { 686 future.complete(scanResults); 687 } 688 }); 689 return future; 690 } 691 692 @Override 693 public List<CompletableFuture<Result>> get(List<Get> gets) { 694 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets) 695 .setContainerOperations(HBaseSemanticAttributes.Operation.GET); 696 return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); 697 } 698 699 @Override 700 public List<CompletableFuture<Void>> put(List<Put> puts) { 701 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts) 702 .setContainerOperations(HBaseSemanticAttributes.Operation.PUT); 703 return tracedFutures(() -> voidMutate(puts), supplier); 704 } 705 706 @Override 707 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 708 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes) 709 .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE); 710 return tracedFutures(() -> voidMutate(deletes), supplier); 711 } 712 713 @Override 714 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 715 final Supplier<Span> supplier = 716 newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions); 717 return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); 718 } 719 720 private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { 721 return this.<Object> batch(actions, writeRpcTimeoutNs).stream() 722 .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); 723 } 724 725 private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) { 726 for (Row action : actions) { 727 if (action instanceof Put) { 728 validatePut((Put) action, conn.connConf.getMaxKeyValueSize()); 729 } else if (action instanceof CheckAndMutate) { 730 CheckAndMutate checkAndMutate = (CheckAndMutate) action; 731 if (checkAndMutate.getAction() instanceof Put) { 732 validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); 733 } else if (checkAndMutate.getAction() instanceof RowMutations) { 734 validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(), 735 conn.connConf.getMaxKeyValueSize()); 736 } 737 } else if (action instanceof RowMutations) { 738 validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize()); 739 } 740 } 741 return conn.callerFactory.batch().table(tableName).actions(actions) 742 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 743 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 744 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 745 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); 746 } 747 748 @Override 749 public long getRpcTimeout(TimeUnit unit) { 750 return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); 751 } 752 753 @Override 754 public long getReadRpcTimeout(TimeUnit unit) { 755 return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); 756 } 757 758 @Override 759 public long getWriteRpcTimeout(TimeUnit unit) { 760 return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); 761 } 762 763 @Override 764 public long getOperationTimeout(TimeUnit unit) { 765 return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); 766 } 767 768 @Override 769 public long getScanTimeout(TimeUnit unit) { 770 return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); 771 } 772 773 private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 774 ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { 775 RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, 776 region, row, rpcTimeoutNs, operationTimeoutNs); 777 final Span span = Span.current(); 778 S stub = stubMaker.apply(channel); 779 CompletableFuture<R> future = new CompletableFuture<>(); 780 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 781 callable.call(stub, controller, resp -> { 782 try (Scope ignored = span.makeCurrent()) { 783 if (controller.failed()) { 784 final Throwable failure = controller.getFailed(); 785 future.completeExceptionally(failure); 786 TraceUtil.setError(span, failure); 787 } else { 788 future.complete(resp); 789 span.setStatus(StatusCode.OK); 790 } 791 } finally { 792 span.end(); 793 } 794 }); 795 return future; 796 } 797 798 @Override 799 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 800 ServiceCaller<S, R> callable, byte[] row) { 801 return coprocessorService(stubMaker, callable, null, row); 802 } 803 804 private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) { 805 if (isEmptyStopRow(endKey)) { 806 if (isEmptyStopRow(region.getEndKey())) { 807 return true; 808 } 809 return false; 810 } else { 811 if (isEmptyStopRow(region.getEndKey())) { 812 return true; 813 } 814 int c = Bytes.compareTo(endKey, region.getEndKey()); 815 // 1. if the region contains endKey 816 // 2. endKey is equal to the region's endKey and we do not want to include endKey. 817 return c < 0 || (c == 0 && !endKeyInclusive); 818 } 819 } 820 821 private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, 822 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, 823 byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, 824 AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { 825 final Span span = Span.current(); 826 if (error != null) { 827 callback.onError(error); 828 TraceUtil.setError(span, error); 829 span.end(); 830 return; 831 } 832 unfinishedRequest.incrementAndGet(); 833 RegionInfo region = loc.getRegion(); 834 if (locateFinished(region, endKey, endKeyInclusive)) { 835 locateFinished.set(true); 836 } else { 837 addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(), 838 RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> { 839 try (Scope ignored = span.makeCurrent()) { 840 onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, 841 locateFinished, unfinishedRequest, l, e); 842 } 843 }); 844 } 845 addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { 846 try (Scope ignored = span.makeCurrent()) { 847 if (e != null) { 848 callback.onRegionError(region, e); 849 } else { 850 callback.onRegionComplete(region, r); 851 } 852 if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { 853 callback.onComplete(); 854 } 855 } 856 }); 857 } 858 859 private final class CoprocessorServiceBuilderImpl<S, R> 860 implements CoprocessorServiceBuilder<S, R> { 861 862 private final Function<RpcChannel, S> stubMaker; 863 864 private final ServiceCaller<S, R> callable; 865 866 private final CoprocessorCallback<R> callback; 867 868 private byte[] startKey = HConstants.EMPTY_START_ROW; 869 870 private boolean startKeyInclusive; 871 872 private byte[] endKey = HConstants.EMPTY_END_ROW; 873 874 private boolean endKeyInclusive; 875 876 public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, 877 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { 878 this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); 879 this.callable = Preconditions.checkNotNull(callable, "callable is null"); 880 this.callback = Preconditions.checkNotNull(callback, "callback is null"); 881 } 882 883 @Override 884 public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) { 885 this.startKey = Preconditions.checkNotNull(startKey, 886 "startKey is null. Consider using" 887 + " an empty byte array, or just do not call this method if you want to start selection" 888 + " from the first region"); 889 this.startKeyInclusive = inclusive; 890 return this; 891 } 892 893 @Override 894 public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { 895 this.endKey = Preconditions.checkNotNull(endKey, 896 "endKey is null. Consider using" 897 + " an empty byte array, or just do not call this method if you want to continue" 898 + " selection to the last region"); 899 this.endKeyInclusive = inclusive; 900 return this; 901 } 902 903 @Override 904 public void execute() { 905 final Span span = newTableOperationSpanBuilder() 906 .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build(); 907 try (Scope ignored = span.makeCurrent()) { 908 final RegionLocateType regionLocateType = 909 startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER; 910 final CompletableFuture<HRegionLocation> future = conn.getLocator() 911 .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs); 912 addListener(future, (loc, error) -> { 913 try (Scope ignored1 = span.makeCurrent()) { 914 onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, 915 endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error); 916 } 917 }); 918 } 919 } 920 } 921 922 @Override 923 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 924 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 925 CoprocessorCallback<R> callback) { 926 return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); 927 } 928}