001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY. 021// Internally, we use shaded protobuf. This below are part of our public API. 022// SEE ABOVE NOTE! 023 024import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 025 026import com.google.protobuf.Descriptors; 027import com.google.protobuf.Message; 028import com.google.protobuf.Service; 029import com.google.protobuf.ServiceException; 030import io.opentelemetry.api.trace.Span; 031import io.opentelemetry.api.trace.StatusCode; 032import io.opentelemetry.context.Context; 033import io.opentelemetry.context.Scope; 034import java.io.IOException; 035import java.io.InterruptedIOException; 036import java.util.ArrayList; 037import java.util.Collections; 038import java.util.List; 039import java.util.Map; 040import java.util.Optional; 041import java.util.TreeMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.Future; 045import java.util.concurrent.SynchronousQueue; 046import java.util.concurrent.ThreadPoolExecutor; 047import java.util.concurrent.TimeUnit; 048import java.util.function.Supplier; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.CompareOperator; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.HRegionLocation; 053import org.apache.hadoop.hbase.HTableDescriptor; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.coprocessor.Batch; 056import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 057import org.apache.hadoop.hbase.client.trace.TableSpanBuilder; 058import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 059import org.apache.hadoop.hbase.filter.Filter; 060import org.apache.hadoop.hbase.io.TimeRange; 061import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 062import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 063import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 064import org.apache.hadoop.hbase.trace.TraceUtil; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.Pair; 067import org.apache.hadoop.hbase.util.ReflectionUtils; 068import org.apache.hadoop.hbase.util.Threads; 069import org.apache.yetus.audience.InterfaceAudience; 070import org.apache.yetus.audience.InterfaceStability; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 075import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 076 077import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 078import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 079import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 084 085/** 086 * An implementation of {@link Table}. Used to communicate with a single HBase table. Lightweight. 087 * Get as needed and just close when done. Instances of this class SHOULD NOT be constructed 088 * directly. Obtain an instance via {@link Connection}. See {@link ConnectionFactory} class comment 089 * for an example of how. 090 * <p> 091 * This class is thread safe since 2.0.0 if not invoking any of the setter methods. All setters are 092 * moved into {@link TableBuilder} and reserved here only for keeping backward compatibility, and 093 * TODO will be removed soon. 094 * <p> 095 * HTable is no longer a client API. Use {@link Table} instead. It is marked 096 * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in <a href= 097 * "https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop 098 * Interface Classification</a> There are no guarantees for backwards source / binary compatibility 099 * and methods or class can change or go away without deprecation. 100 * @see Table 101 * @see Admin 102 * @see Connection 103 * @see ConnectionFactory 104 */ 105@InterfaceAudience.Private 106@InterfaceStability.Stable 107public class HTable implements Table { 108 private static final Logger LOG = LoggerFactory.getLogger(HTable.class); 109 private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG; 110 private final ClusterConnection connection; 111 private final TableName tableName; 112 private final Configuration configuration; 113 private final ConnectionConfiguration connConfiguration; 114 private boolean closed = false; 115 private final int scannerCaching; 116 private final long scannerMaxResultSize; 117 private final ExecutorService pool; // For Multi & Scan 118 private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc 119 private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX 120 private int readRpcTimeoutMs; // timeout for each read rpc request 121 private int writeRpcTimeoutMs; // timeout for each write rpc request 122 123 private final int scanReadRpcTimeout; 124 private final int scanTimeout; 125 private final boolean cleanupPoolOnClose; // shutdown the pool in close() 126 private final HRegionLocator locator; 127 128 /** The Async process for batch */ 129 AsyncProcess multiAp; 130 private final RpcRetryingCallerFactory rpcCallerFactory; 131 private final RpcControllerFactory rpcControllerFactory; 132 133 // Marked Private @since 1.0 134 @InterfaceAudience.Private 135 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { 136 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); 137 if (maxThreads == 0) { 138 maxThreads = 1; // is there a better default? 139 } 140 int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1); 141 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); 142 143 // Using the "direct handoff" approach, new threads will only be created 144 // if it is necessary and will grow unbounded. This could be bad but in HCM 145 // we only create as many Runnables as there are region servers. It means 146 // it also scales when new region servers are added. 147 ThreadPoolExecutor pool = 148 new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, TimeUnit.SECONDS, 149 new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("htable-pool-%d") 150 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 151 pool.allowCoreThreadTimeOut(true); 152 return pool; 153 } 154 155 /** 156 * Creates an object to access a HBase table. Used by HBase internally. DO NOT USE. See 157 * {@link ConnectionFactory} class comment for how to get a {@link Table} instance (use 158 * {@link Table} instead of {@link HTable}). 159 * @param connection Connection to be used. 160 * @param builder The table builder 161 * @param rpcCallerFactory The RPC caller factory 162 * @param rpcControllerFactory The RPC controller factory 163 * @param pool ExecutorService to be used. 164 */ 165 @InterfaceAudience.Private 166 protected HTable(final ConnectionImplementation connection, final TableBuilderBase builder, 167 final RpcRetryingCallerFactory rpcCallerFactory, 168 final RpcControllerFactory rpcControllerFactory, final ExecutorService pool) { 169 this.connection = Preconditions.checkNotNull(connection, "connection is null"); 170 this.configuration = connection.getConfiguration(); 171 this.connConfiguration = connection.getConnectionConfiguration(); 172 if (pool == null) { 173 this.pool = getDefaultExecutor(this.configuration); 174 this.cleanupPoolOnClose = true; 175 } else { 176 this.pool = pool; 177 this.cleanupPoolOnClose = false; 178 } 179 if (rpcCallerFactory == null) { 180 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); 181 } else { 182 this.rpcCallerFactory = rpcCallerFactory; 183 } 184 185 if (rpcControllerFactory == null) { 186 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); 187 } else { 188 this.rpcControllerFactory = rpcControllerFactory; 189 } 190 191 this.tableName = builder.tableName; 192 this.operationTimeoutMs = builder.operationTimeout; 193 this.rpcTimeoutMs = builder.rpcTimeout; 194 this.readRpcTimeoutMs = builder.readRpcTimeout; 195 this.writeRpcTimeoutMs = builder.writeRpcTimeout; 196 this.scanReadRpcTimeout = builder.scanReadRpcTimeout; 197 this.scanTimeout = builder.scanTimeout; 198 this.scannerCaching = connConfiguration.getScannerCaching(); 199 this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); 200 201 // puts need to track errors globally due to how the APIs currently work. 202 multiAp = this.connection.getAsyncProcess(); 203 this.locator = new HRegionLocator(tableName, connection); 204 } 205 206 /** Returns maxKeyValueSize from configuration. */ 207 public static int getMaxKeyValueSize(Configuration conf) { 208 return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1); 209 } 210 211 @Override 212 public Configuration getConfiguration() { 213 return configuration; 214 } 215 216 @Override 217 public TableName getName() { 218 return tableName; 219 } 220 221 /** 222 * <em>INTERNAL</em> Used by unit tests and tools to do low-level manipulations. 223 * @return A Connection instance. 224 */ 225 protected Connection getConnection() { 226 return this.connection; 227 } 228 229 @Override 230 @Deprecated 231 public HTableDescriptor getTableDescriptor() throws IOException { 232 HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory, 233 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 234 if (htd != null) { 235 return new ImmutableHTableDescriptor(htd); 236 } 237 return null; 238 } 239 240 @Override 241 public TableDescriptor getDescriptor() throws IOException { 242 return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, 243 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 244 } 245 246 /** 247 * Get the corresponding start keys and regions for an arbitrary range of keys. 248 * <p> 249 * @param startKey Starting row in range, inclusive 250 * @param endKey Ending row in range 251 * @param includeEndKey true if endRow is inclusive, false if exclusive 252 * @return A pair of list of start keys and list of HRegionLocations that contain the specified 253 * range 254 * @throws IOException if a remote or network exception occurs 255 */ 256 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey, 257 final byte[] endKey, final boolean includeEndKey) throws IOException { 258 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false); 259 } 260 261 /** 262 * Get the corresponding start keys and regions for an arbitrary range of keys. 263 * <p> 264 * @param startKey Starting row in range, inclusive 265 * @param endKey Ending row in range 266 * @param includeEndKey true if endRow is inclusive, false if exclusive 267 * @param reload true to reload information or false to use cached information 268 * @return A pair of list of start keys and list of HRegionLocations that contain the specified 269 * range 270 * @throws IOException if a remote or network exception occurs 271 */ 272 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey, 273 final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException { 274 final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW); 275 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { 276 throw new IllegalArgumentException( 277 "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey)); 278 } 279 List<byte[]> keysInRange = new ArrayList<>(); 280 List<HRegionLocation> regionsInRange = new ArrayList<>(); 281 byte[] currentKey = startKey; 282 do { 283 HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload); 284 keysInRange.add(currentKey); 285 regionsInRange.add(regionLocation); 286 currentKey = regionLocation.getRegionInfo().getEndKey(); 287 } while ( 288 !Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) 289 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 290 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)) 291 ); 292 return new Pair<>(keysInRange, regionsInRange); 293 } 294 295 /** 296 * The underlying {@link HTable} must not be closed. {@link Table#getScanner(Scan)} has other 297 * usage details. 298 */ 299 @Override 300 public ResultScanner getScanner(Scan scan) throws IOException { 301 final Span span = 302 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(scan).build(); 303 try (Scope ignored = span.makeCurrent()) { 304 if (scan.getCaching() <= 0) { 305 scan.setCaching(scannerCaching); 306 } 307 if (scan.getMaxResultSize() <= 0) { 308 scan.setMaxResultSize(scannerMaxResultSize); 309 } 310 if (scan.getMvccReadPoint() > 0) { 311 // it is not supposed to be set by user, clear 312 scan.resetMvccReadPoint(); 313 } 314 final boolean async = scan.isAsyncPrefetch() != null 315 ? scan.isAsyncPrefetch() 316 : connConfiguration.isClientScannerAsyncPrefetch(); 317 final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan(); 318 319 if (scan.isReversed()) { 320 return new ReversedClientScanner(getConfiguration(), scan, getName(), connection, 321 rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, 322 replicaTimeout); 323 } else { 324 if (async) { 325 return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection, 326 rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, 327 replicaTimeout); 328 } else { 329 return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection, 330 rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, 331 replicaTimeout); 332 } 333 } 334 } 335 } 336 337 /** 338 * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[])} has other 339 * usage details. 340 */ 341 @Override 342 public ResultScanner getScanner(byte[] family) throws IOException { 343 Scan scan = new Scan(); 344 scan.addFamily(family); 345 return getScanner(scan); 346 } 347 348 /** 349 * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[], byte[])} has 350 * other usage details. 351 */ 352 @Override 353 public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { 354 Scan scan = new Scan(); 355 scan.addColumn(family, qualifier); 356 return getScanner(scan); 357 } 358 359 @Override 360 public Result get(final Get get) throws IOException { 361 final Supplier<Span> supplier = 362 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(get); 363 return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier); 364 } 365 366 private Result get(Get get, final boolean checkExistenceOnly) throws IOException { 367 // if we are changing settings to the get, clone it. 368 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) { 369 get = ReflectionUtils.newInstance(get.getClass(), get); 370 get.setCheckExistenceOnly(checkExistenceOnly); 371 if (get.getConsistency() == null) { 372 get.setConsistency(DEFAULT_CONSISTENCY); 373 } 374 } 375 376 if (get.getConsistency() == Consistency.STRONG) { 377 final Get configuredGet = get; 378 ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, 379 getName(), get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) { 380 @Override 381 protected Result rpcCall() throws Exception { 382 ClientProtos.GetRequest request = RequestConverter 383 .buildGetRequest(getLocation().getRegionInfo().getRegionName(), configuredGet); 384 ClientProtos.GetResponse response = doGet(request); 385 return response == null 386 ? null 387 : ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 388 } 389 }; 390 return rpcCallerFactory.<Result> newCaller(readRpcTimeoutMs).callWithRetries(callable, 391 this.operationTimeoutMs); 392 } 393 394 // Call that takes into account the replica 395 RpcRetryingCallerWithReadReplicas callable = 396 new RpcRetryingCallerWithReadReplicas(rpcControllerFactory, tableName, this.connection, get, 397 pool, connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs, 398 connConfiguration.getPrimaryCallTimeoutMicroSecond()); 399 return callable.call(operationTimeoutMs); 400 } 401 402 @Override 403 public Result[] get(List<Get> gets) throws IOException { 404 final Supplier<Span> supplier = 405 new TableOperationSpanBuilder(connection).setTableName(tableName) 406 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(gets); 407 return TraceUtil.trace(() -> { 408 if (gets.size() == 1) { 409 return new Result[] { get(gets.get(0)) }; 410 } 411 try { 412 Object[] r1 = new Object[gets.size()]; 413 batch((List<? extends Row>) gets, r1, readRpcTimeoutMs); 414 // Translate. 415 Result[] results = new Result[r1.length]; 416 int i = 0; 417 for (Object obj : r1) { 418 // Batch ensures if there is a failure we get an exception instead 419 results[i++] = (Result) obj; 420 } 421 return results; 422 } catch (InterruptedException e) { 423 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 424 } 425 }, supplier); 426 } 427 428 @Override 429 public void batch(final List<? extends Row> actions, final Object[] results) 430 throws InterruptedException, IOException { 431 int rpcTimeout = writeRpcTimeoutMs; 432 boolean hasRead = false; 433 boolean hasWrite = false; 434 for (Row action : actions) { 435 if (action instanceof Mutation) { 436 hasWrite = true; 437 } else { 438 hasRead = true; 439 } 440 if (hasRead && hasWrite) { 441 break; 442 } 443 } 444 if (hasRead && !hasWrite) { 445 rpcTimeout = readRpcTimeoutMs; 446 } 447 try { 448 batch(actions, results, rpcTimeout); 449 } catch (InterruptedException e) { 450 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 451 } 452 } 453 454 public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout) 455 throws InterruptedException, IOException { 456 AsyncProcessTask task = 457 AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(actions) 458 .setResults(results).setRpcTimeout(rpcTimeout).setOperationTimeout(operationTimeoutMs) 459 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build(); 460 final Span span = new TableOperationSpanBuilder(connection).setTableName(tableName) 461 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(actions) 462 .build(); 463 try (Scope ignored = span.makeCurrent()) { 464 AsyncRequestFuture ars = multiAp.submit(task); 465 ars.waitUntilDone(); 466 if (ars.hasError()) { 467 TraceUtil.setError(span, ars.getErrors()); 468 throw ars.getErrors(); 469 } 470 span.setStatus(StatusCode.OK); 471 } finally { 472 span.end(); 473 } 474 } 475 476 @Override 477 public <R> void batchCallback(final List<? extends Row> actions, final Object[] results, 478 final Batch.Callback<R> callback) throws IOException, InterruptedException { 479 doBatchWithCallback(actions, results, callback, connection, pool, tableName); 480 } 481 482 public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results, 483 Batch.Callback<R> callback, ClusterConnection connection, ExecutorService pool, 484 TableName tableName) throws InterruptedIOException, RetriesExhaustedWithDetailsException { 485 int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout(); 486 int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 487 connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 488 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 489 AsyncProcessTask<R> task = 490 AsyncProcessTask.newBuilder(callback).setPool(pool).setTableName(tableName) 491 .setRowAccess(actions).setResults(results).setOperationTimeout(operationTimeout) 492 .setRpcTimeout(writeTimeout).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build(); 493 final Span span = new TableOperationSpanBuilder(connection).setTableName(tableName) 494 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(actions) 495 .build(); 496 try (Scope ignored = span.makeCurrent()) { 497 AsyncRequestFuture ars = connection.getAsyncProcess().submit(task); 498 ars.waitUntilDone(); 499 if (ars.hasError()) { 500 TraceUtil.setError(span, ars.getErrors()); 501 throw ars.getErrors(); 502 } 503 } finally { 504 span.end(); 505 } 506 } 507 508 @Override 509 public void delete(final Delete delete) throws IOException { 510 final Supplier<Span> supplier = 511 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(delete); 512 TraceUtil.trace(() -> { 513 ClientServiceCallable<Void> callable = 514 new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(), 515 this.rpcControllerFactory.newController(), delete.getPriority()) { 516 @Override 517 protected Void rpcCall() throws Exception { 518 MutateRequest request = RequestConverter 519 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete); 520 doMutate(request); 521 return null; 522 } 523 }; 524 rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 525 this.operationTimeoutMs); 526 }, supplier); 527 } 528 529 @Override 530 public void delete(final List<Delete> deletes) throws IOException { 531 Object[] results = new Object[deletes.size()]; 532 try { 533 batch(deletes, results, writeRpcTimeoutMs); 534 } catch (InterruptedException e) { 535 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 536 } finally { 537 // TODO: to be consistent with batch put(), do not modify input list 538 // mutate list so that it is empty for complete success, or contains only failed records 539 // results are returned in the same order as the requests in list walk the list backwards, 540 // so we can remove from list without impacting the indexes of earlier members 541 for (int i = results.length - 1; i >= 0; i--) { 542 // if result is not null, it succeeded 543 if (results[i] instanceof Result) { 544 deletes.remove(i); 545 } 546 } 547 } 548 } 549 550 @Override 551 public void put(final Put put) throws IOException { 552 final Supplier<Span> supplier = 553 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(put); 554 TraceUtil.trace(() -> { 555 validatePut(put); 556 ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(this.connection, 557 getName(), put.getRow(), this.rpcControllerFactory.newController(), put.getPriority()) { 558 @Override 559 protected Void rpcCall() throws Exception { 560 MutateRequest request = 561 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put); 562 doMutate(request); 563 return null; 564 } 565 }; 566 rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 567 this.operationTimeoutMs); 568 }, supplier); 569 } 570 571 @Override 572 public void put(final List<Put> puts) throws IOException { 573 for (Put put : puts) { 574 validatePut(put); 575 } 576 Object[] results = new Object[puts.size()]; 577 try { 578 batch(puts, results, writeRpcTimeoutMs); 579 } catch (InterruptedException e) { 580 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 581 } 582 } 583 584 @Override 585 public Result mutateRow(final RowMutations rm) throws IOException { 586 final Supplier<Span> supplier = 587 new TableOperationSpanBuilder(connection).setTableName(tableName) 588 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(rm); 589 return TraceUtil.trace(() -> { 590 long nonceGroup = getNonceGroup(); 591 long nonce = getNonce(); 592 CancellableRegionServerCallable<MultiResponse> callable = 593 new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), 594 rpcControllerFactory.newController(), writeRpcTimeoutMs, 595 new RetryingTimeTracker().start(), rm.getMaxPriority()) { 596 @Override 597 protected MultiResponse rpcCall() throws Exception { 598 MultiRequest request = RequestConverter.buildMultiRequest( 599 getLocation().getRegionInfo().getRegionName(), rm, nonceGroup, nonce); 600 ClientProtos.MultiResponse response = doMulti(request); 601 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 602 if (res.hasException()) { 603 Throwable ex = ProtobufUtil.toException(res.getException()); 604 if (ex instanceof IOException) { 605 throw (IOException) ex; 606 } 607 throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), 608 ex); 609 } 610 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 611 } 612 }; 613 Object[] results = new Object[rm.getMutations().size()]; 614 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName) 615 .setRowAccess(rm.getMutations()).setCallable(callable).setRpcTimeout(writeRpcTimeoutMs) 616 .setOperationTimeout(operationTimeoutMs) 617 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build(); 618 AsyncRequestFuture ars = multiAp.submit(task); 619 ars.waitUntilDone(); 620 if (ars.hasError()) { 621 throw ars.getErrors(); 622 } 623 return (Result) results[0]; 624 }, supplier); 625 } 626 627 private long getNonceGroup() { 628 return ((ClusterConnection) getConnection()).getNonceGenerator().getNonceGroup(); 629 } 630 631 private long getNonce() { 632 return ((ClusterConnection) getConnection()).getNonceGenerator().newNonce(); 633 } 634 635 @Override 636 public Result append(final Append append) throws IOException { 637 final Supplier<Span> supplier = 638 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(append); 639 return TraceUtil.trace(() -> { 640 checkHasFamilies(append); 641 NoncedRegionServerCallable<Result> callable = 642 new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(), 643 this.rpcControllerFactory.newController(), append.getPriority()) { 644 @Override 645 protected Result rpcCall() throws Exception { 646 MutateRequest request = 647 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), 648 append, super.getNonceGroup(), super.getNonce()); 649 MutateResponse response = doMutate(request); 650 if (!response.hasResult()) { 651 return null; 652 } 653 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 654 } 655 }; 656 return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 657 this.operationTimeoutMs); 658 }, supplier); 659 } 660 661 @Override 662 public Result increment(final Increment increment) throws IOException { 663 final Supplier<Span> supplier = 664 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(increment); 665 return TraceUtil.trace(() -> { 666 checkHasFamilies(increment); 667 NoncedRegionServerCallable<Result> callable = 668 new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(), 669 this.rpcControllerFactory.newController(), increment.getPriority()) { 670 @Override 671 protected Result rpcCall() throws Exception { 672 MutateRequest request = 673 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), 674 increment, super.getNonceGroup(), super.getNonce()); 675 MutateResponse response = doMutate(request); 676 // Should this check for null like append does? 677 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 678 } 679 }; 680 return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable, 681 this.operationTimeoutMs); 682 }, supplier); 683 } 684 685 @Override 686 public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier, 687 final long amount) throws IOException { 688 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); 689 } 690 691 @Override 692 public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier, 693 final long amount, final Durability durability) throws IOException { 694 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 695 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.INCREMENT); 696 return TraceUtil.trace(() -> { 697 NullPointerException npe = null; 698 if (row == null) { 699 npe = new NullPointerException("row is null"); 700 } else if (family == null) { 701 npe = new NullPointerException("family is null"); 702 } 703 if (npe != null) { 704 throw new IOException("Invalid arguments to incrementColumnValue", npe); 705 } 706 707 NoncedRegionServerCallable<Long> callable = 708 new NoncedRegionServerCallable<Long>(this.connection, getName(), row, 709 this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { 710 @Override 711 protected Long rpcCall() throws Exception { 712 MutateRequest request = RequestConverter.buildIncrementRequest( 713 getLocation().getRegionInfo().getRegionName(), row, family, qualifier, amount, 714 durability, super.getNonceGroup(), super.getNonce()); 715 MutateResponse response = doMutate(request); 716 Result result = 717 ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 718 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); 719 } 720 }; 721 return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 722 this.operationTimeoutMs); 723 }, supplier); 724 } 725 726 @Override 727 @Deprecated 728 public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier, 729 final byte[] value, final Put put) throws IOException { 730 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 731 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 732 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 733 HBaseSemanticAttributes.Operation.PUT); 734 return TraceUtil.trace( 735 () -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put) 736 .isSuccess(), 737 supplier); 738 } 739 740 @Override 741 @Deprecated 742 public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier, 743 final CompareOp compareOp, final byte[] value, final Put put) throws IOException { 744 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 745 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 746 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 747 HBaseSemanticAttributes.Operation.PUT); 748 return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, 749 toCompareOperator(compareOp), value, null, null, put).isSuccess(), supplier); 750 } 751 752 @Override 753 @Deprecated 754 public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier, 755 final CompareOperator op, final byte[] value, final Put put) throws IOException { 756 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 757 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 758 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 759 HBaseSemanticAttributes.Operation.PUT); 760 return TraceUtil.trace( 761 () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(), 762 supplier); 763 } 764 765 @Override 766 @Deprecated 767 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 768 final byte[] value, final Delete delete) throws IOException { 769 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 770 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 771 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 772 HBaseSemanticAttributes.Operation.DELETE); 773 return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, 774 value, null, null, delete).isSuccess(), supplier); 775 } 776 777 @Override 778 @Deprecated 779 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 780 final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException { 781 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 782 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 783 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 784 HBaseSemanticAttributes.Operation.DELETE); 785 return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, 786 toCompareOperator(compareOp), value, null, null, delete).isSuccess(), supplier); 787 } 788 789 @Override 790 @Deprecated 791 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 792 final CompareOperator op, final byte[] value, final Delete delete) throws IOException { 793 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 794 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 795 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 796 HBaseSemanticAttributes.Operation.DELETE); 797 return TraceUtil.trace( 798 () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(), 799 supplier); 800 } 801 802 @Override 803 @Deprecated 804 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 805 return new CheckAndMutateBuilderImpl(row, family); 806 } 807 808 @Override 809 @Deprecated 810 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 811 return new CheckAndMutateWithFilterBuilderImpl(row, filter); 812 } 813 814 private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family, 815 final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, 816 final TimeRange timeRange, final RowMutations rm) throws IOException { 817 long nonceGroup = getNonceGroup(); 818 long nonce = getNonce(); 819 CancellableRegionServerCallable<MultiResponse> callable = 820 new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(), 821 rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), 822 rm.getMaxPriority()) { 823 @Override 824 protected MultiResponse rpcCall() throws Exception { 825 MultiRequest request = 826 RequestConverter.buildMultiRequest(getLocation().getRegionInfo().getRegionName(), row, 827 family, qualifier, op, value, filter, timeRange, rm, nonceGroup, nonce); 828 ClientProtos.MultiResponse response = doMulti(request); 829 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 830 if (res.hasException()) { 831 Throwable ex = ProtobufUtil.toException(res.getException()); 832 if (ex instanceof IOException) { 833 throw (IOException) ex; 834 } 835 throw new IOException( 836 "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex); 837 } 838 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 839 } 840 }; 841 842 /** 843 * Currently, we use one array to store 'processed' flag which is returned by server. It is 844 * excessive to send such a large array, but that is required by the framework right now 845 */ 846 Object[] results = new Object[rm.getMutations().size()]; 847 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName) 848 .setRowAccess(rm.getMutations()).setResults(results).setCallable(callable) 849 // TODO any better timeout? 850 .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs)) 851 .setOperationTimeout(operationTimeoutMs).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 852 .build(); 853 AsyncRequestFuture ars = multiAp.submit(task); 854 ars.waitUntilDone(); 855 if (ars.hasError()) { 856 throw ars.getErrors(); 857 } 858 859 return (CheckAndMutateResult) results[0]; 860 } 861 862 @Override 863 @Deprecated 864 public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, 865 final CompareOp compareOp, final byte[] value, final RowMutations rm) throws IOException { 866 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 867 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 868 .setContainerOperations(rm); 869 return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, 870 toCompareOperator(compareOp), value, null, null, rm).isSuccess(), supplier); 871 } 872 873 @Override 874 @Deprecated 875 public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, 876 final CompareOperator op, final byte[] value, final RowMutations rm) throws IOException { 877 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 878 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 879 .setContainerOperations(rm); 880 return TraceUtil.trace( 881 () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(), 882 supplier); 883 } 884 885 @Override 886 public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { 887 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 888 .setTableName(tableName).setOperation(checkAndMutate).setContainerOperations(checkAndMutate); 889 return TraceUtil.trace(() -> { 890 Row action = checkAndMutate.getAction(); 891 if ( 892 action instanceof Put || action instanceof Delete || action instanceof Increment 893 || action instanceof Append 894 ) { 895 if (action instanceof Put) { 896 validatePut((Put) action); 897 } 898 return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(), 899 checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 900 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action); 901 } else { 902 return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(), 903 checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 904 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action); 905 } 906 }, supplier); 907 } 908 909 private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family, 910 final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, 911 final TimeRange timeRange, final Mutation mutation) throws IOException { 912 long nonceGroup = getNonceGroup(); 913 long nonce = getNonce(); 914 ClientServiceCallable<CheckAndMutateResult> callable = 915 new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row, 916 this.rpcControllerFactory.newController(), mutation.getPriority()) { 917 @Override 918 protected CheckAndMutateResult rpcCall() throws Exception { 919 MutateRequest request = 920 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, 921 family, qualifier, op, value, filter, timeRange, mutation, nonceGroup, nonce); 922 MutateResponse response = doMutate(request); 923 if (response.hasResult()) { 924 return new CheckAndMutateResult(response.getProcessed(), 925 ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner())); 926 } 927 return new CheckAndMutateResult(response.getProcessed(), null); 928 } 929 }; 930 return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs) 931 .callWithRetries(callable, this.operationTimeoutMs); 932 } 933 934 @Override 935 public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) 936 throws IOException { 937 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 938 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.BATCH) 939 .setContainerOperations(checkAndMutates); 940 return TraceUtil.trace(() -> { 941 if (checkAndMutates.isEmpty()) { 942 return Collections.emptyList(); 943 } 944 if (checkAndMutates.size() == 1) { 945 return Collections.singletonList(checkAndMutate(checkAndMutates.get(0))); 946 } 947 948 Object[] results = new Object[checkAndMutates.size()]; 949 try { 950 batch(checkAndMutates, results, writeRpcTimeoutMs); 951 } catch (InterruptedException e) { 952 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 953 } 954 955 // translate. 956 List<CheckAndMutateResult> ret = new ArrayList<>(results.length); 957 for (Object r : results) { 958 // Batch ensures if there is a failure we get an exception instead 959 ret.add((CheckAndMutateResult) r); 960 } 961 return ret; 962 }, supplier); 963 } 964 965 private CompareOperator toCompareOperator(CompareOp compareOp) { 966 switch (compareOp) { 967 case LESS: 968 return CompareOperator.LESS; 969 970 case LESS_OR_EQUAL: 971 return CompareOperator.LESS_OR_EQUAL; 972 973 case EQUAL: 974 return CompareOperator.EQUAL; 975 976 case NOT_EQUAL: 977 return CompareOperator.NOT_EQUAL; 978 979 case GREATER_OR_EQUAL: 980 return CompareOperator.GREATER_OR_EQUAL; 981 982 case GREATER: 983 return CompareOperator.GREATER; 984 985 case NO_OP: 986 return CompareOperator.NO_OP; 987 988 default: 989 throw new AssertionError(); 990 } 991 } 992 993 @Override 994 public boolean exists(final Get get) throws IOException { 995 final Supplier<Span> supplier = 996 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(get); 997 return TraceUtil.trace(() -> { 998 Result r = get(get, true); 999 assert r.getExists() != null; 1000 return r.getExists(); 1001 }, supplier); 1002 } 1003 1004 @Override 1005 public boolean[] exists(List<Get> gets) throws IOException { 1006 final Supplier<Span> supplier = 1007 new TableOperationSpanBuilder(connection).setTableName(tableName) 1008 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(gets); 1009 return TraceUtil.trace(() -> { 1010 if (gets.isEmpty()) { 1011 return new boolean[] {}; 1012 } 1013 if (gets.size() == 1) { 1014 return new boolean[] { exists(gets.get(0)) }; 1015 } 1016 1017 ArrayList<Get> exists = new ArrayList<>(gets.size()); 1018 for (Get g : gets) { 1019 Get ge = new Get(g); 1020 ge.setCheckExistenceOnly(true); 1021 exists.add(ge); 1022 } 1023 1024 Object[] r1 = new Object[exists.size()]; 1025 try { 1026 batch(exists, r1, readRpcTimeoutMs); 1027 } catch (InterruptedException e) { 1028 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 1029 } 1030 1031 // translate. 1032 boolean[] results = new boolean[r1.length]; 1033 int i = 0; 1034 for (Object o : r1) { 1035 // batch ensures if there is a failure we get an exception instead 1036 results[i++] = ((Result) o).getExists(); 1037 } 1038 1039 return results; 1040 }, supplier); 1041 } 1042 1043 /** 1044 * Process a mixed batch of Get, Put and Delete actions. All actions for a RegionServer are 1045 * forwarded in one RPC call. Queries are executed in parallel. 1046 * @param list The collection of actions. 1047 * @param results An empty array, same size as list. If an exception is thrown, you can test here 1048 * for partial results, and to determine which actions processed successfully. 1049 * @throws IOException if there are problems talking to META. Per-item exceptions are stored in 1050 * the results array. 1051 */ 1052 public <R> void processBatchCallback(final List<? extends Row> list, final Object[] results, 1053 final Batch.Callback<R> callback) throws IOException, InterruptedException { 1054 this.batchCallback(list, results, callback); 1055 } 1056 1057 @Override 1058 public void close() throws IOException { 1059 final Supplier<Span> supplier = 1060 new TableSpanBuilder(connection).setName("HTable.close").setTableName(tableName); 1061 TraceUtil.trace(() -> { 1062 if (this.closed) { 1063 return; 1064 } 1065 if (cleanupPoolOnClose) { 1066 this.pool.shutdown(); 1067 try { 1068 boolean terminated = false; 1069 do { 1070 // wait until the pool has terminated 1071 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); 1072 } while (!terminated); 1073 } catch (InterruptedException e) { 1074 this.pool.shutdownNow(); 1075 LOG.warn("waitForTermination interrupted"); 1076 } 1077 } 1078 this.closed = true; 1079 }, supplier); 1080 } 1081 1082 // validate for well-formedness 1083 private void validatePut(final Put put) throws IllegalArgumentException { 1084 ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize()); 1085 } 1086 1087 /** 1088 * The pool is used for mutli requests for this HTable 1089 * @return the pool used for mutli 1090 */ 1091 ExecutorService getPool() { 1092 return this.pool; 1093 } 1094 1095 /** 1096 * Explicitly clears the region cache to fetch the latest value from META. This is a power user 1097 * function: avoid unless you know the ramifications. 1098 */ 1099 public void clearRegionCache() { 1100 this.connection.clearRegionLocationCache(); 1101 } 1102 1103 @Override 1104 public CoprocessorRpcChannel coprocessorService(byte[] row) { 1105 return new RegionCoprocessorRpcChannel(connection, tableName, row); 1106 } 1107 1108 @Override 1109 public <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service, 1110 byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable) 1111 throws ServiceException, Throwable { 1112 final Map<byte[], R> results = 1113 Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR)); 1114 coprocessorService(service, startKey, endKey, callable, (region, row, value) -> { 1115 if (region != null) { 1116 results.put(region, value); 1117 } 1118 }); 1119 return results; 1120 } 1121 1122 @Override 1123 public <T extends Service, R> void coprocessorService(final Class<T> service, byte[] startKey, 1124 byte[] endKey, final Batch.Call<T, R> callable, final Batch.Callback<R> callback) 1125 throws ServiceException, Throwable { 1126 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1127 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC); 1128 TraceUtil.trace(() -> { 1129 final Context context = Context.current(); 1130 final ExecutorService wrappedPool = context.wrap(pool); 1131 // get regions covered by the row range 1132 List<byte[]> keys = getStartKeysInRange(startKey, endKey); 1133 Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); 1134 for (final byte[] r : keys) { 1135 final RegionCoprocessorRpcChannel channel = 1136 new RegionCoprocessorRpcChannel(connection, tableName, r); 1137 Future<R> future = wrappedPool.submit(() -> { 1138 T instance = 1139 org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel); 1140 R result = callable.call(instance); 1141 byte[] region = channel.getLastRegion(); 1142 if (callback != null) { 1143 callback.update(region, r, result); 1144 } 1145 return result; 1146 }); 1147 futures.put(r, future); 1148 } 1149 for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) { 1150 try { 1151 e.getValue().get(); 1152 } catch (ExecutionException ee) { 1153 LOG.warn("Error calling coprocessor service {} for row {}", service.getName(), 1154 Bytes.toStringBinary(e.getKey()), ee); 1155 throw ee.getCause(); 1156 } catch (InterruptedException ie) { 1157 throw new InterruptedIOException("Interrupted calling coprocessor service " 1158 + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie); 1159 } 1160 } 1161 }, supplier); 1162 } 1163 1164 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException { 1165 if (start == null) { 1166 start = HConstants.EMPTY_START_ROW; 1167 } 1168 if (end == null) { 1169 end = HConstants.EMPTY_END_ROW; 1170 } 1171 return getKeysAndRegionsInRange(start, end, true).getFirst(); 1172 } 1173 1174 @Override 1175 public long getRpcTimeout(TimeUnit unit) { 1176 return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS); 1177 } 1178 1179 @Override 1180 @Deprecated 1181 public int getRpcTimeout() { 1182 return rpcTimeoutMs; 1183 } 1184 1185 @Override 1186 @Deprecated 1187 public void setRpcTimeout(int rpcTimeout) { 1188 setReadRpcTimeout(rpcTimeout); 1189 setWriteRpcTimeout(rpcTimeout); 1190 } 1191 1192 @Override 1193 public long getReadRpcTimeout(TimeUnit unit) { 1194 return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS); 1195 } 1196 1197 @Override 1198 @Deprecated 1199 public int getReadRpcTimeout() { 1200 return readRpcTimeoutMs; 1201 } 1202 1203 @Override 1204 @Deprecated 1205 public void setReadRpcTimeout(int readRpcTimeout) { 1206 this.readRpcTimeoutMs = readRpcTimeout; 1207 } 1208 1209 @Override 1210 public long getWriteRpcTimeout(TimeUnit unit) { 1211 return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS); 1212 } 1213 1214 @Override 1215 @Deprecated 1216 public int getWriteRpcTimeout() { 1217 return writeRpcTimeoutMs; 1218 } 1219 1220 @Override 1221 @Deprecated 1222 public void setWriteRpcTimeout(int writeRpcTimeout) { 1223 this.writeRpcTimeoutMs = writeRpcTimeout; 1224 } 1225 1226 @Override 1227 public long getOperationTimeout(TimeUnit unit) { 1228 return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS); 1229 } 1230 1231 @Override 1232 @Deprecated 1233 public int getOperationTimeout() { 1234 return operationTimeoutMs; 1235 } 1236 1237 @Override 1238 @Deprecated 1239 public void setOperationTimeout(int operationTimeout) { 1240 this.operationTimeoutMs = operationTimeout; 1241 } 1242 1243 @Override 1244 public String toString() { 1245 return tableName + ";" + connection; 1246 } 1247 1248 @Override 1249 public <R extends Message> Map<byte[], R> batchCoprocessorService( 1250 Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, 1251 R responsePrototype) throws ServiceException, Throwable { 1252 final Map<byte[], R> results = 1253 Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR)); 1254 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, 1255 (region, row, result) -> { 1256 if (region != null) { 1257 results.put(region, result); 1258 } 1259 }); 1260 return results; 1261 } 1262 1263 @Override 1264 public <R extends Message> void batchCoprocessorService( 1265 final Descriptors.MethodDescriptor methodDescriptor, final Message request, byte[] startKey, 1266 byte[] endKey, final R responsePrototype, final Batch.Callback<R> callback) 1267 throws ServiceException, Throwable { 1268 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1269 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC); 1270 TraceUtil.trace(() -> { 1271 final Context context = Context.current(); 1272 final byte[] sanitizedStartKey = 1273 Optional.ofNullable(startKey).orElse(HConstants.EMPTY_START_ROW); 1274 final byte[] sanitizedEndKey = Optional.ofNullable(endKey).orElse(HConstants.EMPTY_END_ROW); 1275 1276 // get regions covered by the row range 1277 Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions = 1278 getKeysAndRegionsInRange(sanitizedStartKey, sanitizedEndKey, true); 1279 List<byte[]> keys = keysAndRegions.getFirst(); 1280 List<HRegionLocation> regions = keysAndRegions.getSecond(); 1281 1282 // check if we have any calls to make 1283 if (keys.isEmpty()) { 1284 LOG.info("No regions were selected by key range start={}, end={}", 1285 Bytes.toStringBinary(sanitizedStartKey), Bytes.toStringBinary(sanitizedEndKey)); 1286 return; 1287 } 1288 1289 List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size()); 1290 final Map<byte[], RegionCoprocessorServiceExec> execsByRow = 1291 new TreeMap<>(Bytes.BYTES_COMPARATOR); 1292 for (int i = 0; i < keys.size(); i++) { 1293 final byte[] rowKey = keys.get(i); 1294 final byte[] region = regions.get(i).getRegionInfo().getRegionName(); 1295 RegionCoprocessorServiceExec exec = 1296 new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request); 1297 execs.add(exec); 1298 execsByRow.put(rowKey, exec); 1299 } 1300 1301 // tracking for any possible deserialization errors on success callback 1302 // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here 1303 final List<Throwable> callbackErrorExceptions = new ArrayList<>(); 1304 final List<Row> callbackErrorActions = new ArrayList<>(); 1305 final List<String> callbackErrorServers = new ArrayList<>(); 1306 Object[] results = new Object[execs.size()]; 1307 1308 AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, 1309 RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), 1310 RpcControllerFactory.instantiate(configuration)); 1311 1312 Batch.Callback<ClientProtos.CoprocessorServiceResult> resultsCallback = 1313 (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { 1314 if (LOG.isTraceEnabled()) { 1315 LOG.trace("Received result for endpoint {}: region={}, row={}, value={}", 1316 methodDescriptor.getFullName(), Bytes.toStringBinary(region), 1317 Bytes.toStringBinary(row), serviceResult.getValue().getValue()); 1318 } 1319 try { 1320 Message.Builder builder = responsePrototype.newBuilderForType(); 1321 org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, 1322 serviceResult.getValue().getValue().toByteArray()); 1323 callback.update(region, row, (R) builder.build()); 1324 } catch (IOException e) { 1325 LOG.error("Unexpected response type from endpoint {}", methodDescriptor.getFullName(), 1326 e); 1327 callbackErrorExceptions.add(e); 1328 callbackErrorActions.add(execsByRow.get(row)); 1329 callbackErrorServers.add("null"); 1330 } 1331 }; 1332 AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = 1333 AsyncProcessTask.newBuilder(resultsCallback).setPool(context.wrap(pool)) 1334 .setTableName(tableName).setRowAccess(execs).setResults(results) 1335 .setRpcTimeout(readRpcTimeoutMs).setOperationTimeout(operationTimeoutMs) 1336 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).build(); 1337 AsyncRequestFuture future = asyncProcess.submit(task); 1338 future.waitUntilDone(); 1339 1340 if (future.hasError()) { 1341 throw future.getErrors(); 1342 } else if (!callbackErrorExceptions.isEmpty()) { 1343 throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, 1344 callbackErrorActions, callbackErrorServers); 1345 } 1346 }, supplier); 1347 } 1348 1349 @Override 1350 public RegionLocator getRegionLocator() { 1351 return this.locator; 1352 } 1353 1354 private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 1355 1356 private final byte[] row; 1357 private final byte[] family; 1358 private byte[] qualifier; 1359 private TimeRange timeRange; 1360 private CompareOperator op; 1361 private byte[] value; 1362 1363 CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 1364 this.row = Preconditions.checkNotNull(row, "row is null"); 1365 this.family = Preconditions.checkNotNull(family, "family is null"); 1366 } 1367 1368 @Override 1369 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 1370 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" 1371 + " an empty byte array, or just do not call this method if you want a null qualifier"); 1372 return this; 1373 } 1374 1375 @Override 1376 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 1377 this.timeRange = timeRange; 1378 return this; 1379 } 1380 1381 @Override 1382 public CheckAndMutateBuilder ifNotExists() { 1383 this.op = CompareOperator.EQUAL; 1384 this.value = null; 1385 return this; 1386 } 1387 1388 @Override 1389 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 1390 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 1391 this.value = Preconditions.checkNotNull(value, "value is null"); 1392 return this; 1393 } 1394 1395 private void preCheck() { 1396 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" 1397 + " calling ifNotExists/ifEquals/ifMatches before executing the request"); 1398 } 1399 1400 @Override 1401 public boolean thenPut(Put put) throws IOException { 1402 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1403 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1404 return TraceUtil.trace(() -> { 1405 validatePut(put); 1406 preCheck(); 1407 return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put) 1408 .isSuccess(); 1409 }, supplier); 1410 } 1411 1412 @Override 1413 public boolean thenDelete(Delete delete) throws IOException { 1414 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1415 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1416 return TraceUtil.trace(() -> { 1417 preCheck(); 1418 return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete) 1419 .isSuccess(); 1420 }, supplier); 1421 } 1422 1423 @Override 1424 public boolean thenMutate(RowMutations mutation) throws IOException { 1425 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1426 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1427 return TraceUtil.trace(() -> { 1428 preCheck(); 1429 return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, mutation) 1430 .isSuccess(); 1431 }, supplier); 1432 } 1433 } 1434 1435 private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder { 1436 1437 private final byte[] row; 1438 private final Filter filter; 1439 private TimeRange timeRange; 1440 1441 CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { 1442 this.row = Preconditions.checkNotNull(row, "row is null"); 1443 this.filter = Preconditions.checkNotNull(filter, "filter is null"); 1444 } 1445 1446 @Override 1447 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 1448 this.timeRange = timeRange; 1449 return this; 1450 } 1451 1452 @Override 1453 public boolean thenPut(Put put) throws IOException { 1454 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1455 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1456 return TraceUtil.trace(() -> { 1457 validatePut(put); 1458 return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess(); 1459 }, supplier); 1460 } 1461 1462 @Override 1463 public boolean thenDelete(Delete delete) throws IOException { 1464 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1465 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1466 return TraceUtil.trace( 1467 () -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess(), 1468 supplier); 1469 } 1470 1471 @Override 1472 public boolean thenMutate(RowMutations mutation) throws IOException { 1473 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1474 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1475 return TraceUtil 1476 .trace(() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation) 1477 .isSuccess(), supplier); 1478 } 1479 } 1480}