001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations; 026import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 027import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; 028import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 029 030import io.opentelemetry.api.trace.Span; 031import io.opentelemetry.api.trace.StatusCode; 032import io.opentelemetry.context.Scope; 033import java.io.IOException; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.List; 037import java.util.Map; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.function.Function; 043import java.util.function.Supplier; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.CompareOperator; 046import org.apache.hadoop.hbase.DoNotRetryIOException; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.HRegionLocation; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; 051import org.apache.hadoop.hbase.client.ConnectionUtils.Converter; 052import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 053import org.apache.hadoop.hbase.filter.Filter; 054import org.apache.hadoop.hbase.io.TimeRange; 055import org.apache.hadoop.hbase.ipc.HBaseRpcController; 056import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 057import org.apache.hadoop.hbase.trace.TraceUtil; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.ReflectionUtils; 060import org.apache.yetus.audience.InterfaceAudience; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 065import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 066import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 067import org.apache.hbase.thirdparty.io.netty.util.Timer; 068 069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 070import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 071import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 079 080/** 081 * The implementation of RawAsyncTable. 082 * <p/> 083 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 084 * be finished inside the rpc framework thread, which means that the callbacks registered to the 085 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 086 * this class should not try to do time consuming tasks in the callbacks. 087 * @since 2.0.0 088 * @see AsyncTableImpl 089 */ 090@InterfaceAudience.Private 091class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { 092 093 private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); 094 095 private final AsyncConnectionImpl conn; 096 097 private final Timer retryTimer; 098 099 private final TableName tableName; 100 101 private final int defaultScannerCaching; 102 103 private final long defaultScannerMaxResultSize; 104 105 private final long rpcTimeoutNs; 106 107 private final long readRpcTimeoutNs; 108 109 private final long writeRpcTimeoutNs; 110 111 private final long operationTimeoutNs; 112 113 private final long scanTimeoutNs; 114 115 private final long pauseNs; 116 117 private final long pauseNsForServerOverloaded; 118 119 private final int maxAttempts; 120 121 private final int startLogErrorsCnt; 122 123 private final Map<String, byte[]> requestAttributes; 124 125 RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) { 126 this.conn = conn; 127 this.retryTimer = retryTimer; 128 this.tableName = builder.tableName; 129 this.rpcTimeoutNs = builder.rpcTimeoutNs; 130 this.readRpcTimeoutNs = builder.readRpcTimeoutNs; 131 this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs; 132 this.operationTimeoutNs = builder.operationTimeoutNs; 133 this.scanTimeoutNs = builder.scanTimeoutNs; 134 this.pauseNs = builder.pauseNs; 135 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 136 LOG.warn( 137 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 138 + " the normal pause value {} ms, use the greater one instead", 139 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 140 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 141 this.pauseNsForServerOverloaded = builder.pauseNs; 142 } else { 143 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 144 } 145 this.maxAttempts = builder.maxAttempts; 146 this.startLogErrorsCnt = builder.startLogErrorsCnt; 147 this.defaultScannerCaching = tableName.isSystemTable() 148 ? conn.connConf.getMetaScannerCaching() 149 : conn.connConf.getScannerCaching(); 150 this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); 151 this.requestAttributes = builder.requestAttributes; 152 } 153 154 @Override 155 public TableName getName() { 156 return tableName; 157 } 158 159 @Override 160 public Configuration getConfiguration() { 161 return conn.getConfiguration(); 162 } 163 164 @Override 165 public CompletableFuture<TableDescriptor> getDescriptor() { 166 return conn.getAdmin().getDescriptor(tableName); 167 } 168 169 @Override 170 public AsyncTableRegionLocator getRegionLocator() { 171 return conn.getRegionLocator(tableName); 172 } 173 174 private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller, 175 HRegionLocation loc, ClientService.Interface stub, REQ req, 176 Converter<MutateRequest, byte[], REQ> reqConvert, 177 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 178 return ConnectionUtils.call(controller, loc, stub, req, reqConvert, 179 (s, c, r, done) -> s.mutate(c, r, done), respConverter); 180 } 181 182 private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller, 183 HRegionLocation loc, ClientService.Interface stub, REQ req, 184 Converter<MutateRequest, byte[], REQ> reqConvert) { 185 return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { 186 return null; 187 }); 188 } 189 190 private static Result toResult(HBaseRpcController controller, MutateResponse resp) 191 throws IOException { 192 if (!resp.hasResult()) { 193 return null; 194 } 195 return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); 196 } 197 198 @FunctionalInterface 199 private interface NoncedConverter<D, I, S> { 200 D convert(I info, S src, long nonceGroup, long nonce) throws IOException; 201 } 202 203 private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, 204 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 205 NoncedConverter<MutateRequest, byte[], REQ> reqConvert, 206 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 207 return mutate(controller, loc, stub, req, 208 (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); 209 } 210 211 private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) { 212 return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority) 213 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 214 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 215 .pause(pauseNs, TimeUnit.NANOSECONDS) 216 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 217 .maxAttempts(maxAttempts).setRequestAttributes(requestAttributes) 218 .startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes); 219 } 220 221 private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> 222 newCaller(R row, long rpcTimeoutNs) { 223 return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); 224 } 225 226 private CompletableFuture<Result> get(Get get, int replicaId) { 227 return this.<Result, Get> newCaller(get, readRpcTimeoutNs) 228 .action((controller, loc, stub) -> ConnectionUtils.<Get, GetRequest, GetResponse, 229 Result> call(controller, loc, stub, get, RequestConverter::buildGetRequest, 230 (s, c, req, done) -> s.get(c, req, done), 231 (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) 232 .replicaId(replicaId).call(); 233 } 234 235 private TableOperationSpanBuilder newTableOperationSpanBuilder() { 236 return new TableOperationSpanBuilder(conn).setTableName(tableName); 237 } 238 239 @Override 240 public CompletableFuture<Result> get(Get get) { 241 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get); 242 return tracedFuture( 243 () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), 244 RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, 245 conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()), 246 supplier); 247 } 248 249 @Override 250 public CompletableFuture<Void> put(Put put) { 251 validatePut(put, conn.connConf.getMaxKeyValueSize()); 252 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put); 253 return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs) 254 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, 255 put, RequestConverter::buildMutateRequest)) 256 .call(), supplier); 257 } 258 259 @Override 260 public CompletableFuture<Void> delete(Delete delete) { 261 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete); 262 return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs) 263 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, 264 stub, delete, RequestConverter::buildMutateRequest)) 265 .call(), supplier); 266 } 267 268 @Override 269 public CompletableFuture<Result> append(Append append) { 270 checkHasFamilies(append); 271 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append); 272 return tracedFuture(() -> { 273 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 274 long nonce = conn.getNonceGenerator().newNonce(); 275 return this.<Result, Append> newCaller(append, rpcTimeoutNs) 276 .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, 277 controller, loc, stub, append, RequestConverter::buildMutateRequest, 278 RawAsyncTableImpl::toResult)) 279 .call(); 280 }, supplier); 281 } 282 283 @Override 284 public CompletableFuture<Result> increment(Increment increment) { 285 checkHasFamilies(increment); 286 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment); 287 return tracedFuture(() -> { 288 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 289 long nonce = conn.getNonceGenerator().newNonce(); 290 return this.<Result, Increment> newCaller(increment, rpcTimeoutNs) 291 .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, 292 controller, loc, stub, increment, RequestConverter::buildMutateRequest, 293 RawAsyncTableImpl::toResult)) 294 .call(); 295 }, supplier); 296 } 297 298 private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 299 300 private final byte[] row; 301 302 private final byte[] family; 303 304 private byte[] qualifier; 305 306 private TimeRange timeRange; 307 308 private CompareOperator op; 309 310 private byte[] value; 311 312 public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 313 this.row = Preconditions.checkNotNull(row, "row is null"); 314 this.family = Preconditions.checkNotNull(family, "family is null"); 315 } 316 317 @Override 318 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 319 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" 320 + " an empty byte array, or just do not call this method if you want a null qualifier"); 321 return this; 322 } 323 324 @Override 325 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 326 this.timeRange = timeRange; 327 return this; 328 } 329 330 @Override 331 public CheckAndMutateBuilder ifNotExists() { 332 this.op = CompareOperator.EQUAL; 333 this.value = null; 334 return this; 335 } 336 337 @Override 338 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 339 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 340 this.value = Preconditions.checkNotNull(value, "value is null"); 341 return this; 342 } 343 344 private void preCheck() { 345 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" 346 + " calling ifNotExists/ifEquals/ifMatches before executing the request"); 347 } 348 349 @Override 350 public CompletableFuture<Boolean> thenPut(Put put) { 351 validatePut(put, conn.connConf.getMaxKeyValueSize()); 352 preCheck(); 353 final Supplier<Span> supplier = newTableOperationSpanBuilder() 354 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 355 .setContainerOperations(put); 356 return tracedFuture( 357 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 358 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 359 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 360 null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), 361 (c, r) -> r.getProcessed())) 362 .call(), 363 supplier); 364 } 365 366 @Override 367 public CompletableFuture<Boolean> thenDelete(Delete delete) { 368 preCheck(); 369 final Supplier<Span> supplier = newTableOperationSpanBuilder() 370 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 371 .setContainerOperations(delete); 372 return tracedFuture( 373 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 374 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 375 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 376 null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), 377 (c, r) -> r.getProcessed())) 378 .call(), 379 supplier); 380 } 381 382 @Override 383 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 384 preCheck(); 385 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 386 final Supplier<Span> supplier = newTableOperationSpanBuilder() 387 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 388 .setContainerOperations(mutations); 389 return tracedFuture(() -> RawAsyncTableImpl.this 390 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 391 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 392 mutations, 393 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, 394 null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), 395 CheckAndMutateResult::isSuccess)) 396 .call(), supplier); 397 } 398 } 399 400 @Override 401 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 402 return new CheckAndMutateBuilderImpl(row, family); 403 } 404 405 private final class CheckAndMutateWithFilterBuilderImpl 406 implements CheckAndMutateWithFilterBuilder { 407 408 private final byte[] row; 409 410 private final Filter filter; 411 412 private TimeRange timeRange; 413 414 public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { 415 this.row = Preconditions.checkNotNull(row, "row is null"); 416 this.filter = Preconditions.checkNotNull(filter, "filter is null"); 417 } 418 419 @Override 420 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 421 this.timeRange = timeRange; 422 return this; 423 } 424 425 @Override 426 public CompletableFuture<Boolean> thenPut(Put put) { 427 validatePut(put, conn.connConf.getMaxKeyValueSize()); 428 final Supplier<Span> supplier = newTableOperationSpanBuilder() 429 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 430 .setContainerOperations(put); 431 return tracedFuture( 432 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 433 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 434 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 435 timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), 436 (c, r) -> r.getProcessed())) 437 .call(), 438 supplier); 439 } 440 441 @Override 442 public CompletableFuture<Boolean> thenDelete(Delete delete) { 443 final Supplier<Span> supplier = newTableOperationSpanBuilder() 444 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 445 .setContainerOperations(delete); 446 return tracedFuture( 447 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 448 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 449 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 450 timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), 451 (c, r) -> r.getProcessed())) 452 .call(), 453 supplier); 454 } 455 456 @Override 457 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 458 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 459 final Supplier<Span> supplier = newTableOperationSpanBuilder() 460 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 461 .setContainerOperations(mutations); 462 return tracedFuture(() -> RawAsyncTableImpl.this 463 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 464 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 465 mutations, 466 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter, 467 timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), 468 CheckAndMutateResult::isSuccess)) 469 .call(), supplier); 470 } 471 } 472 473 @Override 474 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 475 return new CheckAndMutateWithFilterBuilderImpl(row, filter); 476 } 477 478 @Override 479 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 480 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate) 481 .setContainerOperations(checkAndMutate.getAction()); 482 return tracedFuture(() -> { 483 if ( 484 checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete 485 || checkAndMutate.getAction() instanceof Increment 486 || checkAndMutate.getAction() instanceof Append 487 ) { 488 Mutation mutation = (Mutation) checkAndMutate.getAction(); 489 if (mutation instanceof Put) { 490 validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); 491 } 492 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 493 long nonce = conn.getNonceGenerator().newNonce(); 494 return RawAsyncTableImpl.this 495 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(), 496 rpcTimeoutNs) 497 .action( 498 (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation, 499 (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), 500 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 501 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 502 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce), 503 (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner()))) 504 .call(); 505 } else if (checkAndMutate.getAction() instanceof RowMutations) { 506 RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); 507 validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize()); 508 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 509 long nonce = conn.getNonceGenerator().newNonce(); 510 return RawAsyncTableImpl.this 511 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(), 512 rpcTimeoutNs) 513 .action((controller, loc, stub) -> RawAsyncTableImpl.this.<CheckAndMutateResult, 514 CheckAndMutateResult> mutateRow(controller, loc, stub, rowMutations, 515 (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(), 516 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 517 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 518 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce), 519 resp -> resp)) 520 .call(); 521 } else { 522 CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>(); 523 future.completeExceptionally(new DoNotRetryIOException( 524 "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); 525 return future; 526 } 527 }, supplier); 528 } 529 530 @Override 531 public List<CompletableFuture<CheckAndMutateResult>> 532 checkAndMutate(List<CheckAndMutate> checkAndMutates) { 533 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates) 534 .setContainerOperations(checkAndMutates); 535 return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream() 536 .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), supplier); 537 } 538 539 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, 540 // so here I write a new method as I do not want to change the abstraction of call method. 541 @SuppressWarnings("unchecked") 542 private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, 543 HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, 544 Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) { 545 CompletableFuture<RESP> future = new CompletableFuture<>(); 546 try { 547 byte[] regionName = loc.getRegion().getRegionName(); 548 MultiRequest req = reqConvert.convert(regionName, mutation); 549 stub.multi(controller, req, new RpcCallback<MultiResponse>() { 550 551 @Override 552 public void run(MultiResponse resp) { 553 if (controller.failed()) { 554 future.completeExceptionally(controller.getFailed()); 555 } else { 556 try { 557 org.apache.hadoop.hbase.client.MultiResponse multiResp = 558 ResponseConverter.getResults(req, resp, controller.cellScanner()); 559 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 560 loc.getServerName(), multiResp); 561 Throwable ex = multiResp.getException(regionName); 562 if (ex != null) { 563 future.completeExceptionally(ex instanceof IOException 564 ? ex 565 : new IOException( 566 "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); 567 } else { 568 future.complete( 569 respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0))); 570 } 571 } catch (IOException e) { 572 future.completeExceptionally(e); 573 } 574 } 575 } 576 }); 577 } catch (IOException e) { 578 future.completeExceptionally(e); 579 } 580 return future; 581 } 582 583 @Override 584 public CompletableFuture<Result> mutateRow(RowMutations mutations) { 585 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 586 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 587 long nonce = conn.getNonceGenerator().newNonce(); 588 final Supplier<Span> supplier = 589 newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations); 590 return tracedFuture( 591 () -> this 592 .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) 593 .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub, 594 mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), 595 resp -> resp)) 596 .call(), 597 supplier); 598 } 599 600 private Scan setDefaultScanConfig(Scan scan) { 601 // always create a new scan object as we may reset the start row later. 602 Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); 603 if (newScan.getCaching() <= 0) { 604 newScan.setCaching(defaultScannerCaching); 605 } 606 if (newScan.getMaxResultSize() <= 0) { 607 newScan.setMaxResultSize(defaultScannerMaxResultSize); 608 } 609 return newScan; 610 } 611 612 @Override 613 public void scan(Scan scan, AdvancedScanResultConsumer consumer) { 614 new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, 615 pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, 616 startLogErrorsCnt, requestAttributes).start(); 617 } 618 619 private long resultSize2CacheSize(long maxResultSize) { 620 // * 2 if possible 621 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 622 } 623 624 @Override 625 public AsyncTableResultScanner getScanner(Scan scan) { 626 final long maxCacheSize = resultSize2CacheSize( 627 scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize); 628 final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan); 629 final AsyncTableResultScanner scanner = 630 new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize); 631 scan(scan, scanner); 632 return scanner; 633 } 634 635 @Override 636 public CompletableFuture<List<Result>> scanAll(Scan scan) { 637 CompletableFuture<List<Result>> future = new CompletableFuture<>(); 638 List<Result> scanResults = new ArrayList<>(); 639 scan(scan, new AdvancedScanResultConsumer() { 640 641 @Override 642 public void onNext(Result[] results, ScanController controller) { 643 scanResults.addAll(Arrays.asList(results)); 644 } 645 646 @Override 647 public void onError(Throwable error) { 648 future.completeExceptionally(error); 649 } 650 651 @Override 652 public void onComplete() { 653 future.complete(scanResults); 654 } 655 }); 656 return future; 657 } 658 659 @Override 660 public List<CompletableFuture<Result>> get(List<Get> gets) { 661 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets) 662 .setContainerOperations(HBaseSemanticAttributes.Operation.GET); 663 return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); 664 } 665 666 @Override 667 public List<CompletableFuture<Void>> put(List<Put> puts) { 668 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts) 669 .setContainerOperations(HBaseSemanticAttributes.Operation.PUT); 670 return tracedFutures(() -> voidMutate(puts), supplier); 671 } 672 673 @Override 674 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 675 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes) 676 .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE); 677 return tracedFutures(() -> voidMutate(deletes), supplier); 678 } 679 680 @Override 681 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 682 final Supplier<Span> supplier = 683 newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions); 684 return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); 685 } 686 687 private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { 688 return this.<Object> batch(actions, writeRpcTimeoutNs).stream() 689 .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); 690 } 691 692 private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) { 693 for (Row action : actions) { 694 if (action instanceof Put) { 695 validatePut((Put) action, conn.connConf.getMaxKeyValueSize()); 696 } else if (action instanceof CheckAndMutate) { 697 CheckAndMutate checkAndMutate = (CheckAndMutate) action; 698 if (checkAndMutate.getAction() instanceof Put) { 699 validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); 700 } else if (checkAndMutate.getAction() instanceof RowMutations) { 701 validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(), 702 conn.connConf.getMaxKeyValueSize()); 703 } 704 } else if (action instanceof RowMutations) { 705 validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize()); 706 } 707 } 708 return conn.callerFactory.batch().table(tableName).actions(actions) 709 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 710 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 711 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 712 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 713 .setRequestAttributes(requestAttributes).call(); 714 } 715 716 @Override 717 public long getRpcTimeout(TimeUnit unit) { 718 return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); 719 } 720 721 @Override 722 public long getReadRpcTimeout(TimeUnit unit) { 723 return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); 724 } 725 726 @Override 727 public long getWriteRpcTimeout(TimeUnit unit) { 728 return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); 729 } 730 731 @Override 732 public long getOperationTimeout(TimeUnit unit) { 733 return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); 734 } 735 736 @Override 737 public long getScanTimeout(TimeUnit unit) { 738 return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); 739 } 740 741 @Override 742 public Map<String, byte[]> getRequestAttributes() { 743 return requestAttributes; 744 } 745 746 private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 747 ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { 748 RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, 749 region, row, rpcTimeoutNs, operationTimeoutNs); 750 final Span span = Span.current(); 751 S stub = stubMaker.apply(channel); 752 CompletableFuture<R> future = new CompletableFuture<>(); 753 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 754 callable.call(stub, controller, resp -> { 755 try (Scope ignored = span.makeCurrent()) { 756 if (controller.failed()) { 757 final Throwable failure = controller.getFailed(); 758 future.completeExceptionally(failure); 759 TraceUtil.setError(span, failure); 760 } else { 761 future.complete(resp); 762 span.setStatus(StatusCode.OK); 763 } 764 } finally { 765 span.end(); 766 } 767 }); 768 return future; 769 } 770 771 @Override 772 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 773 ServiceCaller<S, R> callable, byte[] row) { 774 return coprocessorService(stubMaker, callable, null, row); 775 } 776 777 private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) { 778 if (isEmptyStopRow(endKey)) { 779 if (isEmptyStopRow(region.getEndKey())) { 780 return true; 781 } 782 return false; 783 } else { 784 if (isEmptyStopRow(region.getEndKey())) { 785 return true; 786 } 787 int c = Bytes.compareTo(endKey, region.getEndKey()); 788 // 1. if the region contains endKey 789 // 2. endKey is equal to the region's endKey and we do not want to include endKey. 790 return c < 0 || (c == 0 && !endKeyInclusive); 791 } 792 } 793 794 private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, 795 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, 796 byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, 797 AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { 798 final Span span = Span.current(); 799 if (error != null) { 800 callback.onError(error); 801 TraceUtil.setError(span, error); 802 span.end(); 803 return; 804 } 805 unfinishedRequest.incrementAndGet(); 806 RegionInfo region = loc.getRegion(); 807 if (locateFinished(region, endKey, endKeyInclusive)) { 808 locateFinished.set(true); 809 } else { 810 addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(), 811 RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> { 812 try (Scope ignored = span.makeCurrent()) { 813 onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, 814 locateFinished, unfinishedRequest, l, e); 815 } 816 }); 817 } 818 addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { 819 try (Scope ignored = span.makeCurrent()) { 820 if (e != null) { 821 callback.onRegionError(region, e); 822 } else { 823 callback.onRegionComplete(region, r); 824 } 825 if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { 826 callback.onComplete(); 827 } 828 } 829 }); 830 } 831 832 private final class CoprocessorServiceBuilderImpl<S, R> 833 implements CoprocessorServiceBuilder<S, R> { 834 835 private final Function<RpcChannel, S> stubMaker; 836 837 private final ServiceCaller<S, R> callable; 838 839 private final CoprocessorCallback<R> callback; 840 841 private byte[] startKey = HConstants.EMPTY_START_ROW; 842 843 private boolean startKeyInclusive; 844 845 private byte[] endKey = HConstants.EMPTY_END_ROW; 846 847 private boolean endKeyInclusive; 848 849 public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, 850 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { 851 this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); 852 this.callable = Preconditions.checkNotNull(callable, "callable is null"); 853 this.callback = Preconditions.checkNotNull(callback, "callback is null"); 854 } 855 856 @Override 857 public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) { 858 this.startKey = Preconditions.checkNotNull(startKey, 859 "startKey is null. Consider using" 860 + " an empty byte array, or just do not call this method if you want to start selection" 861 + " from the first region"); 862 this.startKeyInclusive = inclusive; 863 return this; 864 } 865 866 @Override 867 public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { 868 this.endKey = Preconditions.checkNotNull(endKey, 869 "endKey is null. Consider using" 870 + " an empty byte array, or just do not call this method if you want to continue" 871 + " selection to the last region"); 872 this.endKeyInclusive = inclusive; 873 return this; 874 } 875 876 @Override 877 public void execute() { 878 final Span span = newTableOperationSpanBuilder() 879 .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build(); 880 try (Scope ignored = span.makeCurrent()) { 881 final RegionLocateType regionLocateType = 882 startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER; 883 final CompletableFuture<HRegionLocation> future = conn.getLocator() 884 .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs); 885 addListener(future, (loc, error) -> { 886 try (Scope ignored1 = span.makeCurrent()) { 887 onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, 888 endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error); 889 } 890 }); 891 } 892 } 893 } 894 895 @Override 896 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 897 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 898 CoprocessorCallback<R> callback) { 899 return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); 900 } 901}