001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.client; 020 021// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY. 022// Internally, we use shaded protobuf. This below are part of our public API. 023//SEE ABOVE NOTE! 024import com.google.protobuf.Descriptors; 025import com.google.protobuf.Message; 026import com.google.protobuf.Service; 027import com.google.protobuf.ServiceException; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CompareOperator; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.HTableDescriptor; 035import org.apache.hadoop.hbase.KeyValueUtil; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.io.TimeRange; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.yetus.audience.InterfaceStability; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042import org.apache.hadoop.hbase.client.coprocessor.Batch; 043import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; 044import org.apache.hadoop.hbase.filter.BinaryComparator; 045import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 046import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 047import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 049import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 051import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 052import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.Pair; 061import org.apache.hadoop.hbase.util.ReflectionUtils; 062import org.apache.hadoop.hbase.util.Threads; 063 064import java.io.IOException; 065import java.io.InterruptedIOException; 066import java.util.ArrayList; 067import java.util.Collections; 068import java.util.List; 069import java.util.Map; 070import java.util.TreeMap; 071import java.util.concurrent.Callable; 072import java.util.concurrent.ExecutionException; 073import java.util.concurrent.ExecutorService; 074import java.util.concurrent.Future; 075import java.util.concurrent.SynchronousQueue; 076import java.util.concurrent.ThreadPoolExecutor; 077import java.util.concurrent.TimeUnit; 078 079import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 080 081/** 082 * An implementation of {@link Table}. Used to communicate with a single HBase table. 083 * Lightweight. Get as needed and just close when done. 084 * Instances of this class SHOULD NOT be constructed directly. 085 * Obtain an instance via {@link Connection}. See {@link ConnectionFactory} 086 * class comment for an example of how. 087 * 088 * <p>This class is thread safe since 2.0.0 if not invoking any of the setter methods. 089 * All setters are moved into {@link TableBuilder} and reserved here only for keeping 090 * backward compatibility, and TODO will be removed soon. 091 * 092 * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked 093 * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in 094 * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop 095 * Interface Classification</a> 096 * There are no guarantees for backwards source / binary compatibility and methods or class can 097 * change or go away without deprecation. 098 * 099 * @see Table 100 * @see Admin 101 * @see Connection 102 * @see ConnectionFactory 103 */ 104@InterfaceAudience.Private 105@InterfaceStability.Stable 106public class HTable implements Table { 107 private static final Logger LOG = LoggerFactory.getLogger(HTable.class); 108 private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG; 109 private final ClusterConnection connection; 110 private final TableName tableName; 111 private final Configuration configuration; 112 private final ConnectionConfiguration connConfiguration; 113 private boolean closed = false; 114 private final int scannerCaching; 115 private final long scannerMaxResultSize; 116 private final ExecutorService pool; // For Multi & Scan 117 private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc 118 private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX 119 private int readRpcTimeoutMs; // timeout for each read rpc request 120 private int writeRpcTimeoutMs; // timeout for each write rpc request 121 private final boolean cleanupPoolOnClose; // shutdown the pool in close() 122 private final HRegionLocator locator; 123 124 /** The Async process for batch */ 125 @VisibleForTesting 126 AsyncProcess multiAp; 127 private final RpcRetryingCallerFactory rpcCallerFactory; 128 private final RpcControllerFactory rpcControllerFactory; 129 130 // Marked Private @since 1.0 131 @InterfaceAudience.Private 132 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { 133 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); 134 if (maxThreads == 0) { 135 maxThreads = 1; // is there a better default? 136 } 137 int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1); 138 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); 139 140 // Using the "direct handoff" approach, new threads will only be created 141 // if it is necessary and will grow unbounded. This could be bad but in HCM 142 // we only create as many Runnables as there are region servers. It means 143 // it also scales when new region servers are added. 144 ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, 145 TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable")); 146 pool.allowCoreThreadTimeOut(true); 147 return pool; 148 } 149 150 /** 151 * Creates an object to access a HBase table. 152 * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to 153 * get a {@link Table} instance (use {@link Table} instead of {@link HTable}). 154 * @param connection Connection to be used. 155 * @param builder The table builder 156 * @param rpcCallerFactory The RPC caller factory 157 * @param rpcControllerFactory The RPC controller factory 158 * @param pool ExecutorService to be used. 159 */ 160 @InterfaceAudience.Private 161 protected HTable(final ClusterConnection connection, 162 final TableBuilderBase builder, 163 final RpcRetryingCallerFactory rpcCallerFactory, 164 final RpcControllerFactory rpcControllerFactory, 165 final ExecutorService pool) { 166 this.connection = Preconditions.checkNotNull(connection, "connection is null"); 167 this.configuration = connection.getConfiguration(); 168 this.connConfiguration = connection.getConnectionConfiguration(); 169 if (pool == null) { 170 this.pool = getDefaultExecutor(this.configuration); 171 this.cleanupPoolOnClose = true; 172 } else { 173 this.pool = pool; 174 this.cleanupPoolOnClose = false; 175 } 176 if (rpcCallerFactory == null) { 177 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); 178 } else { 179 this.rpcCallerFactory = rpcCallerFactory; 180 } 181 182 if (rpcControllerFactory == null) { 183 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); 184 } else { 185 this.rpcControllerFactory = rpcControllerFactory; 186 } 187 188 this.tableName = builder.tableName; 189 this.operationTimeoutMs = builder.operationTimeout; 190 this.rpcTimeoutMs = builder.rpcTimeout; 191 this.readRpcTimeoutMs = builder.readRpcTimeout; 192 this.writeRpcTimeoutMs = builder.writeRpcTimeout; 193 this.scannerCaching = connConfiguration.getScannerCaching(); 194 this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); 195 196 // puts need to track errors globally due to how the APIs currently work. 197 multiAp = this.connection.getAsyncProcess(); 198 this.locator = new HRegionLocator(tableName, connection); 199 } 200 201 /** 202 * @return maxKeyValueSize from configuration. 203 */ 204 public static int getMaxKeyValueSize(Configuration conf) { 205 return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1); 206 } 207 208 @Override 209 public Configuration getConfiguration() { 210 return configuration; 211 } 212 213 @Override 214 public TableName getName() { 215 return tableName; 216 } 217 218 /** 219 * <em>INTERNAL</em> Used by unit tests and tools to do low-level 220 * manipulations. 221 * @return A Connection instance. 222 */ 223 @VisibleForTesting 224 protected Connection getConnection() { 225 return this.connection; 226 } 227 228 @Override 229 @Deprecated 230 public HTableDescriptor getTableDescriptor() throws IOException { 231 HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory, 232 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 233 if (htd != null) { 234 return new ImmutableHTableDescriptor(htd); 235 } 236 return null; 237 } 238 239 @Override 240 public TableDescriptor getDescriptor() throws IOException { 241 return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, 242 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 243 } 244 245 /** 246 * Get the corresponding start keys and regions for an arbitrary range of 247 * keys. 248 * <p> 249 * @param startKey Starting row in range, inclusive 250 * @param endKey Ending row in range 251 * @param includeEndKey true if endRow is inclusive, false if exclusive 252 * @return A pair of list of start keys and list of HRegionLocations that 253 * contain the specified range 254 * @throws IOException if a remote or network exception occurs 255 */ 256 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( 257 final byte[] startKey, final byte[] endKey, final boolean includeEndKey) 258 throws IOException { 259 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false); 260 } 261 262 /** 263 * Get the corresponding start keys and regions for an arbitrary range of 264 * keys. 265 * <p> 266 * @param startKey Starting row in range, inclusive 267 * @param endKey Ending row in range 268 * @param includeEndKey true if endRow is inclusive, false if exclusive 269 * @param reload true to reload information or false to use cached information 270 * @return A pair of list of start keys and list of HRegionLocations that 271 * contain the specified range 272 * @throws IOException if a remote or network exception occurs 273 */ 274 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( 275 final byte[] startKey, final byte[] endKey, final boolean includeEndKey, 276 final boolean reload) throws IOException { 277 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW); 278 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { 279 throw new IllegalArgumentException( 280 "Invalid range: " + Bytes.toStringBinary(startKey) + 281 " > " + Bytes.toStringBinary(endKey)); 282 } 283 List<byte[]> keysInRange = new ArrayList<>(); 284 List<HRegionLocation> regionsInRange = new ArrayList<>(); 285 byte[] currentKey = startKey; 286 do { 287 HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload); 288 keysInRange.add(currentKey); 289 regionsInRange.add(regionLocation); 290 currentKey = regionLocation.getRegionInfo().getEndKey(); 291 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) 292 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 293 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))); 294 return new Pair<>(keysInRange, regionsInRange); 295 } 296 297 /** 298 * The underlying {@link HTable} must not be closed. 299 * {@link Table#getScanner(Scan)} has other usage details. 300 */ 301 @Override 302 public ResultScanner getScanner(Scan scan) throws IOException { 303 if (scan.getCaching() <= 0) { 304 scan.setCaching(scannerCaching); 305 } 306 if (scan.getMaxResultSize() <= 0) { 307 scan.setMaxResultSize(scannerMaxResultSize); 308 } 309 if (scan.getMvccReadPoint() > 0) { 310 // it is not supposed to be set by user, clear 311 scan.resetMvccReadPoint(); 312 } 313 Boolean async = scan.isAsyncPrefetch(); 314 if (async == null) { 315 async = connConfiguration.isClientScannerAsyncPrefetch(); 316 } 317 318 if (scan.isReversed()) { 319 return new ReversedClientScanner(getConfiguration(), scan, getName(), 320 this.connection, this.rpcCallerFactory, this.rpcControllerFactory, 321 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); 322 } else { 323 if (async) { 324 return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, 325 this.rpcCallerFactory, this.rpcControllerFactory, 326 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); 327 } else { 328 return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, 329 this.rpcCallerFactory, this.rpcControllerFactory, 330 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); 331 } 332 } 333 } 334 335 /** 336 * The underlying {@link HTable} must not be closed. 337 * {@link Table#getScanner(byte[])} has other usage details. 338 */ 339 @Override 340 public ResultScanner getScanner(byte [] family) throws IOException { 341 Scan scan = new Scan(); 342 scan.addFamily(family); 343 return getScanner(scan); 344 } 345 346 /** 347 * The underlying {@link HTable} must not be closed. 348 * {@link Table#getScanner(byte[], byte[])} has other usage details. 349 */ 350 @Override 351 public ResultScanner getScanner(byte [] family, byte [] qualifier) 352 throws IOException { 353 Scan scan = new Scan(); 354 scan.addColumn(family, qualifier); 355 return getScanner(scan); 356 } 357 358 @Override 359 public Result get(final Get get) throws IOException { 360 return get(get, get.isCheckExistenceOnly()); 361 } 362 363 private Result get(Get get, final boolean checkExistenceOnly) throws IOException { 364 // if we are changing settings to the get, clone it. 365 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) { 366 get = ReflectionUtils.newInstance(get.getClass(), get); 367 get.setCheckExistenceOnly(checkExistenceOnly); 368 if (get.getConsistency() == null){ 369 get.setConsistency(DEFAULT_CONSISTENCY); 370 } 371 } 372 373 if (get.getConsistency() == Consistency.STRONG) { 374 final Get configuredGet = get; 375 ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(), 376 get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) { 377 @Override 378 protected Result rpcCall() throws Exception { 379 ClientProtos.GetRequest request = RequestConverter.buildGetRequest( 380 getLocation().getRegionInfo().getRegionName(), configuredGet); 381 ClientProtos.GetResponse response = doGet(request); 382 return response == null? null: 383 ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 384 } 385 }; 386 return rpcCallerFactory.<Result>newCaller(readRpcTimeoutMs).callWithRetries(callable, 387 this.operationTimeoutMs); 388 } 389 390 // Call that takes into account the replica 391 RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( 392 rpcControllerFactory, tableName, this.connection, get, pool, 393 connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs, 394 connConfiguration.getPrimaryCallTimeoutMicroSecond()); 395 return callable.call(operationTimeoutMs); 396 } 397 398 @Override 399 public Result[] get(List<Get> gets) throws IOException { 400 if (gets.size() == 1) { 401 return new Result[]{get(gets.get(0))}; 402 } 403 try { 404 Object[] r1 = new Object[gets.size()]; 405 batch((List<? extends Row>)gets, r1, readRpcTimeoutMs); 406 // Translate. 407 Result [] results = new Result[r1.length]; 408 int i = 0; 409 for (Object obj: r1) { 410 // Batch ensures if there is a failure we get an exception instead 411 results[i++] = (Result)obj; 412 } 413 return results; 414 } catch (InterruptedException e) { 415 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 416 } 417 } 418 419 @Override 420 public void batch(final List<? extends Row> actions, final Object[] results) 421 throws InterruptedException, IOException { 422 int rpcTimeout = writeRpcTimeoutMs; 423 boolean hasRead = false; 424 boolean hasWrite = false; 425 for (Row action : actions) { 426 if (action instanceof Mutation) { 427 hasWrite = true; 428 } else { 429 hasRead = true; 430 } 431 if (hasRead && hasWrite) { 432 break; 433 } 434 } 435 if (hasRead && !hasWrite) { 436 rpcTimeout = readRpcTimeoutMs; 437 } 438 batch(actions, results, rpcTimeout); 439 } 440 441 public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout) 442 throws InterruptedException, IOException { 443 AsyncProcessTask task = AsyncProcessTask.newBuilder() 444 .setPool(pool) 445 .setTableName(tableName) 446 .setRowAccess(actions) 447 .setResults(results) 448 .setRpcTimeout(rpcTimeout) 449 .setOperationTimeout(operationTimeoutMs) 450 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 451 .build(); 452 AsyncRequestFuture ars = multiAp.submit(task); 453 ars.waitUntilDone(); 454 if (ars.hasError()) { 455 throw ars.getErrors(); 456 } 457 } 458 459 @Override 460 public <R> void batchCallback( 461 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback) 462 throws IOException, InterruptedException { 463 doBatchWithCallback(actions, results, callback, connection, pool, tableName); 464 } 465 466 public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results, 467 Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName) 468 throws InterruptedIOException, RetriesExhaustedWithDetailsException { 469 int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout(); 470 int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 471 connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 472 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 473 AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback) 474 .setPool(pool) 475 .setTableName(tableName) 476 .setRowAccess(actions) 477 .setResults(results) 478 .setOperationTimeout(operationTimeout) 479 .setRpcTimeout(writeTimeout) 480 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 481 .build(); 482 AsyncRequestFuture ars = connection.getAsyncProcess().submit(task); 483 ars.waitUntilDone(); 484 if (ars.hasError()) { 485 throw ars.getErrors(); 486 } 487 } 488 489 @Override 490 public void delete(final Delete delete) throws IOException { 491 ClientServiceCallable<Void> callable = 492 new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(), 493 this.rpcControllerFactory.newController(), delete.getPriority()) { 494 @Override 495 protected Void rpcCall() throws Exception { 496 MutateRequest request = RequestConverter 497 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete); 498 doMutate(request); 499 return null; 500 } 501 }; 502 rpcCallerFactory.<Void>newCaller(this.writeRpcTimeoutMs) 503 .callWithRetries(callable, this.operationTimeoutMs); 504 } 505 506 @Override 507 public void delete(final List<Delete> deletes) 508 throws IOException { 509 Object[] results = new Object[deletes.size()]; 510 try { 511 batch(deletes, results, writeRpcTimeoutMs); 512 } catch (InterruptedException e) { 513 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 514 } finally { 515 // TODO: to be consistent with batch put(), do not modify input list 516 // mutate list so that it is empty for complete success, or contains only failed records 517 // results are returned in the same order as the requests in list walk the list backwards, 518 // so we can remove from list without impacting the indexes of earlier members 519 for (int i = results.length - 1; i>=0; i--) { 520 // if result is not null, it succeeded 521 if (results[i] instanceof Result) { 522 deletes.remove(i); 523 } 524 } 525 } 526 } 527 528 @Override 529 public void put(final Put put) throws IOException { 530 validatePut(put); 531 ClientServiceCallable<Void> callable = 532 new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(), 533 this.rpcControllerFactory.newController(), put.getPriority()) { 534 @Override 535 protected Void rpcCall() throws Exception { 536 MutateRequest request = 537 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put); 538 doMutate(request); 539 return null; 540 } 541 }; 542 rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 543 this.operationTimeoutMs); 544 } 545 546 @Override 547 public void put(final List<Put> puts) throws IOException { 548 for (Put put : puts) { 549 validatePut(put); 550 } 551 Object[] results = new Object[puts.size()]; 552 try { 553 batch(puts, results, writeRpcTimeoutMs); 554 } catch (InterruptedException e) { 555 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 556 } 557 } 558 559 @Override 560 public void mutateRow(final RowMutations rm) throws IOException { 561 CancellableRegionServerCallable<MultiResponse> callable = 562 new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), 563 rpcControllerFactory.newController(), writeRpcTimeoutMs, 564 new RetryingTimeTracker().start(), rm.getMaxPriority()) { 565 @Override 566 protected MultiResponse rpcCall() throws Exception { 567 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( 568 getLocation().getRegionInfo().getRegionName(), rm); 569 regionMutationBuilder.setAtomic(true); 570 MultiRequest request = 571 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); 572 ClientProtos.MultiResponse response = doMulti(request); 573 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 574 if (res.hasException()) { 575 Throwable ex = ProtobufUtil.toException(res.getException()); 576 if (ex instanceof IOException) { 577 throw (IOException) ex; 578 } 579 throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex); 580 } 581 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 582 } 583 }; 584 AsyncProcessTask task = AsyncProcessTask.newBuilder() 585 .setPool(pool) 586 .setTableName(tableName) 587 .setRowAccess(rm.getMutations()) 588 .setCallable(callable) 589 .setRpcTimeout(writeRpcTimeoutMs) 590 .setOperationTimeout(operationTimeoutMs) 591 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 592 .build(); 593 AsyncRequestFuture ars = multiAp.submit(task); 594 ars.waitUntilDone(); 595 if (ars.hasError()) { 596 throw ars.getErrors(); 597 } 598 } 599 600 @Override 601 public Result append(final Append append) throws IOException { 602 checkHasFamilies(append); 603 NoncedRegionServerCallable<Result> callable = 604 new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(), 605 this.rpcControllerFactory.newController(), append.getPriority()) { 606 @Override 607 protected Result rpcCall() throws Exception { 608 MutateRequest request = RequestConverter.buildMutateRequest( 609 getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); 610 MutateResponse response = doMutate(request); 611 if (!response.hasResult()) return null; 612 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 613 } 614 }; 615 return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs). 616 callWithRetries(callable, this.operationTimeoutMs); 617 } 618 619 @Override 620 public Result increment(final Increment increment) throws IOException { 621 checkHasFamilies(increment); 622 NoncedRegionServerCallable<Result> callable = 623 new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(), 624 this.rpcControllerFactory.newController(), increment.getPriority()) { 625 @Override 626 protected Result rpcCall() throws Exception { 627 MutateRequest request = RequestConverter.buildMutateRequest( 628 getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); 629 MutateResponse response = doMutate(request); 630 // Should this check for null like append does? 631 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 632 } 633 }; 634 return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable, 635 this.operationTimeoutMs); 636 } 637 638 @Override 639 public long incrementColumnValue(final byte [] row, final byte [] family, 640 final byte [] qualifier, final long amount) 641 throws IOException { 642 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); 643 } 644 645 @Override 646 public long incrementColumnValue(final byte [] row, final byte [] family, 647 final byte [] qualifier, final long amount, final Durability durability) 648 throws IOException { 649 NullPointerException npe = null; 650 if (row == null) { 651 npe = new NullPointerException("row is null"); 652 } else if (family == null) { 653 npe = new NullPointerException("family is null"); 654 } 655 if (npe != null) { 656 throw new IOException( 657 "Invalid arguments to incrementColumnValue", npe); 658 } 659 660 NoncedRegionServerCallable<Long> callable = 661 new NoncedRegionServerCallable<Long>(this.connection, getName(), row, 662 this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { 663 @Override 664 protected Long rpcCall() throws Exception { 665 MutateRequest request = RequestConverter.buildIncrementRequest( 666 getLocation().getRegionInfo().getRegionName(), row, family, 667 qualifier, amount, durability, getNonceGroup(), getNonce()); 668 MutateResponse response = doMutate(request); 669 Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 670 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); 671 } 672 }; 673 return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs). 674 callWithRetries(callable, this.operationTimeoutMs); 675 } 676 677 @Override 678 @Deprecated 679 public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, 680 final byte [] value, final Put put) throws IOException { 681 return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put); 682 } 683 684 @Override 685 @Deprecated 686 public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, 687 final CompareOp compareOp, final byte [] value, final Put put) throws IOException { 688 return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put); 689 } 690 691 @Override 692 @Deprecated 693 public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, 694 final CompareOperator op, final byte [] value, final Put put) throws IOException { 695 // The name of the operators in CompareOperator are intentionally those of the 696 // operators in the filter's CompareOp enum. 697 return doCheckAndPut(row, family, qualifier, op.name(), value, null, put); 698 } 699 700 private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier, 701 final String opName, final byte[] value, final TimeRange timeRange, final Put put) 702 throws IOException { 703 ClientServiceCallable<Boolean> callable = 704 new ClientServiceCallable<Boolean>(this.connection, getName(), row, 705 this.rpcControllerFactory.newController(), put.getPriority()) { 706 @Override 707 protected Boolean rpcCall() throws Exception { 708 CompareType compareType = CompareType.valueOf(opName); 709 MutateRequest request = RequestConverter.buildMutateRequest( 710 getLocation().getRegionInfo().getRegionName(), row, family, qualifier, 711 new BinaryComparator(value), compareType, timeRange, put); 712 MutateResponse response = doMutate(request); 713 return Boolean.valueOf(response.getProcessed()); 714 } 715 }; 716 return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeoutMs) 717 .callWithRetries(callable, this.operationTimeoutMs); 718 } 719 720 @Override 721 @Deprecated 722 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 723 final byte[] value, final Delete delete) throws IOException { 724 return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, 725 delete); 726 } 727 728 @Override 729 @Deprecated 730 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 731 final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException { 732 return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete); 733 } 734 735 @Override 736 @Deprecated 737 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 738 final CompareOperator op, final byte[] value, final Delete delete) throws IOException { 739 return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete); 740 } 741 742 private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 743 final String opName, final byte[] value, final TimeRange timeRange, final Delete delete) 744 throws IOException { 745 CancellableRegionServerCallable<SingleResponse> callable = 746 new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row, 747 this.rpcControllerFactory.newController(), writeRpcTimeoutMs, 748 new RetryingTimeTracker().start(), delete.getPriority()) { 749 @Override 750 protected SingleResponse rpcCall() throws Exception { 751 CompareType compareType = CompareType.valueOf(opName); 752 MutateRequest request = RequestConverter 753 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, 754 qualifier, new BinaryComparator(value), compareType, timeRange, delete); 755 MutateResponse response = doMutate(request); 756 return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); 757 } 758 }; 759 List<Delete> rows = Collections.singletonList(delete); 760 Object[] results = new Object[1]; 761 AsyncProcessTask task = 762 AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows) 763 .setCallable(callable) 764 // TODO any better timeout? 765 .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs)) 766 .setOperationTimeout(operationTimeoutMs) 767 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build(); 768 AsyncRequestFuture ars = multiAp.submit(task); 769 ars.waitUntilDone(); 770 if (ars.hasError()) { 771 throw ars.getErrors(); 772 } 773 return ((SingleResponse.Entry) results[0]).isProcessed(); 774 } 775 776 @Override 777 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 778 return new CheckAndMutateBuilderImpl(row, family); 779 } 780 781 private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, 782 final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm) 783 throws IOException { 784 CancellableRegionServerCallable<MultiResponse> callable = 785 new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(), 786 rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), 787 rm.getMaxPriority()) { 788 @Override 789 protected MultiResponse rpcCall() throws Exception { 790 CompareType compareType = CompareType.valueOf(opName); 791 MultiRequest request = RequestConverter 792 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier, 793 new BinaryComparator(value), compareType, timeRange, rm); 794 ClientProtos.MultiResponse response = doMulti(request); 795 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 796 if (res.hasException()) { 797 Throwable ex = ProtobufUtil.toException(res.getException()); 798 if (ex instanceof IOException) { 799 throw (IOException) ex; 800 } 801 throw new IOException( 802 "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex); 803 } 804 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 805 } 806 }; 807 808 /** 809 * Currently, we use one array to store 'processed' flag which is returned by server. 810 * It is excessive to send such a large array, but that is required by the framework right now 811 * */ 812 Object[] results = new Object[rm.getMutations().size()]; 813 AsyncProcessTask task = AsyncProcessTask.newBuilder() 814 .setPool(pool) 815 .setTableName(tableName) 816 .setRowAccess(rm.getMutations()) 817 .setResults(results) 818 .setCallable(callable) 819 // TODO any better timeout? 820 .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs)) 821 .setOperationTimeout(operationTimeoutMs) 822 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 823 .build(); 824 AsyncRequestFuture ars = multiAp.submit(task); 825 ars.waitUntilDone(); 826 if (ars.hasError()) { 827 throw ars.getErrors(); 828 } 829 830 return ((Result)results[0]).getExists(); 831 } 832 833 @Override 834 @Deprecated 835 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, 836 final CompareOp compareOp, final byte [] value, final RowMutations rm) 837 throws IOException { 838 return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm); 839 } 840 841 @Override 842 @Deprecated 843 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, 844 final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException { 845 return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm); 846 } 847 848 @Override 849 public boolean exists(final Get get) throws IOException { 850 Result r = get(get, true); 851 assert r.getExists() != null; 852 return r.getExists(); 853 } 854 855 @Override 856 public boolean[] exists(List<Get> gets) throws IOException { 857 if (gets.isEmpty()) return new boolean[]{}; 858 if (gets.size() == 1) return new boolean[]{exists(gets.get(0))}; 859 860 ArrayList<Get> exists = new ArrayList<>(gets.size()); 861 for (Get g: gets){ 862 Get ge = new Get(g); 863 ge.setCheckExistenceOnly(true); 864 exists.add(ge); 865 } 866 867 Object[] r1= new Object[exists.size()]; 868 try { 869 batch(exists, r1, readRpcTimeoutMs); 870 } catch (InterruptedException e) { 871 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 872 } 873 874 // translate. 875 boolean[] results = new boolean[r1.length]; 876 int i = 0; 877 for (Object o : r1) { 878 // batch ensures if there is a failure we get an exception instead 879 results[i++] = ((Result)o).getExists(); 880 } 881 882 return results; 883 } 884 885 /** 886 * Process a mixed batch of Get, Put and Delete actions. All actions for a 887 * RegionServer are forwarded in one RPC call. Queries are executed in parallel. 888 * 889 * @param list The collection of actions. 890 * @param results An empty array, same size as list. If an exception is thrown, 891 * you can test here for partial results, and to determine which actions 892 * processed successfully. 893 * @throws IOException if there are problems talking to META. Per-item 894 * exceptions are stored in the results array. 895 */ 896 public <R> void processBatchCallback( 897 final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback) 898 throws IOException, InterruptedException { 899 this.batchCallback(list, results, callback); 900 } 901 902 @Override 903 public void close() throws IOException { 904 if (this.closed) { 905 return; 906 } 907 if (cleanupPoolOnClose) { 908 this.pool.shutdown(); 909 try { 910 boolean terminated = false; 911 do { 912 // wait until the pool has terminated 913 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); 914 } while (!terminated); 915 } catch (InterruptedException e) { 916 this.pool.shutdownNow(); 917 LOG.warn("waitForTermination interrupted"); 918 } 919 } 920 this.closed = true; 921 } 922 923 // validate for well-formedness 924 public void validatePut(final Put put) throws IllegalArgumentException { 925 validatePut(put, connConfiguration.getMaxKeyValueSize()); 926 } 927 928 // validate for well-formedness 929 public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException { 930 if (put.isEmpty()) { 931 throw new IllegalArgumentException("No columns to insert"); 932 } 933 if (maxKeyValueSize > 0) { 934 for (List<Cell> list : put.getFamilyCellMap().values()) { 935 for (Cell cell : list) { 936 if (KeyValueUtil.length(cell) > maxKeyValueSize) { 937 throw new IllegalArgumentException("KeyValue size too large"); 938 } 939 } 940 } 941 } 942 } 943 944 /** 945 * The pool is used for mutli requests for this HTable 946 * @return the pool used for mutli 947 */ 948 ExecutorService getPool() { 949 return this.pool; 950 } 951 952 /** 953 * Explicitly clears the region cache to fetch the latest value from META. 954 * This is a power user function: avoid unless you know the ramifications. 955 */ 956 public void clearRegionCache() { 957 this.connection.clearRegionCache(); 958 } 959 960 @Override 961 public CoprocessorRpcChannel coprocessorService(byte[] row) { 962 return new RegionCoprocessorRpcChannel(connection, tableName, row); 963 } 964 965 @Override 966 public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service, 967 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable) 968 throws ServiceException, Throwable { 969 final Map<byte[],R> results = Collections.synchronizedMap( 970 new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR)); 971 coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() { 972 @Override 973 public void update(byte[] region, byte[] row, R value) { 974 if (region != null) { 975 results.put(region, value); 976 } 977 } 978 }); 979 return results; 980 } 981 982 @Override 983 public <T extends Service, R> void coprocessorService(final Class<T> service, 984 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable, 985 final Batch.Callback<R> callback) throws ServiceException, Throwable { 986 // get regions covered by the row range 987 List<byte[]> keys = getStartKeysInRange(startKey, endKey); 988 Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); 989 for (final byte[] r : keys) { 990 final RegionCoprocessorRpcChannel channel = 991 new RegionCoprocessorRpcChannel(connection, tableName, r); 992 Future<R> future = pool.submit(new Callable<R>() { 993 @Override 994 public R call() throws Exception { 995 T instance = 996 org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel); 997 R result = callable.call(instance); 998 byte[] region = channel.getLastRegion(); 999 if (callback != null) { 1000 callback.update(region, r, result); 1001 } 1002 return result; 1003 } 1004 }); 1005 futures.put(r, future); 1006 } 1007 for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) { 1008 try { 1009 e.getValue().get(); 1010 } catch (ExecutionException ee) { 1011 LOG.warn("Error calling coprocessor service " + service.getName() + " for row " 1012 + Bytes.toStringBinary(e.getKey()), ee); 1013 throw ee.getCause(); 1014 } catch (InterruptedException ie) { 1015 throw new InterruptedIOException("Interrupted calling coprocessor service " 1016 + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie); 1017 } 1018 } 1019 } 1020 1021 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) 1022 throws IOException { 1023 if (start == null) { 1024 start = HConstants.EMPTY_START_ROW; 1025 } 1026 if (end == null) { 1027 end = HConstants.EMPTY_END_ROW; 1028 } 1029 return getKeysAndRegionsInRange(start, end, true).getFirst(); 1030 } 1031 1032 @Override 1033 public long getRpcTimeout(TimeUnit unit) { 1034 return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS); 1035 } 1036 1037 @Override 1038 @Deprecated 1039 public int getRpcTimeout() { 1040 return rpcTimeoutMs; 1041 } 1042 1043 @Override 1044 @Deprecated 1045 public void setRpcTimeout(int rpcTimeout) { 1046 setReadRpcTimeout(rpcTimeout); 1047 setWriteRpcTimeout(rpcTimeout); 1048 } 1049 1050 @Override 1051 public long getReadRpcTimeout(TimeUnit unit) { 1052 return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS); 1053 } 1054 1055 @Override 1056 @Deprecated 1057 public int getReadRpcTimeout() { 1058 return readRpcTimeoutMs; 1059 } 1060 1061 @Override 1062 @Deprecated 1063 public void setReadRpcTimeout(int readRpcTimeout) { 1064 this.readRpcTimeoutMs = readRpcTimeout; 1065 } 1066 1067 @Override 1068 public long getWriteRpcTimeout(TimeUnit unit) { 1069 return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS); 1070 } 1071 1072 @Override 1073 @Deprecated 1074 public int getWriteRpcTimeout() { 1075 return writeRpcTimeoutMs; 1076 } 1077 1078 @Override 1079 @Deprecated 1080 public void setWriteRpcTimeout(int writeRpcTimeout) { 1081 this.writeRpcTimeoutMs = writeRpcTimeout; 1082 } 1083 1084 @Override 1085 public long getOperationTimeout(TimeUnit unit) { 1086 return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS); 1087 } 1088 1089 @Override 1090 @Deprecated 1091 public int getOperationTimeout() { 1092 return operationTimeoutMs; 1093 } 1094 1095 @Override 1096 @Deprecated 1097 public void setOperationTimeout(int operationTimeout) { 1098 this.operationTimeoutMs = operationTimeout; 1099 } 1100 1101 @Override 1102 public String toString() { 1103 return tableName + ";" + connection; 1104 } 1105 1106 @Override 1107 public <R extends Message> Map<byte[], R> batchCoprocessorService( 1108 Descriptors.MethodDescriptor methodDescriptor, Message request, 1109 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { 1110 final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>( 1111 Bytes.BYTES_COMPARATOR)); 1112 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, 1113 new Callback<R>() { 1114 @Override 1115 public void update(byte[] region, byte[] row, R result) { 1116 if (region != null) { 1117 results.put(region, result); 1118 } 1119 } 1120 }); 1121 return results; 1122 } 1123 1124 @Override 1125 public <R extends Message> void batchCoprocessorService( 1126 final Descriptors.MethodDescriptor methodDescriptor, final Message request, 1127 byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback) 1128 throws ServiceException, Throwable { 1129 1130 if (startKey == null) { 1131 startKey = HConstants.EMPTY_START_ROW; 1132 } 1133 if (endKey == null) { 1134 endKey = HConstants.EMPTY_END_ROW; 1135 } 1136 // get regions covered by the row range 1137 Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions = 1138 getKeysAndRegionsInRange(startKey, endKey, true); 1139 List<byte[]> keys = keysAndRegions.getFirst(); 1140 List<HRegionLocation> regions = keysAndRegions.getSecond(); 1141 1142 // check if we have any calls to make 1143 if (keys.isEmpty()) { 1144 LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) + 1145 ", end=" + Bytes.toStringBinary(endKey)); 1146 return; 1147 } 1148 1149 List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size()); 1150 final Map<byte[], RegionCoprocessorServiceExec> execsByRow = new TreeMap<>(Bytes.BYTES_COMPARATOR); 1151 for (int i = 0; i < keys.size(); i++) { 1152 final byte[] rowKey = keys.get(i); 1153 final byte[] region = regions.get(i).getRegionInfo().getRegionName(); 1154 RegionCoprocessorServiceExec exec = 1155 new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request); 1156 execs.add(exec); 1157 execsByRow.put(rowKey, exec); 1158 } 1159 1160 // tracking for any possible deserialization errors on success callback 1161 // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here 1162 final List<Throwable> callbackErrorExceptions = new ArrayList<>(); 1163 final List<Row> callbackErrorActions = new ArrayList<>(); 1164 final List<String> callbackErrorServers = new ArrayList<>(); 1165 Object[] results = new Object[execs.size()]; 1166 1167 AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, 1168 RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), 1169 RpcControllerFactory.instantiate(configuration)); 1170 1171 Callback<ClientProtos.CoprocessorServiceResult> resultsCallback 1172 = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { 1173 if (LOG.isTraceEnabled()) { 1174 LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + 1175 ": region=" + Bytes.toStringBinary(region) + 1176 ", row=" + Bytes.toStringBinary(row) + 1177 ", value=" + serviceResult.getValue().getValue()); 1178 } 1179 try { 1180 Message.Builder builder = responsePrototype.newBuilderForType(); 1181 org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, 1182 serviceResult.getValue().getValue().toByteArray()); 1183 callback.update(region, row, (R) builder.build()); 1184 } catch (IOException e) { 1185 LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(), 1186 e); 1187 callbackErrorExceptions.add(e); 1188 callbackErrorActions.add(execsByRow.get(row)); 1189 callbackErrorServers.add("null"); 1190 } 1191 }; 1192 AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = 1193 AsyncProcessTask.newBuilder(resultsCallback) 1194 .setPool(pool) 1195 .setTableName(tableName) 1196 .setRowAccess(execs) 1197 .setResults(results) 1198 .setRpcTimeout(readRpcTimeoutMs) 1199 .setOperationTimeout(operationTimeoutMs) 1200 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 1201 .build(); 1202 AsyncRequestFuture future = asyncProcess.submit(task); 1203 future.waitUntilDone(); 1204 1205 if (future.hasError()) { 1206 throw future.getErrors(); 1207 } else if (!callbackErrorExceptions.isEmpty()) { 1208 throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions, 1209 callbackErrorServers); 1210 } 1211 } 1212 1213 public RegionLocator getRegionLocator() { 1214 return this.locator; 1215 } 1216 1217 private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 1218 1219 private final byte[] row; 1220 private final byte[] family; 1221 private byte[] qualifier; 1222 private TimeRange timeRange; 1223 private CompareOperator op; 1224 private byte[] value; 1225 1226 CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 1227 this.row = Preconditions.checkNotNull(row, "row is null"); 1228 this.family = Preconditions.checkNotNull(family, "family is null"); 1229 } 1230 1231 @Override 1232 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 1233 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" + 1234 " an empty byte array, or just do not call this method if you want a null qualifier"); 1235 return this; 1236 } 1237 1238 @Override 1239 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 1240 this.timeRange = timeRange; 1241 return this; 1242 } 1243 1244 @Override 1245 public CheckAndMutateBuilder ifNotExists() { 1246 this.op = CompareOperator.EQUAL; 1247 this.value = null; 1248 return this; 1249 } 1250 1251 @Override 1252 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 1253 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 1254 this.value = Preconditions.checkNotNull(value, "value is null"); 1255 return this; 1256 } 1257 1258 private void preCheck() { 1259 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" + 1260 " calling ifNotExists/ifEquals/ifMatches before executing the request"); 1261 } 1262 1263 @Override 1264 public boolean thenPut(Put put) throws IOException { 1265 preCheck(); 1266 return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put); 1267 } 1268 1269 @Override 1270 public boolean thenDelete(Delete delete) throws IOException { 1271 preCheck(); 1272 return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete); 1273 } 1274 1275 @Override 1276 public boolean thenMutate(RowMutations mutation) throws IOException { 1277 preCheck(); 1278 return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation); 1279 } 1280 } 1281}