001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.client; 020 021// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY. 022// Internally, we use shaded protobuf. This below are part of our public API. 023//SEE ABOVE NOTE! 024import com.google.protobuf.Descriptors; 025import com.google.protobuf.Message; 026import com.google.protobuf.Service; 027import com.google.protobuf.ServiceException; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.CompareOperator; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.HRegionLocation; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.io.TimeRange; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.yetus.audience.InterfaceStability; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.apache.hadoop.hbase.client.coprocessor.Batch; 041import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; 042import org.apache.hadoop.hbase.filter.BinaryComparator; 043import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 044import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 045import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 046import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 047import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 048import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 049import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 050import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.Pair; 059import org.apache.hadoop.hbase.util.ReflectionUtils; 060import org.apache.hadoop.hbase.util.Threads; 061 062import java.io.IOException; 063import java.io.InterruptedIOException; 064import java.util.ArrayList; 065import java.util.Collections; 066import java.util.List; 067import java.util.Map; 068import java.util.TreeMap; 069import java.util.concurrent.Callable; 070import java.util.concurrent.ExecutionException; 071import java.util.concurrent.ExecutorService; 072import java.util.concurrent.Future; 073import java.util.concurrent.SynchronousQueue; 074import java.util.concurrent.ThreadPoolExecutor; 075import java.util.concurrent.TimeUnit; 076 077import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 078 079/** 080 * An implementation of {@link Table}. Used to communicate with a single HBase table. 081 * Lightweight. Get as needed and just close when done. 082 * Instances of this class SHOULD NOT be constructed directly. 083 * Obtain an instance via {@link Connection}. See {@link ConnectionFactory} 084 * class comment for an example of how. 085 * 086 * <p>This class is thread safe since 2.0.0 if not invoking any of the setter methods. 087 * All setters are moved into {@link TableBuilder} and reserved here only for keeping 088 * backward compatibility, and TODO will be removed soon. 089 * 090 * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked 091 * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in 092 * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop 093 * Interface Classification</a> 094 * There are no guarantees for backwards source / binary compatibility and methods or class can 095 * change or go away without deprecation. 096 * 097 * @see Table 098 * @see Admin 099 * @see Connection 100 * @see ConnectionFactory 101 */ 102@InterfaceAudience.Private 103@InterfaceStability.Stable 104public class HTable implements Table { 105 private static final Logger LOG = LoggerFactory.getLogger(HTable.class); 106 private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG; 107 private final ClusterConnection connection; 108 private final TableName tableName; 109 private final Configuration configuration; 110 private final ConnectionConfiguration connConfiguration; 111 private boolean closed = false; 112 private final int scannerCaching; 113 private final long scannerMaxResultSize; 114 private final ExecutorService pool; // For Multi & Scan 115 private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc 116 private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX 117 private int readRpcTimeoutMs; // timeout for each read rpc request 118 private int writeRpcTimeoutMs; // timeout for each write rpc request 119 private final boolean cleanupPoolOnClose; // shutdown the pool in close() 120 private final HRegionLocator locator; 121 122 /** The Async process for batch */ 123 @VisibleForTesting 124 AsyncProcess multiAp; 125 private final RpcRetryingCallerFactory rpcCallerFactory; 126 private final RpcControllerFactory rpcControllerFactory; 127 128 // Marked Private @since 1.0 129 @InterfaceAudience.Private 130 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { 131 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); 132 if (maxThreads == 0) { 133 maxThreads = 1; // is there a better default? 134 } 135 int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1); 136 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); 137 138 // Using the "direct handoff" approach, new threads will only be created 139 // if it is necessary and will grow unbounded. This could be bad but in HCM 140 // we only create as many Runnables as there are region servers. It means 141 // it also scales when new region servers are added. 142 ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, 143 TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable")); 144 pool.allowCoreThreadTimeOut(true); 145 return pool; 146 } 147 148 /** 149 * Creates an object to access a HBase table. 150 * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to 151 * get a {@link Table} instance (use {@link Table} instead of {@link HTable}). 152 * @param connection Connection to be used. 153 * @param builder The table builder 154 * @param rpcCallerFactory The RPC caller factory 155 * @param rpcControllerFactory The RPC controller factory 156 * @param pool ExecutorService to be used. 157 */ 158 @InterfaceAudience.Private 159 protected HTable(final ConnectionImplementation connection, 160 final TableBuilderBase builder, 161 final RpcRetryingCallerFactory rpcCallerFactory, 162 final RpcControllerFactory rpcControllerFactory, 163 final ExecutorService pool) { 164 this.connection = Preconditions.checkNotNull(connection, "connection is null"); 165 this.configuration = connection.getConfiguration(); 166 this.connConfiguration = connection.getConnectionConfiguration(); 167 if (pool == null) { 168 this.pool = getDefaultExecutor(this.configuration); 169 this.cleanupPoolOnClose = true; 170 } else { 171 this.pool = pool; 172 this.cleanupPoolOnClose = false; 173 } 174 if (rpcCallerFactory == null) { 175 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); 176 } else { 177 this.rpcCallerFactory = rpcCallerFactory; 178 } 179 180 if (rpcControllerFactory == null) { 181 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); 182 } else { 183 this.rpcControllerFactory = rpcControllerFactory; 184 } 185 186 this.tableName = builder.tableName; 187 this.operationTimeoutMs = builder.operationTimeout; 188 this.rpcTimeoutMs = builder.rpcTimeout; 189 this.readRpcTimeoutMs = builder.readRpcTimeout; 190 this.writeRpcTimeoutMs = builder.writeRpcTimeout; 191 this.scannerCaching = connConfiguration.getScannerCaching(); 192 this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); 193 194 // puts need to track errors globally due to how the APIs currently work. 195 multiAp = this.connection.getAsyncProcess(); 196 this.locator = new HRegionLocator(tableName, connection); 197 } 198 199 /** 200 * @return maxKeyValueSize from configuration. 201 */ 202 public static int getMaxKeyValueSize(Configuration conf) { 203 return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1); 204 } 205 206 @Override 207 public Configuration getConfiguration() { 208 return configuration; 209 } 210 211 @Override 212 public TableName getName() { 213 return tableName; 214 } 215 216 /** 217 * <em>INTERNAL</em> Used by unit tests and tools to do low-level 218 * manipulations. 219 * @return A Connection instance. 220 */ 221 @VisibleForTesting 222 protected Connection getConnection() { 223 return this.connection; 224 } 225 226 @Override 227 @Deprecated 228 public HTableDescriptor getTableDescriptor() throws IOException { 229 HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory, 230 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 231 if (htd != null) { 232 return new ImmutableHTableDescriptor(htd); 233 } 234 return null; 235 } 236 237 @Override 238 public TableDescriptor getDescriptor() throws IOException { 239 return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, 240 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 241 } 242 243 /** 244 * Get the corresponding start keys and regions for an arbitrary range of 245 * keys. 246 * <p> 247 * @param startKey Starting row in range, inclusive 248 * @param endKey Ending row in range 249 * @param includeEndKey true if endRow is inclusive, false if exclusive 250 * @return A pair of list of start keys and list of HRegionLocations that 251 * contain the specified range 252 * @throws IOException if a remote or network exception occurs 253 */ 254 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( 255 final byte[] startKey, final byte[] endKey, final boolean includeEndKey) 256 throws IOException { 257 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false); 258 } 259 260 /** 261 * Get the corresponding start keys and regions for an arbitrary range of 262 * keys. 263 * <p> 264 * @param startKey Starting row in range, inclusive 265 * @param endKey Ending row in range 266 * @param includeEndKey true if endRow is inclusive, false if exclusive 267 * @param reload true to reload information or false to use cached information 268 * @return A pair of list of start keys and list of HRegionLocations that 269 * contain the specified range 270 * @throws IOException if a remote or network exception occurs 271 */ 272 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( 273 final byte[] startKey, final byte[] endKey, final boolean includeEndKey, 274 final boolean reload) throws IOException { 275 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW); 276 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { 277 throw new IllegalArgumentException( 278 "Invalid range: " + Bytes.toStringBinary(startKey) + 279 " > " + Bytes.toStringBinary(endKey)); 280 } 281 List<byte[]> keysInRange = new ArrayList<>(); 282 List<HRegionLocation> regionsInRange = new ArrayList<>(); 283 byte[] currentKey = startKey; 284 do { 285 HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload); 286 keysInRange.add(currentKey); 287 regionsInRange.add(regionLocation); 288 currentKey = regionLocation.getRegionInfo().getEndKey(); 289 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) 290 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 291 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))); 292 return new Pair<>(keysInRange, regionsInRange); 293 } 294 295 /** 296 * The underlying {@link HTable} must not be closed. 297 * {@link Table#getScanner(Scan)} has other usage details. 298 */ 299 @Override 300 public ResultScanner getScanner(Scan scan) throws IOException { 301 if (scan.getCaching() <= 0) { 302 scan.setCaching(scannerCaching); 303 } 304 if (scan.getMaxResultSize() <= 0) { 305 scan.setMaxResultSize(scannerMaxResultSize); 306 } 307 if (scan.getMvccReadPoint() > 0) { 308 // it is not supposed to be set by user, clear 309 scan.resetMvccReadPoint(); 310 } 311 Boolean async = scan.isAsyncPrefetch(); 312 if (async == null) { 313 async = connConfiguration.isClientScannerAsyncPrefetch(); 314 } 315 316 if (scan.isReversed()) { 317 return new ReversedClientScanner(getConfiguration(), scan, getName(), 318 this.connection, this.rpcCallerFactory, this.rpcControllerFactory, 319 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); 320 } else { 321 if (async) { 322 return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, 323 this.rpcCallerFactory, this.rpcControllerFactory, 324 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); 325 } else { 326 return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, 327 this.rpcCallerFactory, this.rpcControllerFactory, 328 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); 329 } 330 } 331 } 332 333 /** 334 * The underlying {@link HTable} must not be closed. 335 * {@link Table#getScanner(byte[])} has other usage details. 336 */ 337 @Override 338 public ResultScanner getScanner(byte [] family) throws IOException { 339 Scan scan = new Scan(); 340 scan.addFamily(family); 341 return getScanner(scan); 342 } 343 344 /** 345 * The underlying {@link HTable} must not be closed. 346 * {@link Table#getScanner(byte[], byte[])} has other usage details. 347 */ 348 @Override 349 public ResultScanner getScanner(byte [] family, byte [] qualifier) 350 throws IOException { 351 Scan scan = new Scan(); 352 scan.addColumn(family, qualifier); 353 return getScanner(scan); 354 } 355 356 @Override 357 public Result get(final Get get) throws IOException { 358 return get(get, get.isCheckExistenceOnly()); 359 } 360 361 private Result get(Get get, final boolean checkExistenceOnly) throws IOException { 362 // if we are changing settings to the get, clone it. 363 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) { 364 get = ReflectionUtils.newInstance(get.getClass(), get); 365 get.setCheckExistenceOnly(checkExistenceOnly); 366 if (get.getConsistency() == null){ 367 get.setConsistency(DEFAULT_CONSISTENCY); 368 } 369 } 370 371 if (get.getConsistency() == Consistency.STRONG) { 372 final Get configuredGet = get; 373 ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(), 374 get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) { 375 @Override 376 protected Result rpcCall() throws Exception { 377 ClientProtos.GetRequest request = RequestConverter.buildGetRequest( 378 getLocation().getRegionInfo().getRegionName(), configuredGet); 379 ClientProtos.GetResponse response = doGet(request); 380 return response == null? null: 381 ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 382 } 383 }; 384 return rpcCallerFactory.<Result>newCaller(readRpcTimeoutMs).callWithRetries(callable, 385 this.operationTimeoutMs); 386 } 387 388 // Call that takes into account the replica 389 RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( 390 rpcControllerFactory, tableName, this.connection, get, pool, 391 connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs, 392 connConfiguration.getPrimaryCallTimeoutMicroSecond()); 393 return callable.call(operationTimeoutMs); 394 } 395 396 @Override 397 public Result[] get(List<Get> gets) throws IOException { 398 if (gets.size() == 1) { 399 return new Result[]{get(gets.get(0))}; 400 } 401 try { 402 Object[] r1 = new Object[gets.size()]; 403 batch((List<? extends Row>)gets, r1, readRpcTimeoutMs); 404 // Translate. 405 Result [] results = new Result[r1.length]; 406 int i = 0; 407 for (Object obj: r1) { 408 // Batch ensures if there is a failure we get an exception instead 409 results[i++] = (Result)obj; 410 } 411 return results; 412 } catch (InterruptedException e) { 413 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 414 } 415 } 416 417 @Override 418 public void batch(final List<? extends Row> actions, final Object[] results) 419 throws InterruptedException, IOException { 420 int rpcTimeout = writeRpcTimeoutMs; 421 boolean hasRead = false; 422 boolean hasWrite = false; 423 for (Row action : actions) { 424 if (action instanceof Mutation) { 425 hasWrite = true; 426 } else { 427 hasRead = true; 428 } 429 if (hasRead && hasWrite) { 430 break; 431 } 432 } 433 if (hasRead && !hasWrite) { 434 rpcTimeout = readRpcTimeoutMs; 435 } 436 batch(actions, results, rpcTimeout); 437 } 438 439 public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout) 440 throws InterruptedException, IOException { 441 AsyncProcessTask task = AsyncProcessTask.newBuilder() 442 .setPool(pool) 443 .setTableName(tableName) 444 .setRowAccess(actions) 445 .setResults(results) 446 .setRpcTimeout(rpcTimeout) 447 .setOperationTimeout(operationTimeoutMs) 448 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 449 .build(); 450 AsyncRequestFuture ars = multiAp.submit(task); 451 ars.waitUntilDone(); 452 if (ars.hasError()) { 453 throw ars.getErrors(); 454 } 455 } 456 457 @Override 458 public <R> void batchCallback( 459 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback) 460 throws IOException, InterruptedException { 461 doBatchWithCallback(actions, results, callback, connection, pool, tableName); 462 } 463 464 public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results, 465 Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName) 466 throws InterruptedIOException, RetriesExhaustedWithDetailsException { 467 int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout(); 468 int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 469 connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 470 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 471 AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback) 472 .setPool(pool) 473 .setTableName(tableName) 474 .setRowAccess(actions) 475 .setResults(results) 476 .setOperationTimeout(operationTimeout) 477 .setRpcTimeout(writeTimeout) 478 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 479 .build(); 480 AsyncRequestFuture ars = connection.getAsyncProcess().submit(task); 481 ars.waitUntilDone(); 482 if (ars.hasError()) { 483 throw ars.getErrors(); 484 } 485 } 486 487 @Override 488 public void delete(final Delete delete) throws IOException { 489 ClientServiceCallable<Void> callable = 490 new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(), 491 this.rpcControllerFactory.newController(), delete.getPriority()) { 492 @Override 493 protected Void rpcCall() throws Exception { 494 MutateRequest request = RequestConverter 495 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete); 496 doMutate(request); 497 return null; 498 } 499 }; 500 rpcCallerFactory.<Void>newCaller(this.writeRpcTimeoutMs) 501 .callWithRetries(callable, this.operationTimeoutMs); 502 } 503 504 @Override 505 public void delete(final List<Delete> deletes) 506 throws IOException { 507 Object[] results = new Object[deletes.size()]; 508 try { 509 batch(deletes, results, writeRpcTimeoutMs); 510 } catch (InterruptedException e) { 511 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 512 } finally { 513 // TODO: to be consistent with batch put(), do not modify input list 514 // mutate list so that it is empty for complete success, or contains only failed records 515 // results are returned in the same order as the requests in list walk the list backwards, 516 // so we can remove from list without impacting the indexes of earlier members 517 for (int i = results.length - 1; i>=0; i--) { 518 // if result is not null, it succeeded 519 if (results[i] instanceof Result) { 520 deletes.remove(i); 521 } 522 } 523 } 524 } 525 526 @Override 527 public void put(final Put put) throws IOException { 528 validatePut(put); 529 ClientServiceCallable<Void> callable = 530 new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(), 531 this.rpcControllerFactory.newController(), put.getPriority()) { 532 @Override 533 protected Void rpcCall() throws Exception { 534 MutateRequest request = 535 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put); 536 doMutate(request); 537 return null; 538 } 539 }; 540 rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 541 this.operationTimeoutMs); 542 } 543 544 @Override 545 public void put(final List<Put> puts) throws IOException { 546 for (Put put : puts) { 547 validatePut(put); 548 } 549 Object[] results = new Object[puts.size()]; 550 try { 551 batch(puts, results, writeRpcTimeoutMs); 552 } catch (InterruptedException e) { 553 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 554 } 555 } 556 557 @Override 558 public void mutateRow(final RowMutations rm) throws IOException { 559 CancellableRegionServerCallable<MultiResponse> callable = 560 new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), 561 rpcControllerFactory.newController(), writeRpcTimeoutMs, 562 new RetryingTimeTracker().start(), rm.getMaxPriority()) { 563 @Override 564 protected MultiResponse rpcCall() throws Exception { 565 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( 566 getLocation().getRegionInfo().getRegionName(), rm); 567 regionMutationBuilder.setAtomic(true); 568 MultiRequest request = 569 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); 570 ClientProtos.MultiResponse response = doMulti(request); 571 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 572 if (res.hasException()) { 573 Throwable ex = ProtobufUtil.toException(res.getException()); 574 if (ex instanceof IOException) { 575 throw (IOException) ex; 576 } 577 throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex); 578 } 579 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 580 } 581 }; 582 AsyncProcessTask task = AsyncProcessTask.newBuilder() 583 .setPool(pool) 584 .setTableName(tableName) 585 .setRowAccess(rm.getMutations()) 586 .setCallable(callable) 587 .setRpcTimeout(writeRpcTimeoutMs) 588 .setOperationTimeout(operationTimeoutMs) 589 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 590 .build(); 591 AsyncRequestFuture ars = multiAp.submit(task); 592 ars.waitUntilDone(); 593 if (ars.hasError()) { 594 throw ars.getErrors(); 595 } 596 } 597 598 @Override 599 public Result append(final Append append) throws IOException { 600 checkHasFamilies(append); 601 NoncedRegionServerCallable<Result> callable = 602 new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(), 603 this.rpcControllerFactory.newController(), append.getPriority()) { 604 @Override 605 protected Result rpcCall() throws Exception { 606 MutateRequest request = RequestConverter.buildMutateRequest( 607 getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); 608 MutateResponse response = doMutate(request); 609 if (!response.hasResult()) return null; 610 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 611 } 612 }; 613 return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs). 614 callWithRetries(callable, this.operationTimeoutMs); 615 } 616 617 @Override 618 public Result increment(final Increment increment) throws IOException { 619 checkHasFamilies(increment); 620 NoncedRegionServerCallable<Result> callable = 621 new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(), 622 this.rpcControllerFactory.newController(), increment.getPriority()) { 623 @Override 624 protected Result rpcCall() throws Exception { 625 MutateRequest request = RequestConverter.buildMutateRequest( 626 getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); 627 MutateResponse response = doMutate(request); 628 // Should this check for null like append does? 629 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 630 } 631 }; 632 return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable, 633 this.operationTimeoutMs); 634 } 635 636 @Override 637 public long incrementColumnValue(final byte [] row, final byte [] family, 638 final byte [] qualifier, final long amount) 639 throws IOException { 640 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); 641 } 642 643 @Override 644 public long incrementColumnValue(final byte [] row, final byte [] family, 645 final byte [] qualifier, final long amount, final Durability durability) 646 throws IOException { 647 NullPointerException npe = null; 648 if (row == null) { 649 npe = new NullPointerException("row is null"); 650 } else if (family == null) { 651 npe = new NullPointerException("family is null"); 652 } 653 if (npe != null) { 654 throw new IOException( 655 "Invalid arguments to incrementColumnValue", npe); 656 } 657 658 NoncedRegionServerCallable<Long> callable = 659 new NoncedRegionServerCallable<Long>(this.connection, getName(), row, 660 this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { 661 @Override 662 protected Long rpcCall() throws Exception { 663 MutateRequest request = RequestConverter.buildIncrementRequest( 664 getLocation().getRegionInfo().getRegionName(), row, family, 665 qualifier, amount, durability, getNonceGroup(), getNonce()); 666 MutateResponse response = doMutate(request); 667 Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 668 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); 669 } 670 }; 671 return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs). 672 callWithRetries(callable, this.operationTimeoutMs); 673 } 674 675 @Override 676 @Deprecated 677 public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, 678 final byte [] value, final Put put) throws IOException { 679 return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put); 680 } 681 682 @Override 683 @Deprecated 684 public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, 685 final CompareOp compareOp, final byte [] value, final Put put) throws IOException { 686 return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put); 687 } 688 689 @Override 690 @Deprecated 691 public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, 692 final CompareOperator op, final byte [] value, final Put put) throws IOException { 693 // The name of the operators in CompareOperator are intentionally those of the 694 // operators in the filter's CompareOp enum. 695 return doCheckAndPut(row, family, qualifier, op.name(), value, null, put); 696 } 697 698 private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier, 699 final String opName, final byte[] value, final TimeRange timeRange, final Put put) 700 throws IOException { 701 ClientServiceCallable<Boolean> callable = 702 new ClientServiceCallable<Boolean>(this.connection, getName(), row, 703 this.rpcControllerFactory.newController(), put.getPriority()) { 704 @Override 705 protected Boolean rpcCall() throws Exception { 706 CompareType compareType = CompareType.valueOf(opName); 707 MutateRequest request = RequestConverter.buildMutateRequest( 708 getLocation().getRegionInfo().getRegionName(), row, family, qualifier, 709 new BinaryComparator(value), compareType, timeRange, put); 710 MutateResponse response = doMutate(request); 711 return Boolean.valueOf(response.getProcessed()); 712 } 713 }; 714 return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeoutMs) 715 .callWithRetries(callable, this.operationTimeoutMs); 716 } 717 718 @Override 719 @Deprecated 720 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 721 final byte[] value, final Delete delete) throws IOException { 722 return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, 723 delete); 724 } 725 726 @Override 727 @Deprecated 728 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 729 final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException { 730 return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete); 731 } 732 733 @Override 734 @Deprecated 735 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 736 final CompareOperator op, final byte[] value, final Delete delete) throws IOException { 737 return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete); 738 } 739 740 private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 741 final String opName, final byte[] value, final TimeRange timeRange, final Delete delete) 742 throws IOException { 743 CancellableRegionServerCallable<SingleResponse> callable = 744 new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row, 745 this.rpcControllerFactory.newController(), writeRpcTimeoutMs, 746 new RetryingTimeTracker().start(), delete.getPriority()) { 747 @Override 748 protected SingleResponse rpcCall() throws Exception { 749 CompareType compareType = CompareType.valueOf(opName); 750 MutateRequest request = RequestConverter 751 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, 752 qualifier, new BinaryComparator(value), compareType, timeRange, delete); 753 MutateResponse response = doMutate(request); 754 return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); 755 } 756 }; 757 List<Delete> rows = Collections.singletonList(delete); 758 Object[] results = new Object[1]; 759 AsyncProcessTask task = 760 AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows) 761 .setCallable(callable) 762 // TODO any better timeout? 763 .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs)) 764 .setOperationTimeout(operationTimeoutMs) 765 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build(); 766 AsyncRequestFuture ars = multiAp.submit(task); 767 ars.waitUntilDone(); 768 if (ars.hasError()) { 769 throw ars.getErrors(); 770 } 771 return ((SingleResponse.Entry) results[0]).isProcessed(); 772 } 773 774 @Override 775 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 776 return new CheckAndMutateBuilderImpl(row, family); 777 } 778 779 private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, 780 final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm) 781 throws IOException { 782 CancellableRegionServerCallable<MultiResponse> callable = 783 new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(), 784 rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), 785 rm.getMaxPriority()) { 786 @Override 787 protected MultiResponse rpcCall() throws Exception { 788 CompareType compareType = CompareType.valueOf(opName); 789 MultiRequest request = RequestConverter 790 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier, 791 new BinaryComparator(value), compareType, timeRange, rm); 792 ClientProtos.MultiResponse response = doMulti(request); 793 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 794 if (res.hasException()) { 795 Throwable ex = ProtobufUtil.toException(res.getException()); 796 if (ex instanceof IOException) { 797 throw (IOException) ex; 798 } 799 throw new IOException( 800 "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex); 801 } 802 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 803 } 804 }; 805 806 /** 807 * Currently, we use one array to store 'processed' flag which is returned by server. 808 * It is excessive to send such a large array, but that is required by the framework right now 809 * */ 810 Object[] results = new Object[rm.getMutations().size()]; 811 AsyncProcessTask task = AsyncProcessTask.newBuilder() 812 .setPool(pool) 813 .setTableName(tableName) 814 .setRowAccess(rm.getMutations()) 815 .setResults(results) 816 .setCallable(callable) 817 // TODO any better timeout? 818 .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs)) 819 .setOperationTimeout(operationTimeoutMs) 820 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 821 .build(); 822 AsyncRequestFuture ars = multiAp.submit(task); 823 ars.waitUntilDone(); 824 if (ars.hasError()) { 825 throw ars.getErrors(); 826 } 827 828 return ((Result)results[0]).getExists(); 829 } 830 831 @Override 832 @Deprecated 833 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, 834 final CompareOp compareOp, final byte [] value, final RowMutations rm) 835 throws IOException { 836 return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm); 837 } 838 839 @Override 840 @Deprecated 841 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, 842 final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException { 843 return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm); 844 } 845 846 @Override 847 public boolean exists(final Get get) throws IOException { 848 Result r = get(get, true); 849 assert r.getExists() != null; 850 return r.getExists(); 851 } 852 853 @Override 854 public boolean[] exists(List<Get> gets) throws IOException { 855 if (gets.isEmpty()) return new boolean[]{}; 856 if (gets.size() == 1) return new boolean[]{exists(gets.get(0))}; 857 858 ArrayList<Get> exists = new ArrayList<>(gets.size()); 859 for (Get g: gets){ 860 Get ge = new Get(g); 861 ge.setCheckExistenceOnly(true); 862 exists.add(ge); 863 } 864 865 Object[] r1= new Object[exists.size()]; 866 try { 867 batch(exists, r1, readRpcTimeoutMs); 868 } catch (InterruptedException e) { 869 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 870 } 871 872 // translate. 873 boolean[] results = new boolean[r1.length]; 874 int i = 0; 875 for (Object o : r1) { 876 // batch ensures if there is a failure we get an exception instead 877 results[i++] = ((Result)o).getExists(); 878 } 879 880 return results; 881 } 882 883 /** 884 * Process a mixed batch of Get, Put and Delete actions. All actions for a 885 * RegionServer are forwarded in one RPC call. Queries are executed in parallel. 886 * 887 * @param list The collection of actions. 888 * @param results An empty array, same size as list. If an exception is thrown, 889 * you can test here for partial results, and to determine which actions 890 * processed successfully. 891 * @throws IOException if there are problems talking to META. Per-item 892 * exceptions are stored in the results array. 893 */ 894 public <R> void processBatchCallback( 895 final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback) 896 throws IOException, InterruptedException { 897 this.batchCallback(list, results, callback); 898 } 899 900 @Override 901 public void close() throws IOException { 902 if (this.closed) { 903 return; 904 } 905 if (cleanupPoolOnClose) { 906 this.pool.shutdown(); 907 try { 908 boolean terminated = false; 909 do { 910 // wait until the pool has terminated 911 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); 912 } while (!terminated); 913 } catch (InterruptedException e) { 914 this.pool.shutdownNow(); 915 LOG.warn("waitForTermination interrupted"); 916 } 917 } 918 this.closed = true; 919 } 920 921 // validate for well-formedness 922 private void validatePut(final Put put) throws IllegalArgumentException { 923 ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize()); 924 } 925 926 /** 927 * The pool is used for mutli requests for this HTable 928 * @return the pool used for mutli 929 */ 930 ExecutorService getPool() { 931 return this.pool; 932 } 933 934 /** 935 * Explicitly clears the region cache to fetch the latest value from META. 936 * This is a power user function: avoid unless you know the ramifications. 937 */ 938 public void clearRegionCache() { 939 this.connection.clearRegionLocationCache(); 940 } 941 942 @Override 943 public CoprocessorRpcChannel coprocessorService(byte[] row) { 944 return new RegionCoprocessorRpcChannel(connection, tableName, row); 945 } 946 947 @Override 948 public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service, 949 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable) 950 throws ServiceException, Throwable { 951 final Map<byte[],R> results = Collections.synchronizedMap( 952 new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR)); 953 coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() { 954 @Override 955 public void update(byte[] region, byte[] row, R value) { 956 if (region != null) { 957 results.put(region, value); 958 } 959 } 960 }); 961 return results; 962 } 963 964 @Override 965 public <T extends Service, R> void coprocessorService(final Class<T> service, 966 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable, 967 final Batch.Callback<R> callback) throws ServiceException, Throwable { 968 // get regions covered by the row range 969 List<byte[]> keys = getStartKeysInRange(startKey, endKey); 970 Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); 971 for (final byte[] r : keys) { 972 final RegionCoprocessorRpcChannel channel = 973 new RegionCoprocessorRpcChannel(connection, tableName, r); 974 Future<R> future = pool.submit(new Callable<R>() { 975 @Override 976 public R call() throws Exception { 977 T instance = 978 org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel); 979 R result = callable.call(instance); 980 byte[] region = channel.getLastRegion(); 981 if (callback != null) { 982 callback.update(region, r, result); 983 } 984 return result; 985 } 986 }); 987 futures.put(r, future); 988 } 989 for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) { 990 try { 991 e.getValue().get(); 992 } catch (ExecutionException ee) { 993 LOG.warn("Error calling coprocessor service " + service.getName() + " for row " 994 + Bytes.toStringBinary(e.getKey()), ee); 995 throw ee.getCause(); 996 } catch (InterruptedException ie) { 997 throw new InterruptedIOException("Interrupted calling coprocessor service " 998 + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie); 999 } 1000 } 1001 } 1002 1003 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) 1004 throws IOException { 1005 if (start == null) { 1006 start = HConstants.EMPTY_START_ROW; 1007 } 1008 if (end == null) { 1009 end = HConstants.EMPTY_END_ROW; 1010 } 1011 return getKeysAndRegionsInRange(start, end, true).getFirst(); 1012 } 1013 1014 @Override 1015 public long getRpcTimeout(TimeUnit unit) { 1016 return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS); 1017 } 1018 1019 @Override 1020 @Deprecated 1021 public int getRpcTimeout() { 1022 return rpcTimeoutMs; 1023 } 1024 1025 @Override 1026 @Deprecated 1027 public void setRpcTimeout(int rpcTimeout) { 1028 setReadRpcTimeout(rpcTimeout); 1029 setWriteRpcTimeout(rpcTimeout); 1030 } 1031 1032 @Override 1033 public long getReadRpcTimeout(TimeUnit unit) { 1034 return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS); 1035 } 1036 1037 @Override 1038 @Deprecated 1039 public int getReadRpcTimeout() { 1040 return readRpcTimeoutMs; 1041 } 1042 1043 @Override 1044 @Deprecated 1045 public void setReadRpcTimeout(int readRpcTimeout) { 1046 this.readRpcTimeoutMs = readRpcTimeout; 1047 } 1048 1049 @Override 1050 public long getWriteRpcTimeout(TimeUnit unit) { 1051 return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS); 1052 } 1053 1054 @Override 1055 @Deprecated 1056 public int getWriteRpcTimeout() { 1057 return writeRpcTimeoutMs; 1058 } 1059 1060 @Override 1061 @Deprecated 1062 public void setWriteRpcTimeout(int writeRpcTimeout) { 1063 this.writeRpcTimeoutMs = writeRpcTimeout; 1064 } 1065 1066 @Override 1067 public long getOperationTimeout(TimeUnit unit) { 1068 return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS); 1069 } 1070 1071 @Override 1072 @Deprecated 1073 public int getOperationTimeout() { 1074 return operationTimeoutMs; 1075 } 1076 1077 @Override 1078 @Deprecated 1079 public void setOperationTimeout(int operationTimeout) { 1080 this.operationTimeoutMs = operationTimeout; 1081 } 1082 1083 @Override 1084 public String toString() { 1085 return tableName + ";" + connection; 1086 } 1087 1088 @Override 1089 public <R extends Message> Map<byte[], R> batchCoprocessorService( 1090 Descriptors.MethodDescriptor methodDescriptor, Message request, 1091 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { 1092 final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>( 1093 Bytes.BYTES_COMPARATOR)); 1094 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, 1095 new Callback<R>() { 1096 @Override 1097 public void update(byte[] region, byte[] row, R result) { 1098 if (region != null) { 1099 results.put(region, result); 1100 } 1101 } 1102 }); 1103 return results; 1104 } 1105 1106 @Override 1107 public <R extends Message> void batchCoprocessorService( 1108 final Descriptors.MethodDescriptor methodDescriptor, final Message request, 1109 byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback) 1110 throws ServiceException, Throwable { 1111 1112 if (startKey == null) { 1113 startKey = HConstants.EMPTY_START_ROW; 1114 } 1115 if (endKey == null) { 1116 endKey = HConstants.EMPTY_END_ROW; 1117 } 1118 // get regions covered by the row range 1119 Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions = 1120 getKeysAndRegionsInRange(startKey, endKey, true); 1121 List<byte[]> keys = keysAndRegions.getFirst(); 1122 List<HRegionLocation> regions = keysAndRegions.getSecond(); 1123 1124 // check if we have any calls to make 1125 if (keys.isEmpty()) { 1126 LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) + 1127 ", end=" + Bytes.toStringBinary(endKey)); 1128 return; 1129 } 1130 1131 List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size()); 1132 final Map<byte[], RegionCoprocessorServiceExec> execsByRow = new TreeMap<>(Bytes.BYTES_COMPARATOR); 1133 for (int i = 0; i < keys.size(); i++) { 1134 final byte[] rowKey = keys.get(i); 1135 final byte[] region = regions.get(i).getRegionInfo().getRegionName(); 1136 RegionCoprocessorServiceExec exec = 1137 new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request); 1138 execs.add(exec); 1139 execsByRow.put(rowKey, exec); 1140 } 1141 1142 // tracking for any possible deserialization errors on success callback 1143 // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here 1144 final List<Throwable> callbackErrorExceptions = new ArrayList<>(); 1145 final List<Row> callbackErrorActions = new ArrayList<>(); 1146 final List<String> callbackErrorServers = new ArrayList<>(); 1147 Object[] results = new Object[execs.size()]; 1148 1149 AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, 1150 RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), 1151 RpcControllerFactory.instantiate(configuration)); 1152 1153 Callback<ClientProtos.CoprocessorServiceResult> resultsCallback 1154 = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { 1155 if (LOG.isTraceEnabled()) { 1156 LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + 1157 ": region=" + Bytes.toStringBinary(region) + 1158 ", row=" + Bytes.toStringBinary(row) + 1159 ", value=" + serviceResult.getValue().getValue()); 1160 } 1161 try { 1162 Message.Builder builder = responsePrototype.newBuilderForType(); 1163 org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, 1164 serviceResult.getValue().getValue().toByteArray()); 1165 callback.update(region, row, (R) builder.build()); 1166 } catch (IOException e) { 1167 LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(), 1168 e); 1169 callbackErrorExceptions.add(e); 1170 callbackErrorActions.add(execsByRow.get(row)); 1171 callbackErrorServers.add("null"); 1172 } 1173 }; 1174 AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = 1175 AsyncProcessTask.newBuilder(resultsCallback) 1176 .setPool(pool) 1177 .setTableName(tableName) 1178 .setRowAccess(execs) 1179 .setResults(results) 1180 .setRpcTimeout(readRpcTimeoutMs) 1181 .setOperationTimeout(operationTimeoutMs) 1182 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 1183 .build(); 1184 AsyncRequestFuture future = asyncProcess.submit(task); 1185 future.waitUntilDone(); 1186 1187 if (future.hasError()) { 1188 throw future.getErrors(); 1189 } else if (!callbackErrorExceptions.isEmpty()) { 1190 throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions, 1191 callbackErrorServers); 1192 } 1193 } 1194 1195 public RegionLocator getRegionLocator() { 1196 return this.locator; 1197 } 1198 1199 private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 1200 1201 private final byte[] row; 1202 private final byte[] family; 1203 private byte[] qualifier; 1204 private TimeRange timeRange; 1205 private CompareOperator op; 1206 private byte[] value; 1207 1208 CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 1209 this.row = Preconditions.checkNotNull(row, "row is null"); 1210 this.family = Preconditions.checkNotNull(family, "family is null"); 1211 } 1212 1213 @Override 1214 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 1215 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + 1216 " an empty byte array, or just do not call this method if you want a null qualifier"); 1217 return this; 1218 } 1219 1220 @Override 1221 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 1222 this.timeRange = timeRange; 1223 return this; 1224 } 1225 1226 @Override 1227 public CheckAndMutateBuilder ifNotExists() { 1228 this.op = CompareOperator.EQUAL; 1229 this.value = null; 1230 return this; 1231 } 1232 1233 @Override 1234 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 1235 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 1236 this.value = Preconditions.checkNotNull(value, "value is null"); 1237 return this; 1238 } 1239 1240 private void preCheck() { 1241 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + 1242 " calling ifNotExists/ifEquals/ifMatches before executing the request"); 1243 } 1244 1245 @Override 1246 public boolean thenPut(Put put) throws IOException { 1247 validatePut(put); 1248 preCheck(); 1249 return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put); 1250 } 1251 1252 @Override 1253 public boolean thenDelete(Delete delete) throws IOException { 1254 preCheck(); 1255 return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete); 1256 } 1257 1258 @Override 1259 public boolean thenMutate(RowMutations mutation) throws IOException { 1260 preCheck(); 1261 return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation); 1262 } 1263 } 1264}