001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations; 026import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 027import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; 028import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 029 030import io.opentelemetry.api.trace.Span; 031import io.opentelemetry.api.trace.StatusCode; 032import io.opentelemetry.context.Scope; 033import java.io.IOException; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.List; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.function.Function; 042import java.util.function.Supplier; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.CompareOperator; 045import org.apache.hadoop.hbase.DoNotRetryIOException; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; 050import org.apache.hadoop.hbase.client.ConnectionUtils.Converter; 051import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 052import org.apache.hadoop.hbase.filter.Filter; 053import org.apache.hadoop.hbase.io.TimeRange; 054import org.apache.hadoop.hbase.ipc.HBaseRpcController; 055import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 056import org.apache.hadoop.hbase.trace.TraceUtil; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.ReflectionUtils; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 064import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 065import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 066import org.apache.hbase.thirdparty.io.netty.util.Timer; 067 068import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 070import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 078 079/** 080 * The implementation of RawAsyncTable. 081 * <p/> 082 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 083 * be finished inside the rpc framework thread, which means that the callbacks registered to the 084 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 085 * this class should not try to do time consuming tasks in the callbacks. 086 * @since 2.0.0 087 * @see AsyncTableImpl 088 */ 089@InterfaceAudience.Private 090class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { 091 092 private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); 093 094 private final AsyncConnectionImpl conn; 095 096 private final Timer retryTimer; 097 098 private final TableName tableName; 099 100 private final int defaultScannerCaching; 101 102 private final long defaultScannerMaxResultSize; 103 104 private final long rpcTimeoutNs; 105 106 private final long readRpcTimeoutNs; 107 108 private final long writeRpcTimeoutNs; 109 110 private final long operationTimeoutNs; 111 112 private final long scanTimeoutNs; 113 114 private final long pauseNs; 115 116 private final long pauseNsForServerOverloaded; 117 118 private final int maxAttempts; 119 120 private final int startLogErrorsCnt; 121 122 RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) { 123 this.conn = conn; 124 this.retryTimer = retryTimer; 125 this.tableName = builder.tableName; 126 this.rpcTimeoutNs = builder.rpcTimeoutNs; 127 this.readRpcTimeoutNs = builder.readRpcTimeoutNs; 128 this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs; 129 this.operationTimeoutNs = builder.operationTimeoutNs; 130 this.scanTimeoutNs = builder.scanTimeoutNs; 131 this.pauseNs = builder.pauseNs; 132 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 133 LOG.warn( 134 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 135 + " the normal pause value {} ms, use the greater one instead", 136 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 137 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 138 this.pauseNsForServerOverloaded = builder.pauseNs; 139 } else { 140 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 141 } 142 this.maxAttempts = builder.maxAttempts; 143 this.startLogErrorsCnt = builder.startLogErrorsCnt; 144 this.defaultScannerCaching = tableName.isSystemTable() 145 ? conn.connConf.getMetaScannerCaching() 146 : conn.connConf.getScannerCaching(); 147 this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); 148 } 149 150 @Override 151 public TableName getName() { 152 return tableName; 153 } 154 155 @Override 156 public Configuration getConfiguration() { 157 return conn.getConfiguration(); 158 } 159 160 @Override 161 public CompletableFuture<TableDescriptor> getDescriptor() { 162 return conn.getAdmin().getDescriptor(tableName); 163 } 164 165 @Override 166 public AsyncTableRegionLocator getRegionLocator() { 167 return conn.getRegionLocator(tableName); 168 } 169 170 private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller, 171 HRegionLocation loc, ClientService.Interface stub, REQ req, 172 Converter<MutateRequest, byte[], REQ> reqConvert, 173 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 174 return ConnectionUtils.call(controller, loc, stub, req, reqConvert, 175 (s, c, r, done) -> s.mutate(c, r, done), respConverter); 176 } 177 178 private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller, 179 HRegionLocation loc, ClientService.Interface stub, REQ req, 180 Converter<MutateRequest, byte[], REQ> reqConvert) { 181 return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { 182 return null; 183 }); 184 } 185 186 private static Result toResult(HBaseRpcController controller, MutateResponse resp) 187 throws IOException { 188 if (!resp.hasResult()) { 189 return null; 190 } 191 return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); 192 } 193 194 @FunctionalInterface 195 private interface NoncedConverter<D, I, S> { 196 D convert(I info, S src, long nonceGroup, long nonce) throws IOException; 197 } 198 199 private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, 200 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 201 NoncedConverter<MutateRequest, byte[], REQ> reqConvert, 202 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 203 return mutate(controller, loc, stub, req, 204 (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); 205 } 206 207 private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) { 208 return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority) 209 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 210 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 211 .pause(pauseNs, TimeUnit.NANOSECONDS) 212 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 213 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); 214 } 215 216 private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> 217 newCaller(R row, long rpcTimeoutNs) { 218 return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); 219 } 220 221 private CompletableFuture<Result> get(Get get, int replicaId) { 222 return this.<Result, Get> newCaller(get, readRpcTimeoutNs) 223 .action((controller, loc, stub) -> ConnectionUtils.<Get, GetRequest, GetResponse, 224 Result> call(controller, loc, stub, get, RequestConverter::buildGetRequest, 225 (s, c, req, done) -> s.get(c, req, done), 226 (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) 227 .replicaId(replicaId).call(); 228 } 229 230 private TableOperationSpanBuilder newTableOperationSpanBuilder() { 231 return new TableOperationSpanBuilder(conn).setTableName(tableName); 232 } 233 234 @Override 235 public CompletableFuture<Result> get(Get get) { 236 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get); 237 return tracedFuture( 238 () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), 239 RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, 240 conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()), 241 supplier); 242 } 243 244 @Override 245 public CompletableFuture<Void> put(Put put) { 246 validatePut(put, conn.connConf.getMaxKeyValueSize()); 247 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put); 248 return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs) 249 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, 250 put, RequestConverter::buildMutateRequest)) 251 .call(), supplier); 252 } 253 254 @Override 255 public CompletableFuture<Void> delete(Delete delete) { 256 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete); 257 return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs) 258 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, 259 stub, delete, RequestConverter::buildMutateRequest)) 260 .call(), supplier); 261 } 262 263 @Override 264 public CompletableFuture<Result> append(Append append) { 265 checkHasFamilies(append); 266 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append); 267 return tracedFuture(() -> { 268 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 269 long nonce = conn.getNonceGenerator().newNonce(); 270 return this.<Result, Append> newCaller(append, rpcTimeoutNs) 271 .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, 272 controller, loc, stub, append, RequestConverter::buildMutateRequest, 273 RawAsyncTableImpl::toResult)) 274 .call(); 275 }, supplier); 276 } 277 278 @Override 279 public CompletableFuture<Result> increment(Increment increment) { 280 checkHasFamilies(increment); 281 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment); 282 return tracedFuture(() -> { 283 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 284 long nonce = conn.getNonceGenerator().newNonce(); 285 return this.<Result, Increment> newCaller(increment, rpcTimeoutNs) 286 .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, 287 controller, loc, stub, increment, RequestConverter::buildMutateRequest, 288 RawAsyncTableImpl::toResult)) 289 .call(); 290 }, supplier); 291 } 292 293 private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 294 295 private final byte[] row; 296 297 private final byte[] family; 298 299 private byte[] qualifier; 300 301 private TimeRange timeRange; 302 303 private CompareOperator op; 304 305 private byte[] value; 306 307 public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 308 this.row = Preconditions.checkNotNull(row, "row is null"); 309 this.family = Preconditions.checkNotNull(family, "family is null"); 310 } 311 312 @Override 313 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 314 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" 315 + " an empty byte array, or just do not call this method if you want a null qualifier"); 316 return this; 317 } 318 319 @Override 320 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 321 this.timeRange = timeRange; 322 return this; 323 } 324 325 @Override 326 public CheckAndMutateBuilder ifNotExists() { 327 this.op = CompareOperator.EQUAL; 328 this.value = null; 329 return this; 330 } 331 332 @Override 333 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 334 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 335 this.value = Preconditions.checkNotNull(value, "value is null"); 336 return this; 337 } 338 339 private void preCheck() { 340 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" 341 + " calling ifNotExists/ifEquals/ifMatches before executing the request"); 342 } 343 344 @Override 345 public CompletableFuture<Boolean> thenPut(Put put) { 346 validatePut(put, conn.connConf.getMaxKeyValueSize()); 347 preCheck(); 348 final Supplier<Span> supplier = newTableOperationSpanBuilder() 349 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 350 .setContainerOperations(put); 351 return tracedFuture( 352 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 353 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 354 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 355 null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), 356 (c, r) -> r.getProcessed())) 357 .call(), 358 supplier); 359 } 360 361 @Override 362 public CompletableFuture<Boolean> thenDelete(Delete delete) { 363 preCheck(); 364 final Supplier<Span> supplier = newTableOperationSpanBuilder() 365 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 366 .setContainerOperations(delete); 367 return tracedFuture( 368 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 369 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 370 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 371 null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), 372 (c, r) -> r.getProcessed())) 373 .call(), 374 supplier); 375 } 376 377 @Override 378 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 379 preCheck(); 380 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 381 final Supplier<Span> supplier = newTableOperationSpanBuilder() 382 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 383 .setContainerOperations(mutations); 384 return tracedFuture(() -> RawAsyncTableImpl.this 385 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 386 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 387 mutations, 388 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, 389 null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), 390 CheckAndMutateResult::isSuccess)) 391 .call(), supplier); 392 } 393 } 394 395 @Override 396 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 397 return new CheckAndMutateBuilderImpl(row, family); 398 } 399 400 private final class CheckAndMutateWithFilterBuilderImpl 401 implements CheckAndMutateWithFilterBuilder { 402 403 private final byte[] row; 404 405 private final Filter filter; 406 407 private TimeRange timeRange; 408 409 public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { 410 this.row = Preconditions.checkNotNull(row, "row is null"); 411 this.filter = Preconditions.checkNotNull(filter, "filter is null"); 412 } 413 414 @Override 415 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 416 this.timeRange = timeRange; 417 return this; 418 } 419 420 @Override 421 public CompletableFuture<Boolean> thenPut(Put put) { 422 validatePut(put, conn.connConf.getMaxKeyValueSize()); 423 final Supplier<Span> supplier = newTableOperationSpanBuilder() 424 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 425 .setContainerOperations(put); 426 return tracedFuture( 427 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 428 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 429 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 430 timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), 431 (c, r) -> r.getProcessed())) 432 .call(), 433 supplier); 434 } 435 436 @Override 437 public CompletableFuture<Boolean> thenDelete(Delete delete) { 438 final Supplier<Span> supplier = newTableOperationSpanBuilder() 439 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 440 .setContainerOperations(delete); 441 return tracedFuture( 442 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 443 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 444 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 445 timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), 446 (c, r) -> r.getProcessed())) 447 .call(), 448 supplier); 449 } 450 451 @Override 452 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 453 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 454 final Supplier<Span> supplier = newTableOperationSpanBuilder() 455 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 456 .setContainerOperations(mutations); 457 return tracedFuture(() -> RawAsyncTableImpl.this 458 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 459 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 460 mutations, 461 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter, 462 timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), 463 CheckAndMutateResult::isSuccess)) 464 .call(), supplier); 465 } 466 } 467 468 @Override 469 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 470 return new CheckAndMutateWithFilterBuilderImpl(row, filter); 471 } 472 473 @Override 474 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 475 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate) 476 .setContainerOperations(checkAndMutate.getAction()); 477 return tracedFuture(() -> { 478 if ( 479 checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete 480 || checkAndMutate.getAction() instanceof Increment 481 || checkAndMutate.getAction() instanceof Append 482 ) { 483 Mutation mutation = (Mutation) checkAndMutate.getAction(); 484 if (mutation instanceof Put) { 485 validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); 486 } 487 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 488 long nonce = conn.getNonceGenerator().newNonce(); 489 return RawAsyncTableImpl.this 490 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(), 491 rpcTimeoutNs) 492 .action( 493 (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation, 494 (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), 495 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 496 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 497 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce), 498 (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner()))) 499 .call(); 500 } else if (checkAndMutate.getAction() instanceof RowMutations) { 501 RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); 502 validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize()); 503 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 504 long nonce = conn.getNonceGenerator().newNonce(); 505 return RawAsyncTableImpl.this 506 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(), 507 rpcTimeoutNs) 508 .action((controller, loc, stub) -> RawAsyncTableImpl.this.<CheckAndMutateResult, 509 CheckAndMutateResult> mutateRow(controller, loc, stub, rowMutations, 510 (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(), 511 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 512 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 513 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce), 514 resp -> resp)) 515 .call(); 516 } else { 517 CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>(); 518 future.completeExceptionally(new DoNotRetryIOException( 519 "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); 520 return future; 521 } 522 }, supplier); 523 } 524 525 @Override 526 public List<CompletableFuture<CheckAndMutateResult>> 527 checkAndMutate(List<CheckAndMutate> checkAndMutates) { 528 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates) 529 .setContainerOperations(checkAndMutates); 530 return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream() 531 .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), supplier); 532 } 533 534 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, 535 // so here I write a new method as I do not want to change the abstraction of call method. 536 @SuppressWarnings("unchecked") 537 private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, 538 HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, 539 Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) { 540 CompletableFuture<RESP> future = new CompletableFuture<>(); 541 try { 542 byte[] regionName = loc.getRegion().getRegionName(); 543 MultiRequest req = reqConvert.convert(regionName, mutation); 544 stub.multi(controller, req, new RpcCallback<MultiResponse>() { 545 546 @Override 547 public void run(MultiResponse resp) { 548 if (controller.failed()) { 549 future.completeExceptionally(controller.getFailed()); 550 } else { 551 try { 552 org.apache.hadoop.hbase.client.MultiResponse multiResp = 553 ResponseConverter.getResults(req, resp, controller.cellScanner()); 554 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 555 loc.getServerName(), multiResp); 556 Throwable ex = multiResp.getException(regionName); 557 if (ex != null) { 558 future.completeExceptionally(ex instanceof IOException 559 ? ex 560 : new IOException( 561 "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); 562 } else { 563 future.complete( 564 respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0))); 565 } 566 } catch (IOException e) { 567 future.completeExceptionally(e); 568 } 569 } 570 } 571 }); 572 } catch (IOException e) { 573 future.completeExceptionally(e); 574 } 575 return future; 576 } 577 578 @Override 579 public CompletableFuture<Result> mutateRow(RowMutations mutations) { 580 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 581 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 582 long nonce = conn.getNonceGenerator().newNonce(); 583 final Supplier<Span> supplier = 584 newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations); 585 return tracedFuture( 586 () -> this 587 .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) 588 .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub, 589 mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), 590 resp -> resp)) 591 .call(), 592 supplier); 593 } 594 595 private Scan setDefaultScanConfig(Scan scan) { 596 // always create a new scan object as we may reset the start row later. 597 Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); 598 if (newScan.getCaching() <= 0) { 599 newScan.setCaching(defaultScannerCaching); 600 } 601 if (newScan.getMaxResultSize() <= 0) { 602 newScan.setMaxResultSize(defaultScannerMaxResultSize); 603 } 604 return newScan; 605 } 606 607 @Override 608 public void scan(Scan scan, AdvancedScanResultConsumer consumer) { 609 new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, 610 pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, 611 startLogErrorsCnt).start(); 612 } 613 614 private long resultSize2CacheSize(long maxResultSize) { 615 // * 2 if possible 616 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 617 } 618 619 @Override 620 public AsyncTableResultScanner getScanner(Scan scan) { 621 final long maxCacheSize = resultSize2CacheSize( 622 scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize); 623 final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan); 624 final AsyncTableResultScanner scanner = 625 new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize); 626 scan(scan, scanner); 627 return scanner; 628 } 629 630 @Override 631 public CompletableFuture<List<Result>> scanAll(Scan scan) { 632 CompletableFuture<List<Result>> future = new CompletableFuture<>(); 633 List<Result> scanResults = new ArrayList<>(); 634 scan(scan, new AdvancedScanResultConsumer() { 635 636 @Override 637 public void onNext(Result[] results, ScanController controller) { 638 scanResults.addAll(Arrays.asList(results)); 639 } 640 641 @Override 642 public void onError(Throwable error) { 643 future.completeExceptionally(error); 644 } 645 646 @Override 647 public void onComplete() { 648 future.complete(scanResults); 649 } 650 }); 651 return future; 652 } 653 654 @Override 655 public List<CompletableFuture<Result>> get(List<Get> gets) { 656 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets) 657 .setContainerOperations(HBaseSemanticAttributes.Operation.GET); 658 return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); 659 } 660 661 @Override 662 public List<CompletableFuture<Void>> put(List<Put> puts) { 663 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts) 664 .setContainerOperations(HBaseSemanticAttributes.Operation.PUT); 665 return tracedFutures(() -> voidMutate(puts), supplier); 666 } 667 668 @Override 669 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 670 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes) 671 .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE); 672 return tracedFutures(() -> voidMutate(deletes), supplier); 673 } 674 675 @Override 676 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 677 final Supplier<Span> supplier = 678 newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions); 679 return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); 680 } 681 682 private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { 683 return this.<Object> batch(actions, writeRpcTimeoutNs).stream() 684 .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); 685 } 686 687 private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) { 688 for (Row action : actions) { 689 if (action instanceof Put) { 690 validatePut((Put) action, conn.connConf.getMaxKeyValueSize()); 691 } else if (action instanceof CheckAndMutate) { 692 CheckAndMutate checkAndMutate = (CheckAndMutate) action; 693 if (checkAndMutate.getAction() instanceof Put) { 694 validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); 695 } else if (checkAndMutate.getAction() instanceof RowMutations) { 696 validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(), 697 conn.connConf.getMaxKeyValueSize()); 698 } 699 } else if (action instanceof RowMutations) { 700 validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize()); 701 } 702 } 703 return conn.callerFactory.batch().table(tableName).actions(actions) 704 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 705 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 706 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 707 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); 708 } 709 710 @Override 711 public long getRpcTimeout(TimeUnit unit) { 712 return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); 713 } 714 715 @Override 716 public long getReadRpcTimeout(TimeUnit unit) { 717 return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); 718 } 719 720 @Override 721 public long getWriteRpcTimeout(TimeUnit unit) { 722 return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); 723 } 724 725 @Override 726 public long getOperationTimeout(TimeUnit unit) { 727 return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); 728 } 729 730 @Override 731 public long getScanTimeout(TimeUnit unit) { 732 return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); 733 } 734 735 private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 736 ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { 737 RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, 738 region, row, rpcTimeoutNs, operationTimeoutNs); 739 final Span span = Span.current(); 740 S stub = stubMaker.apply(channel); 741 CompletableFuture<R> future = new CompletableFuture<>(); 742 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 743 callable.call(stub, controller, resp -> { 744 try (Scope ignored = span.makeCurrent()) { 745 if (controller.failed()) { 746 final Throwable failure = controller.getFailed(); 747 future.completeExceptionally(failure); 748 TraceUtil.setError(span, failure); 749 } else { 750 future.complete(resp); 751 span.setStatus(StatusCode.OK); 752 } 753 } finally { 754 span.end(); 755 } 756 }); 757 return future; 758 } 759 760 @Override 761 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 762 ServiceCaller<S, R> callable, byte[] row) { 763 return coprocessorService(stubMaker, callable, null, row); 764 } 765 766 private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) { 767 if (isEmptyStopRow(endKey)) { 768 if (isEmptyStopRow(region.getEndKey())) { 769 return true; 770 } 771 return false; 772 } else { 773 if (isEmptyStopRow(region.getEndKey())) { 774 return true; 775 } 776 int c = Bytes.compareTo(endKey, region.getEndKey()); 777 // 1. if the region contains endKey 778 // 2. endKey is equal to the region's endKey and we do not want to include endKey. 779 return c < 0 || c == 0 && !endKeyInclusive; 780 } 781 } 782 783 private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, 784 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, 785 byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, 786 AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { 787 final Span span = Span.current(); 788 if (error != null) { 789 callback.onError(error); 790 TraceUtil.setError(span, error); 791 span.end(); 792 return; 793 } 794 unfinishedRequest.incrementAndGet(); 795 RegionInfo region = loc.getRegion(); 796 if (locateFinished(region, endKey, endKeyInclusive)) { 797 locateFinished.set(true); 798 } else { 799 addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(), 800 RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> { 801 try (Scope ignored = span.makeCurrent()) { 802 onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, 803 locateFinished, unfinishedRequest, l, e); 804 } 805 }); 806 } 807 addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { 808 try (Scope ignored = span.makeCurrent()) { 809 if (e != null) { 810 callback.onRegionError(region, e); 811 } else { 812 callback.onRegionComplete(region, r); 813 } 814 if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { 815 callback.onComplete(); 816 } 817 } 818 }); 819 } 820 821 private final class CoprocessorServiceBuilderImpl<S, R> 822 implements CoprocessorServiceBuilder<S, R> { 823 824 private final Function<RpcChannel, S> stubMaker; 825 826 private final ServiceCaller<S, R> callable; 827 828 private final CoprocessorCallback<R> callback; 829 830 private byte[] startKey = HConstants.EMPTY_START_ROW; 831 832 private boolean startKeyInclusive; 833 834 private byte[] endKey = HConstants.EMPTY_END_ROW; 835 836 private boolean endKeyInclusive; 837 838 public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, 839 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { 840 this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); 841 this.callable = Preconditions.checkNotNull(callable, "callable is null"); 842 this.callback = Preconditions.checkNotNull(callback, "callback is null"); 843 } 844 845 @Override 846 public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) { 847 this.startKey = Preconditions.checkNotNull(startKey, 848 "startKey is null. Consider using" 849 + " an empty byte array, or just do not call this method if you want to start selection" 850 + " from the first region"); 851 this.startKeyInclusive = inclusive; 852 return this; 853 } 854 855 @Override 856 public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { 857 this.endKey = Preconditions.checkNotNull(endKey, 858 "endKey is null. Consider using" 859 + " an empty byte array, or just do not call this method if you want to continue" 860 + " selection to the last region"); 861 this.endKeyInclusive = inclusive; 862 return this; 863 } 864 865 @Override 866 public void execute() { 867 final Span span = newTableOperationSpanBuilder() 868 .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build(); 869 try (Scope ignored = span.makeCurrent()) { 870 final RegionLocateType regionLocateType = 871 startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER; 872 final CompletableFuture<HRegionLocation> future = conn.getLocator() 873 .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs); 874 addListener(future, (loc, error) -> { 875 try (Scope ignored1 = span.makeCurrent()) { 876 onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, 877 endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error); 878 } 879 }); 880 } 881 } 882 } 883 884 @Override 885 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 886 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 887 CoprocessorCallback<R> callback) { 888 return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); 889 } 890}