001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations; 026import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 027import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; 028import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 029 030import io.opentelemetry.api.trace.Span; 031import io.opentelemetry.api.trace.StatusCode; 032import io.opentelemetry.context.Scope; 033import java.io.IOException; 034import java.time.Duration; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.List; 038import java.util.Map; 039import java.util.concurrent.CompletableFuture; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicInteger; 043import java.util.function.Function; 044import java.util.function.Supplier; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.hbase.CompareOperator; 047import org.apache.hadoop.hbase.DoNotRetryIOException; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.HRegionLocation; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; 052import org.apache.hadoop.hbase.client.ConnectionUtils.Converter; 053import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 054import org.apache.hadoop.hbase.filter.Filter; 055import org.apache.hadoop.hbase.io.TimeRange; 056import org.apache.hadoop.hbase.ipc.HBaseRpcController; 057import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 058import org.apache.hadoop.hbase.trace.TraceUtil; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.ReflectionUtils; 061import org.apache.yetus.audience.InterfaceAudience; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 066import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 067import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 068import org.apache.hbase.thirdparty.io.netty.util.Timer; 069 070import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 071import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 072import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 080 081/** 082 * The implementation of RawAsyncTable. 083 * <p/> 084 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 085 * be finished inside the rpc framework thread, which means that the callbacks registered to the 086 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 087 * this class should not try to do time consuming tasks in the callbacks. 088 * @since 2.0.0 089 * @see AsyncTableImpl 090 */ 091@InterfaceAudience.Private 092class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { 093 094 private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); 095 096 private final AsyncConnectionImpl conn; 097 098 private final Timer retryTimer; 099 100 private final TableName tableName; 101 102 private final int defaultScannerCaching; 103 104 private final long defaultScannerMaxResultSize; 105 106 private final long rpcTimeoutNs; 107 108 private final long readRpcTimeoutNs; 109 110 private final long writeRpcTimeoutNs; 111 112 private final long operationTimeoutNs; 113 114 private final long scanTimeoutNs; 115 116 private final long pauseNs; 117 118 private final long pauseNsForServerOverloaded; 119 120 private final int maxAttempts; 121 122 private final int startLogErrorsCnt; 123 124 private final Map<String, byte[]> requestAttributes; 125 126 RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) { 127 this.conn = conn; 128 this.retryTimer = retryTimer; 129 this.tableName = builder.tableName; 130 this.rpcTimeoutNs = builder.rpcTimeoutNs; 131 this.readRpcTimeoutNs = builder.readRpcTimeoutNs; 132 this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs; 133 this.operationTimeoutNs = builder.operationTimeoutNs; 134 this.scanTimeoutNs = builder.scanTimeoutNs; 135 this.pauseNs = builder.pauseNs; 136 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 137 LOG.warn( 138 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 139 + " the normal pause value {} ms, use the greater one instead", 140 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 141 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 142 this.pauseNsForServerOverloaded = builder.pauseNs; 143 } else { 144 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 145 } 146 this.maxAttempts = builder.maxAttempts; 147 this.startLogErrorsCnt = builder.startLogErrorsCnt; 148 this.defaultScannerCaching = tableName.isSystemTable() 149 ? conn.connConf.getMetaScannerCaching() 150 : conn.connConf.getScannerCaching(); 151 this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); 152 this.requestAttributes = builder.requestAttributes; 153 } 154 155 @Override 156 public TableName getName() { 157 return tableName; 158 } 159 160 @Override 161 public Configuration getConfiguration() { 162 return conn.getConfiguration(); 163 } 164 165 @Override 166 public CompletableFuture<TableDescriptor> getDescriptor() { 167 return conn.getAdmin().getDescriptor(tableName); 168 } 169 170 @Override 171 public AsyncTableRegionLocator getRegionLocator() { 172 return conn.getRegionLocator(tableName); 173 } 174 175 private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller, 176 HRegionLocation loc, ClientService.Interface stub, REQ req, 177 Converter<MutateRequest, byte[], REQ> reqConvert, 178 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 179 return ConnectionUtils.call(controller, loc, stub, req, reqConvert, 180 (s, c, r, done) -> s.mutate(c, r, done), respConverter); 181 } 182 183 private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller, 184 HRegionLocation loc, ClientService.Interface stub, REQ req, 185 Converter<MutateRequest, byte[], REQ> reqConvert) { 186 return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { 187 return null; 188 }); 189 } 190 191 private static Result toResult(HBaseRpcController controller, MutateResponse resp) 192 throws IOException { 193 if (!resp.hasResult()) { 194 return null; 195 } 196 return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); 197 } 198 199 @FunctionalInterface 200 private interface NoncedConverter<D, I, S> { 201 D convert(I info, S src, long nonceGroup, long nonce) throws IOException; 202 } 203 204 private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, 205 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 206 NoncedConverter<MutateRequest, byte[], REQ> reqConvert, 207 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 208 return mutate(controller, loc, stub, req, 209 (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); 210 } 211 212 private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) { 213 return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority) 214 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 215 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 216 .pause(pauseNs, TimeUnit.NANOSECONDS) 217 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 218 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 219 .setRequestAttributes(requestAttributes); 220 } 221 222 private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> 223 newCaller(R row, long rpcTimeoutNs) { 224 return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); 225 } 226 227 private CompletableFuture<Result> get(Get get, int replicaId) { 228 return this.<Result, Get> newCaller(get, readRpcTimeoutNs) 229 .action((controller, loc, stub) -> ConnectionUtils.<Get, GetRequest, GetResponse, 230 Result> call(controller, loc, stub, get, RequestConverter::buildGetRequest, 231 (s, c, req, done) -> s.get(c, req, done), 232 (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) 233 .replicaId(replicaId).call(); 234 } 235 236 private TableOperationSpanBuilder newTableOperationSpanBuilder() { 237 return new TableOperationSpanBuilder(conn).setTableName(tableName); 238 } 239 240 @Override 241 public CompletableFuture<Result> get(Get get) { 242 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get); 243 return tracedFuture( 244 () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), 245 RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, 246 conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()), 247 supplier); 248 } 249 250 @Override 251 public CompletableFuture<Void> put(Put put) { 252 validatePut(put, conn.connConf.getMaxKeyValueSize()); 253 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put); 254 return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs) 255 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, 256 put, RequestConverter::buildMutateRequest)) 257 .call(), supplier); 258 } 259 260 @Override 261 public CompletableFuture<Void> delete(Delete delete) { 262 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete); 263 return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs) 264 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, 265 stub, delete, RequestConverter::buildMutateRequest)) 266 .call(), supplier); 267 } 268 269 @Override 270 public CompletableFuture<Result> append(Append append) { 271 checkHasFamilies(append); 272 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append); 273 return tracedFuture(() -> { 274 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 275 long nonce = conn.getNonceGenerator().newNonce(); 276 return this.<Result, Append> newCaller(append, rpcTimeoutNs) 277 .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, 278 controller, loc, stub, append, RequestConverter::buildMutateRequest, 279 RawAsyncTableImpl::toResult)) 280 .call(); 281 }, supplier); 282 } 283 284 @Override 285 public CompletableFuture<Result> increment(Increment increment) { 286 checkHasFamilies(increment); 287 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment); 288 return tracedFuture(() -> { 289 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 290 long nonce = conn.getNonceGenerator().newNonce(); 291 return this.<Result, Increment> newCaller(increment, rpcTimeoutNs) 292 .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, 293 controller, loc, stub, increment, RequestConverter::buildMutateRequest, 294 RawAsyncTableImpl::toResult)) 295 .call(); 296 }, supplier); 297 } 298 299 private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 300 301 private final byte[] row; 302 303 private final byte[] family; 304 305 private byte[] qualifier; 306 307 private TimeRange timeRange; 308 309 private CompareOperator op; 310 311 private byte[] value; 312 313 public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 314 this.row = Preconditions.checkNotNull(row, "row is null"); 315 this.family = Preconditions.checkNotNull(family, "family is null"); 316 } 317 318 @Override 319 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 320 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" 321 + " an empty byte array, or just do not call this method if you want a null qualifier"); 322 return this; 323 } 324 325 @Override 326 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 327 this.timeRange = timeRange; 328 return this; 329 } 330 331 @Override 332 public CheckAndMutateBuilder ifNotExists() { 333 this.op = CompareOperator.EQUAL; 334 this.value = null; 335 return this; 336 } 337 338 @Override 339 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 340 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 341 this.value = Preconditions.checkNotNull(value, "value is null"); 342 return this; 343 } 344 345 private void preCheck() { 346 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" 347 + " calling ifNotExists/ifEquals/ifMatches before executing the request"); 348 } 349 350 @Override 351 public CompletableFuture<Boolean> thenPut(Put put) { 352 validatePut(put, conn.connConf.getMaxKeyValueSize()); 353 preCheck(); 354 final Supplier<Span> supplier = newTableOperationSpanBuilder() 355 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 356 .setContainerOperations(put); 357 return tracedFuture( 358 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 359 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 360 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 361 null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE, false), 362 (c, r) -> r.getProcessed())) 363 .call(), 364 supplier); 365 } 366 367 @Override 368 public CompletableFuture<Boolean> thenDelete(Delete delete) { 369 preCheck(); 370 final Supplier<Span> supplier = newTableOperationSpanBuilder() 371 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 372 .setContainerOperations(delete); 373 return tracedFuture( 374 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 375 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 376 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 377 null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE, false), 378 (c, r) -> r.getProcessed())) 379 .call(), 380 supplier); 381 } 382 383 @Override 384 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 385 preCheck(); 386 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 387 final Supplier<Span> supplier = newTableOperationSpanBuilder() 388 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 389 .setContainerOperations(mutations); 390 return tracedFuture(() -> RawAsyncTableImpl.this 391 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 392 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 393 mutations, 394 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, 395 null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE, false), 396 CheckAndMutateResult::isSuccess)) 397 .call(), supplier); 398 } 399 } 400 401 @Override 402 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 403 return new CheckAndMutateBuilderImpl(row, family); 404 } 405 406 private final class CheckAndMutateWithFilterBuilderImpl 407 implements CheckAndMutateWithFilterBuilder { 408 409 private final byte[] row; 410 411 private final Filter filter; 412 413 private TimeRange timeRange; 414 415 public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { 416 this.row = Preconditions.checkNotNull(row, "row is null"); 417 this.filter = Preconditions.checkNotNull(filter, "filter is null"); 418 } 419 420 @Override 421 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 422 this.timeRange = timeRange; 423 return this; 424 } 425 426 @Override 427 public CompletableFuture<Boolean> thenPut(Put put) { 428 validatePut(put, conn.connConf.getMaxKeyValueSize()); 429 final Supplier<Span> supplier = newTableOperationSpanBuilder() 430 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 431 .setContainerOperations(put); 432 return tracedFuture( 433 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 434 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 435 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 436 timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE, false), 437 (c, r) -> r.getProcessed())) 438 .call(), 439 supplier); 440 } 441 442 @Override 443 public CompletableFuture<Boolean> thenDelete(Delete delete) { 444 final Supplier<Span> supplier = newTableOperationSpanBuilder() 445 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 446 .setContainerOperations(delete); 447 return tracedFuture( 448 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 449 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 450 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 451 timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE, false), 452 (c, r) -> r.getProcessed())) 453 .call(), 454 supplier); 455 } 456 457 @Override 458 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 459 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 460 final Supplier<Span> supplier = newTableOperationSpanBuilder() 461 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 462 .setContainerOperations(mutations); 463 return tracedFuture(() -> RawAsyncTableImpl.this 464 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 465 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 466 mutations, 467 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter, 468 timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE, false), 469 CheckAndMutateResult::isSuccess)) 470 .call(), supplier); 471 } 472 } 473 474 @Override 475 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 476 return new CheckAndMutateWithFilterBuilderImpl(row, filter); 477 } 478 479 @Override 480 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 481 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate) 482 .setContainerOperations(checkAndMutate.getAction()); 483 return tracedFuture(() -> { 484 if ( 485 checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete 486 || checkAndMutate.getAction() instanceof Increment 487 || checkAndMutate.getAction() instanceof Append 488 ) { 489 Mutation mutation = (Mutation) checkAndMutate.getAction(); 490 if (mutation instanceof Put) { 491 validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); 492 } 493 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 494 long nonce = conn.getNonceGenerator().newNonce(); 495 return RawAsyncTableImpl.this 496 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(), 497 rpcTimeoutNs) 498 .action( 499 (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation, 500 (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), 501 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 502 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 503 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce, 504 checkAndMutate.isQueryMetricsEnabled()), 505 (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner()))) 506 .call(); 507 } else if (checkAndMutate.getAction() instanceof RowMutations) { 508 RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); 509 validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize()); 510 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 511 long nonce = conn.getNonceGenerator().newNonce(); 512 return RawAsyncTableImpl.this 513 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(), 514 rpcTimeoutNs) 515 .action((controller, loc, stub) -> RawAsyncTableImpl.this.<CheckAndMutateResult, 516 CheckAndMutateResult> mutateRow(controller, loc, stub, rowMutations, 517 (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(), 518 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 519 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 520 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce, 521 checkAndMutate.isQueryMetricsEnabled()), 522 resp -> resp)) 523 .call(); 524 } else { 525 CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>(); 526 future.completeExceptionally(new DoNotRetryIOException( 527 "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); 528 return future; 529 } 530 }, supplier); 531 } 532 533 @Override 534 public List<CompletableFuture<CheckAndMutateResult>> 535 checkAndMutate(List<CheckAndMutate> checkAndMutates) { 536 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates) 537 .setContainerOperations(checkAndMutates); 538 return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream() 539 .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), supplier); 540 } 541 542 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, 543 // so here I write a new method as I do not want to change the abstraction of call method. 544 @SuppressWarnings("unchecked") 545 private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, 546 HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, 547 Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) { 548 CompletableFuture<RESP> future = new CompletableFuture<>(); 549 try { 550 byte[] regionName = loc.getRegion().getRegionName(); 551 MultiRequest req = reqConvert.convert(regionName, mutation); 552 stub.multi(controller, req, new RpcCallback<MultiResponse>() { 553 554 @Override 555 public void run(MultiResponse resp) { 556 if (controller.failed()) { 557 future.completeExceptionally(controller.getFailed()); 558 } else { 559 try { 560 org.apache.hadoop.hbase.client.MultiResponse multiResp = 561 ResponseConverter.getResults(req, resp, controller.cellScanner()); 562 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 563 loc.getServerName(), multiResp); 564 Throwable ex = multiResp.getException(regionName); 565 if (ex != null) { 566 future.completeExceptionally(ex instanceof IOException 567 ? ex 568 : new IOException( 569 "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); 570 } else { 571 future.complete( 572 respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0))); 573 } 574 } catch (IOException e) { 575 future.completeExceptionally(e); 576 } 577 } 578 } 579 }); 580 } catch (IOException e) { 581 future.completeExceptionally(e); 582 } 583 return future; 584 } 585 586 @Override 587 public CompletableFuture<Result> mutateRow(RowMutations mutations) { 588 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 589 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 590 long nonce = conn.getNonceGenerator().newNonce(); 591 final Supplier<Span> supplier = 592 newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations); 593 return tracedFuture( 594 () -> this 595 .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) 596 .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub, 597 mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), 598 resp -> resp)) 599 .call(), 600 supplier); 601 } 602 603 private Scan setDefaultScanConfig(Scan scan) { 604 // always create a new scan object as we may reset the start row later. 605 Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); 606 if (newScan.getCaching() <= 0) { 607 newScan.setCaching(defaultScannerCaching); 608 } 609 if (newScan.getMaxResultSize() <= 0) { 610 newScan.setMaxResultSize(defaultScannerMaxResultSize); 611 } 612 return newScan; 613 } 614 615 @Override 616 public void scan(Scan scan, AdvancedScanResultConsumer consumer) { 617 new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, 618 pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, 619 startLogErrorsCnt, requestAttributes).start(); 620 } 621 622 private long resultSize2CacheSize(long maxResultSize) { 623 // * 2 if possible 624 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 625 } 626 627 @Override 628 public AsyncTableResultScanner getScanner(Scan scan) { 629 final long maxCacheSize = resultSize2CacheSize( 630 scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize); 631 final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan); 632 final AsyncTableResultScanner scanner = 633 new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize); 634 scan(scan, scanner); 635 return scanner; 636 } 637 638 @Override 639 public CompletableFuture<List<Result>> scanAll(Scan scan) { 640 CompletableFuture<List<Result>> future = new CompletableFuture<>(); 641 List<Result> scanResults = new ArrayList<>(); 642 scan(scan, new AdvancedScanResultConsumer() { 643 644 @Override 645 public void onNext(Result[] results, ScanController controller) { 646 scanResults.addAll(Arrays.asList(results)); 647 } 648 649 @Override 650 public void onError(Throwable error) { 651 future.completeExceptionally(error); 652 } 653 654 @Override 655 public void onComplete() { 656 future.complete(scanResults); 657 } 658 }); 659 return future; 660 } 661 662 @Override 663 public List<CompletableFuture<Result>> get(List<Get> gets) { 664 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets) 665 .setContainerOperations(HBaseSemanticAttributes.Operation.GET); 666 return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); 667 } 668 669 @Override 670 public List<CompletableFuture<Void>> put(List<Put> puts) { 671 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts) 672 .setContainerOperations(HBaseSemanticAttributes.Operation.PUT); 673 return tracedFutures(() -> voidMutate(puts), supplier); 674 } 675 676 @Override 677 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 678 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes) 679 .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE); 680 return tracedFutures(() -> voidMutate(deletes), supplier); 681 } 682 683 @Override 684 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 685 final Supplier<Span> supplier = 686 newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions); 687 return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); 688 } 689 690 private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { 691 return this.<Object> batch(actions, writeRpcTimeoutNs).stream() 692 .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); 693 } 694 695 private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) { 696 for (Row action : actions) { 697 if (action instanceof Put) { 698 validatePut((Put) action, conn.connConf.getMaxKeyValueSize()); 699 } else if (action instanceof CheckAndMutate) { 700 CheckAndMutate checkAndMutate = (CheckAndMutate) action; 701 if (checkAndMutate.getAction() instanceof Put) { 702 validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); 703 } else if (checkAndMutate.getAction() instanceof RowMutations) { 704 validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(), 705 conn.connConf.getMaxKeyValueSize()); 706 } 707 } else if (action instanceof RowMutations) { 708 validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize()); 709 } 710 } 711 return conn.callerFactory.batch().table(tableName).actions(actions) 712 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 713 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 714 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 715 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 716 .setRequestAttributes(requestAttributes).call(); 717 } 718 719 @Override 720 public long getRpcTimeout(TimeUnit unit) { 721 return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); 722 } 723 724 @Override 725 public long getReadRpcTimeout(TimeUnit unit) { 726 return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); 727 } 728 729 @Override 730 public long getWriteRpcTimeout(TimeUnit unit) { 731 return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); 732 } 733 734 @Override 735 public long getOperationTimeout(TimeUnit unit) { 736 return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); 737 } 738 739 @Override 740 public long getScanTimeout(TimeUnit unit) { 741 return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); 742 } 743 744 @Override 745 public Map<String, byte[]> getRequestAttributes() { 746 return requestAttributes; 747 } 748 749 private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 750 ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { 751 RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, 752 region, row, rpcTimeoutNs, operationTimeoutNs); 753 final Span span = Span.current(); 754 S stub = stubMaker.apply(channel); 755 CompletableFuture<R> future = new CompletableFuture<>(); 756 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 757 callable.call(stub, controller, resp -> { 758 try (Scope ignored = span.makeCurrent()) { 759 if (controller.failed()) { 760 final Throwable failure = controller.getFailed(); 761 future.completeExceptionally(failure); 762 TraceUtil.setError(span, failure); 763 } else { 764 future.complete(resp); 765 span.setStatus(StatusCode.OK); 766 } 767 } finally { 768 span.end(); 769 } 770 }); 771 return future; 772 } 773 774 @Override 775 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 776 ServiceCaller<S, R> callable, byte[] row) { 777 return coprocessorService(stubMaker, callable, null, row); 778 } 779 780 private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) { 781 if (isEmptyStopRow(endKey)) { 782 if (isEmptyStopRow(region.getEndKey())) { 783 return true; 784 } 785 return false; 786 } else { 787 if (isEmptyStopRow(region.getEndKey())) { 788 return true; 789 } 790 int c = Bytes.compareTo(endKey, region.getEndKey()); 791 // 1. if the region contains endKey 792 // 2. endKey is equal to the region's endKey and we do not want to include endKey. 793 return c < 0 || (c == 0 && !endKeyInclusive); 794 } 795 } 796 797 private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S> stubMaker, 798 ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> callback, 799 AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo region, Span span) { 800 addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { 801 try (Scope ignored = span.makeCurrent()) { 802 if (e != null) { 803 callback.onRegionError(region, e); 804 } else { 805 callback.onRegionComplete(region, r); 806 } 807 808 ServiceCaller<S, R> updatedCallable; 809 if (e == null && r != null) { 810 updatedCallable = callback.getNextCallable(r, region); 811 } else { 812 updatedCallable = null; 813 } 814 815 // If updatedCallable is non-null, we will be sending another request, so no need to 816 // decrement unfinishedRequest (recall that && short-circuits). 817 // If updatedCallable is null, and unfinishedRequest decrements to 0, we're done with the 818 // requests for this coprocessor call. 819 if ( 820 updatedCallable == null && unfinishedRequest.decrementAndGet() == 0 821 && locateFinished.get() 822 ) { 823 callback.onComplete(); 824 } else if (updatedCallable != null) { 825 Duration waitInterval = callback.getWaitInterval(r, region); 826 LOG.trace("Coprocessor returned incomplete result. " 827 + "Sleeping for {} before making follow-up request.", waitInterval); 828 if (waitInterval.isZero()) { 829 AsyncConnectionImpl.RETRY_TIMER.newTimeout( 830 (timeout) -> coprocessorServiceUntilComplete(stubMaker, updatedCallable, callback, 831 locateFinished, unfinishedRequest, region, span), 832 waitInterval.toMillis(), TimeUnit.MILLISECONDS); 833 } else { 834 coprocessorServiceUntilComplete(stubMaker, updatedCallable, callback, locateFinished, 835 unfinishedRequest, region, span); 836 } 837 } 838 } 839 }); 840 } 841 842 private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, 843 ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> callback, byte[] endKey, 844 boolean endKeyInclusive, AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, 845 HRegionLocation loc, Throwable error) { 846 final Span span = Span.current(); 847 if (error != null) { 848 callback.onError(error); 849 TraceUtil.setError(span, error); 850 span.end(); 851 return; 852 } 853 unfinishedRequest.incrementAndGet(); 854 RegionInfo region = loc.getRegion(); 855 if (locateFinished(region, endKey, endKeyInclusive)) { 856 locateFinished.set(true); 857 } else { 858 addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(), 859 RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> { 860 try (Scope ignored = span.makeCurrent()) { 861 onLocateComplete(stubMaker, callable, callback, endKey, endKeyInclusive, locateFinished, 862 unfinishedRequest, l, e); 863 } 864 }); 865 } 866 coprocessorServiceUntilComplete(stubMaker, callable, callback, locateFinished, 867 unfinishedRequest, region, span); 868 } 869 870 private final class CoprocessorServiceBuilderImpl<S, R> 871 implements CoprocessorServiceBuilder<S, R> { 872 873 private final Function<RpcChannel, S> stubMaker; 874 875 private final ServiceCaller<S, R> callable; 876 877 private final PartialResultCoprocessorCallback<S, R> callback; 878 879 private byte[] startKey = HConstants.EMPTY_START_ROW; 880 881 private boolean startKeyInclusive; 882 883 private byte[] endKey = HConstants.EMPTY_END_ROW; 884 885 private boolean endKeyInclusive; 886 887 public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, 888 ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> callback) { 889 this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); 890 this.callable = Preconditions.checkNotNull(callable, "callable is null"); 891 this.callback = Preconditions.checkNotNull(callback, "callback is null"); 892 } 893 894 @Override 895 public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) { 896 this.startKey = Preconditions.checkNotNull(startKey, 897 "startKey is null. Consider using" 898 + " an empty byte array, or just do not call this method if you want to start selection" 899 + " from the first region"); 900 this.startKeyInclusive = inclusive; 901 return this; 902 } 903 904 @Override 905 public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { 906 this.endKey = Preconditions.checkNotNull(endKey, 907 "endKey is null. Consider using" 908 + " an empty byte array, or just do not call this method if you want to continue" 909 + " selection to the last region"); 910 this.endKeyInclusive = inclusive; 911 return this; 912 } 913 914 @Override 915 public void execute() { 916 final Span span = newTableOperationSpanBuilder() 917 .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build(); 918 try (Scope ignored = span.makeCurrent()) { 919 final RegionLocateType regionLocateType = 920 startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER; 921 final CompletableFuture<HRegionLocation> future = conn.getLocator() 922 .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs); 923 addListener(future, (loc, error) -> { 924 try (Scope ignored1 = span.makeCurrent()) { 925 onLocateComplete(stubMaker, callable, callback, endKey, endKeyInclusive, 926 new AtomicBoolean(false), new AtomicInteger(0), loc, error); 927 } 928 }); 929 } 930 } 931 } 932 933 @Override 934 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 935 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 936 CoprocessorCallback<R> callback) { 937 return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, 938 new NoopPartialResultCoprocessorCallback<>(callback)); 939 } 940 941 @Override 942 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 943 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 944 PartialResultCoprocessorCallback<S, R> callback) { 945 return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); 946 } 947}