/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
// Internally, we use shaded protobuf. The imports below are part of our public API.
// SEE ABOVE NOTE!
024import com.google.protobuf.Descriptors; 025import com.google.protobuf.Message; 026import com.google.protobuf.Service; 027import com.google.protobuf.ServiceException; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.CompareOperator; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.HRegionLocation; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.coprocessor.Batch; 036import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; 037import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 038import org.apache.hadoop.hbase.filter.Filter; 039import org.apache.hadoop.hbase.io.TimeRange; 040import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 041import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 042import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 043import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 044import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.Pair; 052import org.apache.hadoop.hbase.util.ReflectionUtils; 053import org.apache.hadoop.hbase.util.Threads; 054import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 055import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.apache.yetus.audience.InterfaceStability; 058import 
org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import java.io.IOException; 062import java.io.InterruptedIOException; 063import java.util.ArrayList; 064import java.util.Collections; 065import java.util.List; 066import java.util.Map; 067import java.util.TreeMap; 068import java.util.concurrent.Callable; 069import java.util.concurrent.ExecutionException; 070import java.util.concurrent.ExecutorService; 071import java.util.concurrent.Future; 072import java.util.concurrent.SynchronousQueue; 073import java.util.concurrent.ThreadPoolExecutor; 074import java.util.concurrent.TimeUnit; 075 076import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 077 078/** 079 * An implementation of {@link Table}. Used to communicate with a single HBase table. 080 * Lightweight. Get as needed and just close when done. 081 * Instances of this class SHOULD NOT be constructed directly. 082 * Obtain an instance via {@link Connection}. See {@link ConnectionFactory} 083 * class comment for an example of how. 084 * 085 * <p>This class is thread safe since 2.0.0 if not invoking any of the setter methods. 086 * All setters are moved into {@link TableBuilder} and reserved here only for keeping 087 * backward compatibility, and TODO will be removed soon. 088 * 089 * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked 090 * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in 091 * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop 092 * Interface Classification</a> 093 * There are no guarantees for backwards source / binary compatibility and methods or class can 094 * change or go away without deprecation. 
095 * 096 * @see Table 097 * @see Admin 098 * @see Connection 099 * @see ConnectionFactory 100 */ 101@InterfaceAudience.Private 102@InterfaceStability.Stable 103public class HTable implements Table { 104 private static final Logger LOG = LoggerFactory.getLogger(HTable.class); 105 private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG; 106 private final ClusterConnection connection; 107 private final TableName tableName; 108 private final Configuration configuration; 109 private final ConnectionConfiguration connConfiguration; 110 private boolean closed = false; 111 private final int scannerCaching; 112 private final long scannerMaxResultSize; 113 private final ExecutorService pool; // For Multi & Scan 114 private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc 115 private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX 116 private int readRpcTimeoutMs; // timeout for each read rpc request 117 private int writeRpcTimeoutMs; // timeout for each write rpc request 118 private final boolean cleanupPoolOnClose; // shutdown the pool in close() 119 private final HRegionLocator locator; 120 121 /** The Async process for batch */ 122 AsyncProcess multiAp; 123 private final RpcRetryingCallerFactory rpcCallerFactory; 124 private final RpcControllerFactory rpcControllerFactory; 125 126 // Marked Private @since 1.0 127 @InterfaceAudience.Private 128 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { 129 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); 130 if (maxThreads == 0) { 131 maxThreads = 1; // is there a better default? 132 } 133 int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1); 134 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); 135 136 // Using the "direct handoff" approach, new threads will only be created 137 // if it is necessary and will grow unbounded. 
This could be bad but in HCM 138 // we only create as many Runnables as there are region servers. It means 139 // it also scales when new region servers are added. 140 ThreadPoolExecutor pool = 141 new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, TimeUnit.SECONDS, 142 new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("htable-pool-%d") 143 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 144 pool.allowCoreThreadTimeOut(true); 145 return pool; 146 } 147 148 /** 149 * Creates an object to access a HBase table. 150 * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to 151 * get a {@link Table} instance (use {@link Table} instead of {@link HTable}). 152 * @param connection Connection to be used. 153 * @param builder The table builder 154 * @param rpcCallerFactory The RPC caller factory 155 * @param rpcControllerFactory The RPC controller factory 156 * @param pool ExecutorService to be used. 157 */ 158 @InterfaceAudience.Private 159 protected HTable(final ConnectionImplementation connection, 160 final TableBuilderBase builder, 161 final RpcRetryingCallerFactory rpcCallerFactory, 162 final RpcControllerFactory rpcControllerFactory, 163 final ExecutorService pool) { 164 this.connection = Preconditions.checkNotNull(connection, "connection is null"); 165 this.configuration = connection.getConfiguration(); 166 this.connConfiguration = connection.getConnectionConfiguration(); 167 if (pool == null) { 168 this.pool = getDefaultExecutor(this.configuration); 169 this.cleanupPoolOnClose = true; 170 } else { 171 this.pool = pool; 172 this.cleanupPoolOnClose = false; 173 } 174 if (rpcCallerFactory == null) { 175 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); 176 } else { 177 this.rpcCallerFactory = rpcCallerFactory; 178 } 179 180 if (rpcControllerFactory == null) { 181 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); 182 } 
else { 183 this.rpcControllerFactory = rpcControllerFactory; 184 } 185 186 this.tableName = builder.tableName; 187 this.operationTimeoutMs = builder.operationTimeout; 188 this.rpcTimeoutMs = builder.rpcTimeout; 189 this.readRpcTimeoutMs = builder.readRpcTimeout; 190 this.writeRpcTimeoutMs = builder.writeRpcTimeout; 191 this.scannerCaching = connConfiguration.getScannerCaching(); 192 this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); 193 194 // puts need to track errors globally due to how the APIs currently work. 195 multiAp = this.connection.getAsyncProcess(); 196 this.locator = new HRegionLocator(tableName, connection); 197 } 198 199 /** 200 * @return maxKeyValueSize from configuration. 201 */ 202 public static int getMaxKeyValueSize(Configuration conf) { 203 return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1); 204 } 205 206 @Override 207 public Configuration getConfiguration() { 208 return configuration; 209 } 210 211 @Override 212 public TableName getName() { 213 return tableName; 214 } 215 216 /** 217 * <em>INTERNAL</em> Used by unit tests and tools to do low-level 218 * manipulations. 219 * @return A Connection instance. 220 */ 221 protected Connection getConnection() { 222 return this.connection; 223 } 224 225 @Override 226 @Deprecated 227 public HTableDescriptor getTableDescriptor() throws IOException { 228 HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory, 229 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 230 if (htd != null) { 231 return new ImmutableHTableDescriptor(htd); 232 } 233 return null; 234 } 235 236 @Override 237 public TableDescriptor getDescriptor() throws IOException { 238 return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, 239 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 240 } 241 242 /** 243 * Get the corresponding start keys and regions for an arbitrary range of 244 * keys. 
245 * <p> 246 * @param startKey Starting row in range, inclusive 247 * @param endKey Ending row in range 248 * @param includeEndKey true if endRow is inclusive, false if exclusive 249 * @return A pair of list of start keys and list of HRegionLocations that 250 * contain the specified range 251 * @throws IOException if a remote or network exception occurs 252 */ 253 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( 254 final byte[] startKey, final byte[] endKey, final boolean includeEndKey) 255 throws IOException { 256 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false); 257 } 258 259 /** 260 * Get the corresponding start keys and regions for an arbitrary range of 261 * keys. 262 * <p> 263 * @param startKey Starting row in range, inclusive 264 * @param endKey Ending row in range 265 * @param includeEndKey true if endRow is inclusive, false if exclusive 266 * @param reload true to reload information or false to use cached information 267 * @return A pair of list of start keys and list of HRegionLocations that 268 * contain the specified range 269 * @throws IOException if a remote or network exception occurs 270 */ 271 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( 272 final byte[] startKey, final byte[] endKey, final boolean includeEndKey, 273 final boolean reload) throws IOException { 274 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW); 275 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { 276 throw new IllegalArgumentException( 277 "Invalid range: " + Bytes.toStringBinary(startKey) + 278 " > " + Bytes.toStringBinary(endKey)); 279 } 280 List<byte[]> keysInRange = new ArrayList<>(); 281 List<HRegionLocation> regionsInRange = new ArrayList<>(); 282 byte[] currentKey = startKey; 283 do { 284 HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload); 285 keysInRange.add(currentKey); 286 
regionsInRange.add(regionLocation); 287 currentKey = regionLocation.getRegionInfo().getEndKey(); 288 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) 289 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 290 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))); 291 return new Pair<>(keysInRange, regionsInRange); 292 } 293 294 /** 295 * The underlying {@link HTable} must not be closed. 296 * {@link Table#getScanner(Scan)} has other usage details. 297 */ 298 @Override 299 public ResultScanner getScanner(Scan scan) throws IOException { 300 if (scan.getCaching() <= 0) { 301 scan.setCaching(scannerCaching); 302 } 303 if (scan.getMaxResultSize() <= 0) { 304 scan.setMaxResultSize(scannerMaxResultSize); 305 } 306 if (scan.getMvccReadPoint() > 0) { 307 // it is not supposed to be set by user, clear 308 scan.resetMvccReadPoint(); 309 } 310 Boolean async = scan.isAsyncPrefetch(); 311 if (async == null) { 312 async = connConfiguration.isClientScannerAsyncPrefetch(); 313 } 314 315 if (scan.isReversed()) { 316 return new ReversedClientScanner(getConfiguration(), scan, getName(), 317 this.connection, this.rpcCallerFactory, this.rpcControllerFactory, 318 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); 319 } else { 320 if (async) { 321 return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, 322 this.rpcCallerFactory, this.rpcControllerFactory, 323 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); 324 } else { 325 return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, 326 this.rpcCallerFactory, this.rpcControllerFactory, 327 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); 328 } 329 } 330 } 331 332 /** 333 * The underlying {@link HTable} must not be closed. 334 * {@link Table#getScanner(byte[])} has other usage details. 
335 */ 336 @Override 337 public ResultScanner getScanner(byte [] family) throws IOException { 338 Scan scan = new Scan(); 339 scan.addFamily(family); 340 return getScanner(scan); 341 } 342 343 /** 344 * The underlying {@link HTable} must not be closed. 345 * {@link Table#getScanner(byte[], byte[])} has other usage details. 346 */ 347 @Override 348 public ResultScanner getScanner(byte [] family, byte [] qualifier) 349 throws IOException { 350 Scan scan = new Scan(); 351 scan.addColumn(family, qualifier); 352 return getScanner(scan); 353 } 354 355 @Override 356 public Result get(final Get get) throws IOException { 357 return get(get, get.isCheckExistenceOnly()); 358 } 359 360 private Result get(Get get, final boolean checkExistenceOnly) throws IOException { 361 // if we are changing settings to the get, clone it. 362 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) { 363 get = ReflectionUtils.newInstance(get.getClass(), get); 364 get.setCheckExistenceOnly(checkExistenceOnly); 365 if (get.getConsistency() == null){ 366 get.setConsistency(DEFAULT_CONSISTENCY); 367 } 368 } 369 370 if (get.getConsistency() == Consistency.STRONG) { 371 final Get configuredGet = get; 372 ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(), 373 get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) { 374 @Override 375 protected Result rpcCall() throws Exception { 376 ClientProtos.GetRequest request = RequestConverter.buildGetRequest( 377 getLocation().getRegionInfo().getRegionName(), configuredGet); 378 ClientProtos.GetResponse response = doGet(request); 379 return response == null? 
null: 380 ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 381 } 382 }; 383 return rpcCallerFactory.<Result>newCaller(readRpcTimeoutMs).callWithRetries(callable, 384 this.operationTimeoutMs); 385 } 386 387 // Call that takes into account the replica 388 RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( 389 rpcControllerFactory, tableName, this.connection, get, pool, 390 connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs, 391 connConfiguration.getPrimaryCallTimeoutMicroSecond()); 392 return callable.call(operationTimeoutMs); 393 } 394 395 @Override 396 public Result[] get(List<Get> gets) throws IOException { 397 if (gets.size() == 1) { 398 return new Result[]{get(gets.get(0))}; 399 } 400 try { 401 Object[] r1 = new Object[gets.size()]; 402 batch((List<? extends Row>)gets, r1, readRpcTimeoutMs); 403 // Translate. 404 Result [] results = new Result[r1.length]; 405 int i = 0; 406 for (Object obj: r1) { 407 // Batch ensures if there is a failure we get an exception instead 408 results[i++] = (Result)obj; 409 } 410 return results; 411 } catch (InterruptedException e) { 412 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 413 } 414 } 415 416 @Override 417 public void batch(final List<? extends Row> actions, final Object[] results) 418 throws InterruptedException, IOException { 419 int rpcTimeout = writeRpcTimeoutMs; 420 boolean hasRead = false; 421 boolean hasWrite = false; 422 for (Row action : actions) { 423 if (action instanceof Mutation) { 424 hasWrite = true; 425 } else { 426 hasRead = true; 427 } 428 if (hasRead && hasWrite) { 429 break; 430 } 431 } 432 if (hasRead && !hasWrite) { 433 rpcTimeout = readRpcTimeoutMs; 434 } 435 batch(actions, results, rpcTimeout); 436 } 437 438 public void batch(final List<? 
extends Row> actions, final Object[] results, int rpcTimeout) 439 throws InterruptedException, IOException { 440 AsyncProcessTask task = AsyncProcessTask.newBuilder() 441 .setPool(pool) 442 .setTableName(tableName) 443 .setRowAccess(actions) 444 .setResults(results) 445 .setRpcTimeout(rpcTimeout) 446 .setOperationTimeout(operationTimeoutMs) 447 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 448 .build(); 449 AsyncRequestFuture ars = multiAp.submit(task); 450 ars.waitUntilDone(); 451 if (ars.hasError()) { 452 throw ars.getErrors(); 453 } 454 } 455 456 @Override 457 public <R> void batchCallback( 458 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback) 459 throws IOException, InterruptedException { 460 doBatchWithCallback(actions, results, callback, connection, pool, tableName); 461 } 462 463 public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results, 464 Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName) 465 throws InterruptedIOException, RetriesExhaustedWithDetailsException { 466 int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout(); 467 int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 468 connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 469 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 470 AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback) 471 .setPool(pool) 472 .setTableName(tableName) 473 .setRowAccess(actions) 474 .setResults(results) 475 .setOperationTimeout(operationTimeout) 476 .setRpcTimeout(writeTimeout) 477 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 478 .build(); 479 AsyncRequestFuture ars = connection.getAsyncProcess().submit(task); 480 ars.waitUntilDone(); 481 if (ars.hasError()) { 482 throw ars.getErrors(); 483 } 484 } 485 486 @Override 487 public void delete(final Delete delete) throws IOException { 488 
ClientServiceCallable<Void> callable = 489 new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(), 490 this.rpcControllerFactory.newController(), delete.getPriority()) { 491 @Override 492 protected Void rpcCall() throws Exception { 493 MutateRequest request = RequestConverter 494 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete); 495 doMutate(request); 496 return null; 497 } 498 }; 499 rpcCallerFactory.<Void>newCaller(this.writeRpcTimeoutMs) 500 .callWithRetries(callable, this.operationTimeoutMs); 501 } 502 503 @Override 504 public void delete(final List<Delete> deletes) 505 throws IOException { 506 Object[] results = new Object[deletes.size()]; 507 try { 508 batch(deletes, results, writeRpcTimeoutMs); 509 } catch (InterruptedException e) { 510 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 511 } finally { 512 // TODO: to be consistent with batch put(), do not modify input list 513 // mutate list so that it is empty for complete success, or contains only failed records 514 // results are returned in the same order as the requests in list walk the list backwards, 515 // so we can remove from list without impacting the indexes of earlier members 516 for (int i = results.length - 1; i>=0; i--) { 517 // if result is not null, it succeeded 518 if (results[i] instanceof Result) { 519 deletes.remove(i); 520 } 521 } 522 } 523 } 524 525 @Override 526 public void put(final Put put) throws IOException { 527 validatePut(put); 528 ClientServiceCallable<Void> callable = 529 new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(), 530 this.rpcControllerFactory.newController(), put.getPriority()) { 531 @Override 532 protected Void rpcCall() throws Exception { 533 MutateRequest request = 534 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put); 535 doMutate(request); 536 return null; 537 } 538 }; 539 rpcCallerFactory.<Void> 
newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 540 this.operationTimeoutMs); 541 } 542 543 @Override 544 public void put(final List<Put> puts) throws IOException { 545 for (Put put : puts) { 546 validatePut(put); 547 } 548 Object[] results = new Object[puts.size()]; 549 try { 550 batch(puts, results, writeRpcTimeoutMs); 551 } catch (InterruptedException e) { 552 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 553 } 554 } 555 556 @Override 557 public Result mutateRow(final RowMutations rm) throws IOException { 558 CancellableRegionServerCallable<MultiResponse> callable = 559 new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), 560 rpcControllerFactory.newController(), writeRpcTimeoutMs, 561 new RetryingTimeTracker().start(), rm.getMaxPriority()) { 562 @Override 563 protected MultiResponse rpcCall() throws Exception { 564 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( 565 getLocation().getRegionInfo().getRegionName(), rm); 566 regionMutationBuilder.setAtomic(true); 567 MultiRequest request = 568 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); 569 ClientProtos.MultiResponse response = doMulti(request); 570 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 571 if (res.hasException()) { 572 Throwable ex = ProtobufUtil.toException(res.getException()); 573 if (ex instanceof IOException) { 574 throw (IOException) ex; 575 } 576 throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex); 577 } 578 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 579 } 580 }; 581 Object[] results = new Object[rm.getMutations().size()]; 582 AsyncProcessTask task = AsyncProcessTask.newBuilder() 583 .setPool(pool) 584 .setTableName(tableName) 585 .setRowAccess(rm.getMutations()) 586 .setCallable(callable) 587 .setRpcTimeout(writeRpcTimeoutMs) 588 
.setOperationTimeout(operationTimeoutMs) 589 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 590 .setResults(results) 591 .build(); 592 AsyncRequestFuture ars = multiAp.submit(task); 593 ars.waitUntilDone(); 594 if (ars.hasError()) { 595 throw ars.getErrors(); 596 } 597 return (Result) results[0]; 598 } 599 600 @Override 601 public Result append(final Append append) throws IOException { 602 checkHasFamilies(append); 603 NoncedRegionServerCallable<Result> callable = 604 new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(), 605 this.rpcControllerFactory.newController(), append.getPriority()) { 606 @Override 607 protected Result rpcCall() throws Exception { 608 MutateRequest request = RequestConverter.buildMutateRequest( 609 getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); 610 MutateResponse response = doMutate(request); 611 if (!response.hasResult()) return null; 612 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 613 } 614 }; 615 return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs). 616 callWithRetries(callable, this.operationTimeoutMs); 617 } 618 619 @Override 620 public Result increment(final Increment increment) throws IOException { 621 checkHasFamilies(increment); 622 NoncedRegionServerCallable<Result> callable = 623 new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(), 624 this.rpcControllerFactory.newController(), increment.getPriority()) { 625 @Override 626 protected Result rpcCall() throws Exception { 627 MutateRequest request = RequestConverter.buildMutateRequest( 628 getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); 629 MutateResponse response = doMutate(request); 630 // Should this check for null like append does? 
631 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 632 } 633 }; 634 return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable, 635 this.operationTimeoutMs); 636 } 637 638 @Override 639 public long incrementColumnValue(final byte [] row, final byte [] family, 640 final byte [] qualifier, final long amount) 641 throws IOException { 642 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); 643 } 644 645 @Override 646 public long incrementColumnValue(final byte [] row, final byte [] family, 647 final byte [] qualifier, final long amount, final Durability durability) 648 throws IOException { 649 NullPointerException npe = null; 650 if (row == null) { 651 npe = new NullPointerException("row is null"); 652 } else if (family == null) { 653 npe = new NullPointerException("family is null"); 654 } 655 if (npe != null) { 656 throw new IOException( 657 "Invalid arguments to incrementColumnValue", npe); 658 } 659 660 NoncedRegionServerCallable<Long> callable = 661 new NoncedRegionServerCallable<Long>(this.connection, getName(), row, 662 this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { 663 @Override 664 protected Long rpcCall() throws Exception { 665 MutateRequest request = RequestConverter.buildIncrementRequest( 666 getLocation().getRegionInfo().getRegionName(), row, family, 667 qualifier, amount, durability, getNonceGroup(), getNonce()); 668 MutateResponse response = doMutate(request); 669 Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 670 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); 671 } 672 }; 673 return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs). 
674 callWithRetries(callable, this.operationTimeoutMs); 675 } 676 677 @Override 678 @Deprecated 679 public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, 680 final byte [] value, final Put put) throws IOException { 681 return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, 682 put).isSuccess(); 683 } 684 685 @Override 686 @Deprecated 687 public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, 688 final CompareOp compareOp, final byte [] value, final Put put) throws IOException { 689 return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null, 690 null, put).isSuccess(); 691 } 692 693 @Override 694 @Deprecated 695 public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, 696 final CompareOperator op, final byte [] value, final Put put) throws IOException { 697 // The name of the operators in CompareOperator are intentionally those of the 698 // operators in the filter's CompareOp enum. 
699 return doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(); 700 } 701 702 @Override 703 @Deprecated 704 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 705 final byte[] value, final Delete delete) throws IOException { 706 return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, 707 null, delete).isSuccess(); 708 } 709 710 @Override 711 @Deprecated 712 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 713 final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException { 714 return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null, 715 null, delete).isSuccess(); 716 } 717 718 @Override 719 @Deprecated 720 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 721 final CompareOperator op, final byte[] value, final Delete delete) throws IOException { 722 return doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(); 723 } 724 725 @Override 726 @Deprecated 727 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 728 return new CheckAndMutateBuilderImpl(row, family); 729 } 730 731 @Override 732 @Deprecated 733 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 734 return new CheckAndMutateWithFilterBuilderImpl(row, filter); 735 } 736 737 private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family, 738 final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, 739 final TimeRange timeRange, final RowMutations rm) throws IOException { 740 CancellableRegionServerCallable<MultiResponse> callable = 741 new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(), 742 rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), 743 
rm.getMaxPriority()) { 744 @Override 745 protected MultiResponse rpcCall() throws Exception { 746 MultiRequest request = RequestConverter 747 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, 748 qualifier, op, value, filter, timeRange, rm); 749 ClientProtos.MultiResponse response = doMulti(request); 750 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 751 if (res.hasException()) { 752 Throwable ex = ProtobufUtil.toException(res.getException()); 753 if (ex instanceof IOException) { 754 throw (IOException) ex; 755 } 756 throw new IOException( 757 "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex); 758 } 759 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 760 } 761 }; 762 763 /** 764 * Currently, we use one array to store 'processed' flag which is returned by server. 765 * It is excessive to send such a large array, but that is required by the framework right now 766 * */ 767 Object[] results = new Object[rm.getMutations().size()]; 768 AsyncProcessTask task = AsyncProcessTask.newBuilder() 769 .setPool(pool) 770 .setTableName(tableName) 771 .setRowAccess(rm.getMutations()) 772 .setResults(results) 773 .setCallable(callable) 774 // TODO any better timeout? 
775 .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs)) 776 .setOperationTimeout(operationTimeoutMs) 777 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 778 .build(); 779 AsyncRequestFuture ars = multiAp.submit(task); 780 ars.waitUntilDone(); 781 if (ars.hasError()) { 782 throw ars.getErrors(); 783 } 784 785 return (CheckAndMutateResult) results[0]; 786 } 787 788 @Override 789 @Deprecated 790 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, 791 final CompareOp compareOp, final byte [] value, final RowMutations rm) 792 throws IOException { 793 return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null, 794 null, rm).isSuccess(); 795 } 796 797 @Override 798 @Deprecated 799 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, 800 final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException { 801 return doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(); 802 } 803 804 @Override 805 public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { 806 Row action = checkAndMutate.getAction(); 807 if (action instanceof Put || action instanceof Delete || action instanceof Increment || 808 action instanceof Append) { 809 if (action instanceof Put) { 810 validatePut((Put) action); 811 } 812 return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(), 813 checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 814 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action); 815 } else { 816 return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(), 817 checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 818 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action); 819 } 820 } 821 822 private CheckAndMutateResult 
doCheckAndMutate(final byte[] row, final byte[] family, 823 final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, 824 final TimeRange timeRange, final Mutation mutation) throws IOException { 825 ClientServiceCallable<CheckAndMutateResult> callable = 826 new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row, 827 this.rpcControllerFactory.newController(), mutation.getPriority()) { 828 @Override 829 protected CheckAndMutateResult rpcCall() throws Exception { 830 MutateRequest request = RequestConverter.buildMutateRequest( 831 getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value, 832 filter, timeRange, mutation); 833 MutateResponse response = doMutate(request); 834 if (response.hasResult()) { 835 return new CheckAndMutateResult(response.getProcessed(), 836 ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner())); 837 } 838 return new CheckAndMutateResult(response.getProcessed(), null); 839 } 840 }; 841 return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs) 842 .callWithRetries(callable, this.operationTimeoutMs); 843 } 844 845 @Override 846 public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) 847 throws IOException { 848 if (checkAndMutates.isEmpty()) { 849 return Collections.emptyList(); 850 } 851 if (checkAndMutates.size() == 1) { 852 return Collections.singletonList(checkAndMutate(checkAndMutates.get(0))); 853 } 854 855 Object[] results = new Object[checkAndMutates.size()]; 856 try { 857 batch(checkAndMutates, results, writeRpcTimeoutMs); 858 } catch (InterruptedException e) { 859 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 860 } 861 862 // translate. 
863 List<CheckAndMutateResult> ret = new ArrayList<>(results.length); 864 for (Object r : results) { 865 // Batch ensures if there is a failure we get an exception instead 866 ret.add((CheckAndMutateResult) r); 867 } 868 return ret; 869 } 870 871 private CompareOperator toCompareOperator(CompareOp compareOp) { 872 switch (compareOp) { 873 case LESS: 874 return CompareOperator.LESS; 875 876 case LESS_OR_EQUAL: 877 return CompareOperator.LESS_OR_EQUAL; 878 879 case EQUAL: 880 return CompareOperator.EQUAL; 881 882 case NOT_EQUAL: 883 return CompareOperator.NOT_EQUAL; 884 885 case GREATER_OR_EQUAL: 886 return CompareOperator.GREATER_OR_EQUAL; 887 888 case GREATER: 889 return CompareOperator.GREATER; 890 891 case NO_OP: 892 return CompareOperator.NO_OP; 893 894 default: 895 throw new AssertionError(); 896 } 897 } 898 899 @Override 900 public boolean exists(final Get get) throws IOException { 901 Result r = get(get, true); 902 assert r.getExists() != null; 903 return r.getExists(); 904 } 905 906 @Override 907 public boolean[] exists(List<Get> gets) throws IOException { 908 if (gets.isEmpty()) return new boolean[]{}; 909 if (gets.size() == 1) return new boolean[]{exists(gets.get(0))}; 910 911 ArrayList<Get> exists = new ArrayList<>(gets.size()); 912 for (Get g: gets){ 913 Get ge = new Get(g); 914 ge.setCheckExistenceOnly(true); 915 exists.add(ge); 916 } 917 918 Object[] r1= new Object[exists.size()]; 919 try { 920 batch(exists, r1, readRpcTimeoutMs); 921 } catch (InterruptedException e) { 922 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 923 } 924 925 // translate. 926 boolean[] results = new boolean[r1.length]; 927 int i = 0; 928 for (Object o : r1) { 929 // batch ensures if there is a failure we get an exception instead 930 results[i++] = ((Result)o).getExists(); 931 } 932 933 return results; 934 } 935 936 /** 937 * Process a mixed batch of Get, Put and Delete actions. All actions for a 938 * RegionServer are forwarded in one RPC call. 
Queries are executed in parallel. 939 * 940 * @param list The collection of actions. 941 * @param results An empty array, same size as list. If an exception is thrown, 942 * you can test here for partial results, and to determine which actions 943 * processed successfully. 944 * @throws IOException if there are problems talking to META. Per-item 945 * exceptions are stored in the results array. 946 */ 947 public <R> void processBatchCallback( 948 final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback) 949 throws IOException, InterruptedException { 950 this.batchCallback(list, results, callback); 951 } 952 953 @Override 954 public void close() throws IOException { 955 if (this.closed) { 956 return; 957 } 958 if (cleanupPoolOnClose) { 959 this.pool.shutdown(); 960 try { 961 boolean terminated = false; 962 do { 963 // wait until the pool has terminated 964 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); 965 } while (!terminated); 966 } catch (InterruptedException e) { 967 this.pool.shutdownNow(); 968 LOG.warn("waitForTermination interrupted"); 969 } 970 } 971 this.closed = true; 972 } 973 974 // validate for well-formedness 975 private void validatePut(final Put put) throws IllegalArgumentException { 976 ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize()); 977 } 978 979 /** 980 * The pool is used for mutli requests for this HTable 981 * @return the pool used for mutli 982 */ 983 ExecutorService getPool() { 984 return this.pool; 985 } 986 987 /** 988 * Explicitly clears the region cache to fetch the latest value from META. 989 * This is a power user function: avoid unless you know the ramifications. 
990 */ 991 public void clearRegionCache() { 992 this.connection.clearRegionLocationCache(); 993 } 994 995 @Override 996 public CoprocessorRpcChannel coprocessorService(byte[] row) { 997 return new RegionCoprocessorRpcChannel(connection, tableName, row); 998 } 999 1000 @Override 1001 public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service, 1002 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable) 1003 throws ServiceException, Throwable { 1004 final Map<byte[],R> results = Collections.synchronizedMap( 1005 new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR)); 1006 coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() { 1007 @Override 1008 public void update(byte[] region, byte[] row, R value) { 1009 if (region != null) { 1010 results.put(region, value); 1011 } 1012 } 1013 }); 1014 return results; 1015 } 1016 1017 @Override 1018 public <T extends Service, R> void coprocessorService(final Class<T> service, 1019 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable, 1020 final Batch.Callback<R> callback) throws ServiceException, Throwable { 1021 // get regions covered by the row range 1022 List<byte[]> keys = getStartKeysInRange(startKey, endKey); 1023 Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); 1024 for (final byte[] r : keys) { 1025 final RegionCoprocessorRpcChannel channel = 1026 new RegionCoprocessorRpcChannel(connection, tableName, r); 1027 Future<R> future = pool.submit(new Callable<R>() { 1028 @Override 1029 public R call() throws Exception { 1030 T instance = 1031 org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel); 1032 R result = callable.call(instance); 1033 byte[] region = channel.getLastRegion(); 1034 if (callback != null) { 1035 callback.update(region, r, result); 1036 } 1037 return result; 1038 } 1039 }); 1040 futures.put(r, future); 1041 } 1042 for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) { 1043 try { 1044 
e.getValue().get(); 1045 } catch (ExecutionException ee) { 1046 LOG.warn("Error calling coprocessor service " + service.getName() + " for row " 1047 + Bytes.toStringBinary(e.getKey()), ee); 1048 throw ee.getCause(); 1049 } catch (InterruptedException ie) { 1050 throw new InterruptedIOException("Interrupted calling coprocessor service " 1051 + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie); 1052 } 1053 } 1054 } 1055 1056 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) 1057 throws IOException { 1058 if (start == null) { 1059 start = HConstants.EMPTY_START_ROW; 1060 } 1061 if (end == null) { 1062 end = HConstants.EMPTY_END_ROW; 1063 } 1064 return getKeysAndRegionsInRange(start, end, true).getFirst(); 1065 } 1066 1067 @Override 1068 public long getRpcTimeout(TimeUnit unit) { 1069 return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS); 1070 } 1071 1072 @Override 1073 @Deprecated 1074 public int getRpcTimeout() { 1075 return rpcTimeoutMs; 1076 } 1077 1078 @Override 1079 @Deprecated 1080 public void setRpcTimeout(int rpcTimeout) { 1081 setReadRpcTimeout(rpcTimeout); 1082 setWriteRpcTimeout(rpcTimeout); 1083 } 1084 1085 @Override 1086 public long getReadRpcTimeout(TimeUnit unit) { 1087 return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS); 1088 } 1089 1090 @Override 1091 @Deprecated 1092 public int getReadRpcTimeout() { 1093 return readRpcTimeoutMs; 1094 } 1095 1096 @Override 1097 @Deprecated 1098 public void setReadRpcTimeout(int readRpcTimeout) { 1099 this.readRpcTimeoutMs = readRpcTimeout; 1100 } 1101 1102 @Override 1103 public long getWriteRpcTimeout(TimeUnit unit) { 1104 return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS); 1105 } 1106 1107 @Override 1108 @Deprecated 1109 public int getWriteRpcTimeout() { 1110 return writeRpcTimeoutMs; 1111 } 1112 1113 @Override 1114 @Deprecated 1115 public void setWriteRpcTimeout(int writeRpcTimeout) { 1116 this.writeRpcTimeoutMs = writeRpcTimeout; 1117 } 1118 
1119 @Override 1120 public long getOperationTimeout(TimeUnit unit) { 1121 return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS); 1122 } 1123 1124 @Override 1125 @Deprecated 1126 public int getOperationTimeout() { 1127 return operationTimeoutMs; 1128 } 1129 1130 @Override 1131 @Deprecated 1132 public void setOperationTimeout(int operationTimeout) { 1133 this.operationTimeoutMs = operationTimeout; 1134 } 1135 1136 @Override 1137 public String toString() { 1138 return tableName + ";" + connection; 1139 } 1140 1141 @Override 1142 public <R extends Message> Map<byte[], R> batchCoprocessorService( 1143 Descriptors.MethodDescriptor methodDescriptor, Message request, 1144 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { 1145 final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>( 1146 Bytes.BYTES_COMPARATOR)); 1147 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, 1148 new Callback<R>() { 1149 @Override 1150 public void update(byte[] region, byte[] row, R result) { 1151 if (region != null) { 1152 results.put(region, result); 1153 } 1154 } 1155 }); 1156 return results; 1157 } 1158 1159 @Override 1160 public <R extends Message> void batchCoprocessorService( 1161 final Descriptors.MethodDescriptor methodDescriptor, final Message request, 1162 byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback) 1163 throws ServiceException, Throwable { 1164 1165 if (startKey == null) { 1166 startKey = HConstants.EMPTY_START_ROW; 1167 } 1168 if (endKey == null) { 1169 endKey = HConstants.EMPTY_END_ROW; 1170 } 1171 // get regions covered by the row range 1172 Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions = 1173 getKeysAndRegionsInRange(startKey, endKey, true); 1174 List<byte[]> keys = keysAndRegions.getFirst(); 1175 List<HRegionLocation> regions = keysAndRegions.getSecond(); 1176 1177 // check if we have any calls to make 1178 if 
(keys.isEmpty()) { 1179 LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) + 1180 ", end=" + Bytes.toStringBinary(endKey)); 1181 return; 1182 } 1183 1184 List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size()); 1185 final Map<byte[], RegionCoprocessorServiceExec> execsByRow = new TreeMap<>(Bytes.BYTES_COMPARATOR); 1186 for (int i = 0; i < keys.size(); i++) { 1187 final byte[] rowKey = keys.get(i); 1188 final byte[] region = regions.get(i).getRegionInfo().getRegionName(); 1189 RegionCoprocessorServiceExec exec = 1190 new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request); 1191 execs.add(exec); 1192 execsByRow.put(rowKey, exec); 1193 } 1194 1195 // tracking for any possible deserialization errors on success callback 1196 // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here 1197 final List<Throwable> callbackErrorExceptions = new ArrayList<>(); 1198 final List<Row> callbackErrorActions = new ArrayList<>(); 1199 final List<String> callbackErrorServers = new ArrayList<>(); 1200 Object[] results = new Object[execs.size()]; 1201 1202 AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, 1203 RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), 1204 RpcControllerFactory.instantiate(configuration)); 1205 1206 Callback<ClientProtos.CoprocessorServiceResult> resultsCallback 1207 = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { 1208 if (LOG.isTraceEnabled()) { 1209 LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + 1210 ": region=" + Bytes.toStringBinary(region) + 1211 ", row=" + Bytes.toStringBinary(row) + 1212 ", value=" + serviceResult.getValue().getValue()); 1213 } 1214 try { 1215 Message.Builder builder = responsePrototype.newBuilderForType(); 1216 org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, 1217 
serviceResult.getValue().getValue().toByteArray()); 1218 callback.update(region, row, (R) builder.build()); 1219 } catch (IOException e) { 1220 LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(), 1221 e); 1222 callbackErrorExceptions.add(e); 1223 callbackErrorActions.add(execsByRow.get(row)); 1224 callbackErrorServers.add("null"); 1225 } 1226 }; 1227 AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = 1228 AsyncProcessTask.newBuilder(resultsCallback) 1229 .setPool(pool) 1230 .setTableName(tableName) 1231 .setRowAccess(execs) 1232 .setResults(results) 1233 .setRpcTimeout(readRpcTimeoutMs) 1234 .setOperationTimeout(operationTimeoutMs) 1235 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 1236 .build(); 1237 AsyncRequestFuture future = asyncProcess.submit(task); 1238 future.waitUntilDone(); 1239 1240 if (future.hasError()) { 1241 throw future.getErrors(); 1242 } else if (!callbackErrorExceptions.isEmpty()) { 1243 throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions, 1244 callbackErrorServers); 1245 } 1246 } 1247 1248 @Override 1249 public RegionLocator getRegionLocator() { 1250 return this.locator; 1251 } 1252 1253 private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 1254 1255 private final byte[] row; 1256 private final byte[] family; 1257 private byte[] qualifier; 1258 private TimeRange timeRange; 1259 private CompareOperator op; 1260 private byte[] value; 1261 1262 CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 1263 this.row = Preconditions.checkNotNull(row, "row is null"); 1264 this.family = Preconditions.checkNotNull(family, "family is null"); 1265 } 1266 1267 @Override 1268 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 1269 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. 
Consider using" + 1270 " an empty byte array, or just do not call this method if you want a null qualifier"); 1271 return this; 1272 } 1273 1274 @Override 1275 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 1276 this.timeRange = timeRange; 1277 return this; 1278 } 1279 1280 @Override 1281 public CheckAndMutateBuilder ifNotExists() { 1282 this.op = CompareOperator.EQUAL; 1283 this.value = null; 1284 return this; 1285 } 1286 1287 @Override 1288 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 1289 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 1290 this.value = Preconditions.checkNotNull(value, "value is null"); 1291 return this; 1292 } 1293 1294 private void preCheck() { 1295 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + 1296 " calling ifNotExists/ifEquals/ifMatches before executing the request"); 1297 } 1298 1299 @Override 1300 public boolean thenPut(Put put) throws IOException { 1301 validatePut(put); 1302 preCheck(); 1303 return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put) 1304 .isSuccess(); 1305 } 1306 1307 @Override 1308 public boolean thenDelete(Delete delete) throws IOException { 1309 preCheck(); 1310 return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete) 1311 .isSuccess(); 1312 } 1313 1314 @Override 1315 public boolean thenMutate(RowMutations mutation) throws IOException { 1316 preCheck(); 1317 return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, 1318 mutation).isSuccess(); 1319 } 1320 } 1321 1322 private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder { 1323 1324 private final byte[] row; 1325 private final Filter filter; 1326 private TimeRange timeRange; 1327 1328 CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { 1329 this.row = Preconditions.checkNotNull(row, "row is null"); 1330 this.filter = 
Preconditions.checkNotNull(filter, "filter is null"); 1331 } 1332 1333 @Override 1334 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 1335 this.timeRange = timeRange; 1336 return this; 1337 } 1338 1339 @Override 1340 public boolean thenPut(Put put) throws IOException { 1341 validatePut(put); 1342 return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess(); 1343 } 1344 1345 @Override 1346 public boolean thenDelete(Delete delete) throws IOException { 1347 return doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess(); 1348 } 1349 1350 @Override 1351 public boolean thenMutate(RowMutations mutation) throws IOException { 1352 return doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation) 1353 .isSuccess(); 1354 } 1355 } 1356}