001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 022import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; 023import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 024 025import java.io.IOException; 026import java.lang.reflect.UndeclaredThrowableException; 027import java.net.UnknownHostException; 028import java.util.Arrays; 029import java.util.List; 030import java.util.Optional; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.ThreadLocalRandom; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicReference; 036import java.util.function.Function; 037import java.util.function.Predicate; 038import java.util.function.Supplier; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellComparator; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.HRegionLocation; 044import org.apache.hadoop.hbase.PrivateCellUtil; 045import org.apache.hadoop.hbase.RegionLocations; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 049import org.apache.hadoop.hbase.ipc.HBaseRpcController; 050import org.apache.hadoop.hbase.ipc.ServerRpcController; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.ReflectionUtils; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.hadoop.net.DNS; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 061import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 062import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 063import org.apache.hbase.thirdparty.io.netty.util.Timer; 064 065import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 066import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 070 071/** 072 * Utility used by client connections. 073 */ 074@InterfaceAudience.Private 075public final class ConnectionUtils { 076 077 private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class); 078 079 /** 080 * Key for configuration in Configuration whose value is the class we implement making a new 081 * Connection instance. 082 */ 083 public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl"; 084 085 private ConnectionUtils() { 086 } 087 088 /** 089 * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}. 090 * @param pause time to pause 091 * @param tries amount of tries 092 * @return How long to wait after <code>tries</code> retries 093 */ 094 public static long getPauseTime(final long pause, final int tries) { 095 int ntries = tries; 096 if (ntries >= HConstants.RETRY_BACKOFF.length) { 097 ntries = HConstants.RETRY_BACKOFF.length - 1; 098 } 099 if (ntries < 0) { 100 ntries = 0; 101 } 102 103 long normalPause = pause * HConstants.RETRY_BACKOFF[ntries]; 104 // 1% possible jitter 105 long jitter = (long) (normalPause * ThreadLocalRandom.current().nextFloat() * 0.01f); 106 return normalPause + jitter; 107 } 108 109 /** 110 * Changes the configuration to set the number of retries needed when using Connection internally, 111 * e.g. for updating catalog tables, etc. Call this method before we create any Connections. 112 * @param c The Configuration instance to set the retries into. 113 * @param log Used to log what we set in here. 114 */ 115 public static void setServerSideHConnectionRetriesConfig(final Configuration c, final String sn, 116 final Logger log) { 117 // TODO: Fix this. Not all connections from server side should have 10 times the retries. 118 int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 119 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 120 // Go big. Multiply by 10. If we can't get to meta after this many retries 121 // then something seriously wrong. 122 int serversideMultiplier = c.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 123 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 124 int retries = hcRetries * serversideMultiplier; 125 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 126 log.info(sn + " server-side Connection retries=" + retries); 127 } 128 129 /** 130 * Get a unique key for the rpc stub to the given server. 131 */ 132 static String getStubKey(String serviceName, ServerName serverName) { 133 return String.format("%s@%s", serviceName, serverName); 134 } 135 136 /** 137 * Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE]. 138 */ 139 static int retries2Attempts(int retries) { 140 return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1); 141 } 142 143 static void checkHasFamilies(Mutation mutation) { 144 Preconditions.checkArgument(mutation.numFamilies() > 0, 145 "Invalid arguments to %s, zero columns specified", mutation.toString()); 146 } 147 148 /** Dummy nonce generator for disabled nonces. */ 149 static final NonceGenerator NO_NONCE_GENERATOR = new NonceGenerator() { 150 151 @Override 152 public long newNonce() { 153 return HConstants.NO_NONCE; 154 } 155 156 @Override 157 public long getNonceGroup() { 158 return HConstants.NO_NONCE; 159 } 160 }; 161 162 // A byte array in which all elements are the max byte, and it is used to 163 // construct closest front row 164 static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); 165 166 /** 167 * Create the closest row after the specified row 168 */ 169 static byte[] createClosestRowAfter(byte[] row) { 170 return Arrays.copyOf(row, row.length + 1); 171 } 172 173 /** 174 * Create a row before the specified row and very close to the specified row. 175 */ 176 static byte[] createCloseRowBefore(byte[] row) { 177 if (row.length == 0) { 178 return MAX_BYTE_ARRAY; 179 } 180 if (row[row.length - 1] == 0) { 181 return Arrays.copyOf(row, row.length - 1); 182 } else { 183 byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length]; 184 System.arraycopy(row, 0, nextRow, 0, row.length - 1); 185 nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1); 186 System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length); 187 return nextRow; 188 } 189 } 190 191 static boolean isEmptyStartRow(byte[] row) { 192 return Bytes.equals(row, EMPTY_START_ROW); 193 } 194 195 static boolean isEmptyStopRow(byte[] row) { 196 return Bytes.equals(row, EMPTY_END_ROW); 197 } 198 199 static void resetController(HBaseRpcController controller, long timeoutNs, int priority) { 200 controller.reset(); 201 if (timeoutNs >= 0) { 202 controller.setCallTimeout( 203 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); 204 } 205 controller.setPriority(priority); 206 } 207 208 static Throwable translateException(Throwable t) { 209 if (t instanceof UndeclaredThrowableException && t.getCause() != null) { 210 t = t.getCause(); 211 } 212 if (t instanceof RemoteException) { 213 t = ((RemoteException) t).unwrapRemoteException(); 214 } 215 if (t instanceof ServiceException && t.getCause() != null) { 216 t = translateException(t.getCause()); 217 } 218 return t; 219 } 220 221 static long calcEstimatedSize(Result rs) { 222 long estimatedHeapSizeOfResult = 0; 223 // We don't make Iterator here 224 for (Cell cell : rs.rawCells()) { 225 estimatedHeapSizeOfResult += cell.heapSize(); 226 } 227 return estimatedHeapSizeOfResult; 228 } 229 230 static Result filterCells(Result result, Cell keepCellsAfter) { 231 if (keepCellsAfter == null) { 232 // do not need to filter 233 return result; 234 } 235 // not the same row 236 if (!PrivateCellUtil.matchingRows(keepCellsAfter, result.getRow(), 0, result.getRow().length)) { 237 return result; 238 } 239 Cell[] rawCells = result.rawCells(); 240 int index = Arrays.binarySearch(rawCells, keepCellsAfter, 241 CellComparator.getInstance()::compareWithoutRow); 242 if (index < 0) { 243 index = -index - 1; 244 } else { 245 index++; 246 } 247 if (index == 0) { 248 return result; 249 } 250 if (index == rawCells.length) { 251 return null; 252 } 253 return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null, 254 result.isStale(), result.mayHaveMoreCellsInRow()); 255 } 256 257 // Add a delta to avoid timeout immediately after a retry sleeping. 258 static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1); 259 260 static Get toCheckExistenceOnly(Get get) { 261 if (get.isCheckExistenceOnly()) { 262 return get; 263 } 264 return ReflectionUtils.newInstance(get.getClass(), get).setCheckExistenceOnly(true); 265 } 266 267 static List<Get> toCheckExistenceOnly(List<Get> gets) { 268 return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList()); 269 } 270 271 static RegionLocateType getLocateType(Scan scan) { 272 if (scan.isReversed()) { 273 if (isEmptyStartRow(scan.getStartRow())) { 274 return RegionLocateType.BEFORE; 275 } else { 276 return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.BEFORE; 277 } 278 } else { 279 return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.AFTER; 280 } 281 } 282 283 static boolean noMoreResultsForScan(Scan scan, RegionInfo info) { 284 if (isEmptyStopRow(info.getEndKey())) { 285 return true; 286 } 287 if (isEmptyStopRow(scan.getStopRow())) { 288 return false; 289 } 290 int c = Bytes.compareTo(info.getEndKey(), scan.getStopRow()); 291 // 1. if our stop row is less than the endKey of the region 292 // 2. if our stop row is equal to the endKey of the region and we do not include the stop row 293 // for scan. 294 return c > 0 || (c == 0 && !scan.includeStopRow()); 295 } 296 297 static boolean noMoreResultsForReverseScan(Scan scan, RegionInfo info) { 298 if (isEmptyStartRow(info.getStartKey())) { 299 return true; 300 } 301 if (isEmptyStopRow(scan.getStopRow())) { 302 return false; 303 } 304 // no need to test the inclusive of the stop row as the start key of a region is included in 305 // the region. 306 return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; 307 } 308 309 public static ScanResultCache createScanResultCache(Scan scan) { 310 if (scan.getAllowPartialResults()) { 311 return new AllowPartialScanResultCache(); 312 } else if (scan.getBatch() > 0) { 313 return new BatchScanResultCache(scan.getBatch()); 314 } else { 315 return new CompleteScanResultCache(); 316 } 317 } 318 319 private static final String MY_ADDRESS = getMyAddress(); 320 321 private static String getMyAddress() { 322 try { 323 return DNS.getDefaultHost("default", "default"); 324 } catch (UnknownHostException uhe) { 325 LOG.error("cannot determine my address", uhe); 326 return null; 327 } 328 } 329 330 static boolean isRemote(String host) { 331 return !host.equalsIgnoreCase(MY_ADDRESS); 332 } 333 334 static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { 335 if (scanMetrics == null) { 336 return; 337 } 338 scanMetrics.countOfRPCcalls.incrementAndGet(); 339 if (isRegionServerRemote) { 340 scanMetrics.countOfRemoteRPCcalls.incrementAndGet(); 341 } 342 } 343 344 static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { 345 if (scanMetrics == null) { 346 return; 347 } 348 scanMetrics.countOfRPCRetries.incrementAndGet(); 349 if (isRegionServerRemote) { 350 scanMetrics.countOfRemoteRPCRetries.incrementAndGet(); 351 } 352 } 353 354 static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs, 355 boolean isRegionServerRemote) { 356 if (scanMetrics == null || rrs == null || rrs.length == 0) { 357 return; 358 } 359 long resultSize = 0; 360 for (Result rr : rrs) { 361 for (Cell cell : rr.rawCells()) { 362 resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell); 363 } 364 } 365 scanMetrics.countOfBytesInResults.addAndGet(resultSize); 366 if (isRegionServerRemote) { 367 scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize); 368 } 369 } 370 371 /** 372 * Use the scan metrics returned by the server to add to the identically named counters in the 373 * client side metrics. If a counter does not exist with the same name as the server side metric, 374 * the attempt to increase the counter will fail. 375 */ 376 static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) { 377 if (scanMetrics == null || response == null || !response.hasScanMetrics()) { 378 return; 379 } 380 ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter); 381 } 382 383 static void incRegionCountMetrics(ScanMetrics scanMetrics) { 384 if (scanMetrics == null) { 385 return; 386 } 387 scanMetrics.countOfRegions.incrementAndGet(); 388 } 389 390 /** 391 * Connect the two futures, if the src future is done, then mark the dst future as done. And if 392 * the dst future is done, then cancel the src future. This is used for timeline consistent read. 393 * <p/> 394 * Pass empty metrics if you want to link the primary future and the dst future so we will not 395 * increase the hedge read related metrics. 396 */ 397 private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture, 398 Optional<MetricsConnection> metrics) { 399 addListener(srcFuture, (r, e) -> { 400 if (e != null) { 401 dstFuture.completeExceptionally(e); 402 } else { 403 if (dstFuture.complete(r)) { 404 metrics.ifPresent(MetricsConnection::incrHedgedReadWin); 405 } 406 } 407 }); 408 // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture. 409 // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst 410 // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in 411 // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the 412 // tie. 413 addListener(dstFuture, (r, e) -> srcFuture.cancel(false)); 414 } 415 416 private static <T> void sendRequestsToSecondaryReplicas( 417 Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs, 418 CompletableFuture<T> future, Optional<MetricsConnection> metrics) { 419 if (future.isDone()) { 420 // do not send requests to secondary replicas if the future is done, i.e, the primary request 421 // has already been finished. 422 return; 423 } 424 for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) { 425 CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId); 426 metrics.ifPresent(MetricsConnection::incrHedgedReadOps); 427 connect(secondaryFuture, future, metrics); 428 } 429 } 430 431 static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator, 432 TableName tableName, Query query, byte[] row, RegionLocateType locateType, 433 Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs, 434 long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection> metrics) { 435 if (query.getConsistency() != Consistency.TIMELINE) { 436 return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); 437 } 438 // user specifies a replica id explicitly, just send request to the specific replica 439 if (query.getReplicaId() >= 0) { 440 return requestReplica.apply(query.getReplicaId()); 441 } 442 // Timeline consistent read, where we may send requests to other region replicas 443 CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); 444 CompletableFuture<T> future = new CompletableFuture<>(); 445 connect(primaryFuture, future, Optional.empty()); 446 long startNs = System.nanoTime(); 447 // after the getRegionLocations, all the locations for the replicas of this region should have 448 // been cached, so it is not big deal to locate them again when actually sending requests to 449 // these replicas. 450 addListener(locator.getRegionLocations(tableName, row, locateType, false, rpcTimeoutNs), 451 (locs, error) -> { 452 if (error != null) { 453 LOG.warn( 454 "Failed to locate all the replicas for table={}, row='{}', locateType={}" 455 + " give up timeline consistent read", 456 tableName, Bytes.toStringBinary(row), locateType, error); 457 return; 458 } 459 if (locs.size() <= 1) { 460 LOG.warn( 461 "There are no secondary replicas for region {}, give up timeline consistent read", 462 locs.getDefaultRegionLocation().getRegion()); 463 return; 464 } 465 long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs); 466 if (delayNs <= 0) { 467 sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics); 468 } else { 469 retryTimer.newTimeout( 470 timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics), 471 delayNs, TimeUnit.NANOSECONDS); 472 } 473 }); 474 return future; 475 } 476 477 // validate for well-formedness 478 static void validatePut(Put put, int maxKeyValueSize) { 479 if (put.isEmpty()) { 480 throw new IllegalArgumentException("No columns to insert"); 481 } 482 if (maxKeyValueSize > 0) { 483 for (List<Cell> list : put.getFamilyCellMap().values()) { 484 for (Cell cell : list) { 485 if (cell.getSerializedSize() > maxKeyValueSize) { 486 throw new IllegalArgumentException("KeyValue size too large"); 487 } 488 } 489 } 490 } 491 } 492 493 static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValueSize) { 494 for (Mutation mutation : rowMutations.getMutations()) { 495 if (mutation instanceof Put) { 496 validatePut((Put) mutation, maxKeyValueSize); 497 } 498 } 499 } 500 501 /** 502 * Select the priority for the rpc call. 503 * <p/> 504 * The rules are: 505 * <ol> 506 * <li>If user set a priority explicitly, then just use it.</li> 507 * <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li> 508 * <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li> 509 * </ol> 510 * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}. 511 * @param tableName the table we operate on 512 */ 513 static int calcPriority(int priority, TableName tableName) { 514 if (priority != HConstants.PRIORITY_UNSET) { 515 return priority; 516 } else { 517 return getPriority(tableName); 518 } 519 } 520 521 static int getPriority(TableName tableName) { 522 if (tableName.isSystemTable()) { 523 return HConstants.SYSTEMTABLE_QOS; 524 } else { 525 return HConstants.NORMAL_QOS; 526 } 527 } 528 529 static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef, 530 AtomicReference<CompletableFuture<T>> futureRef, boolean reload, 531 Supplier<CompletableFuture<T>> fetch, Predicate<T> validator, String type) { 532 for (;;) { 533 if (!reload) { 534 T value = cacheRef.get(); 535 if (value != null && validator.test(value)) { 536 return CompletableFuture.completedFuture(value); 537 } 538 } 539 LOG.trace("{} cache is null, try fetching from registry", type); 540 if (futureRef.compareAndSet(null, new CompletableFuture<>())) { 541 LOG.debug("Start fetching {} from registry", type); 542 CompletableFuture<T> future = futureRef.get(); 543 addListener(fetch.get(), (value, error) -> { 544 if (error != null) { 545 LOG.debug("Failed to fetch {} from registry", type, error); 546 futureRef.getAndSet(null).completeExceptionally(error); 547 return; 548 } 549 LOG.debug("The fetched {} is {}", type, value); 550 // Here we update cache before reset future, so it is possible that someone can get a 551 // stale value. Consider this: 552 // 1. update cacheRef 553 // 2. someone clears the cache and relocates again 554 // 3. the futureRef is not null so the old future is used. 555 // 4. we clear futureRef and complete the future in it with the value being 556 // cleared in step 2. 557 // But we do not think it is a big deal as it rarely happens, and even if it happens, the 558 // caller will retry again later, no correctness problems. 559 cacheRef.set(value); 560 futureRef.set(null); 561 future.complete(value); 562 }); 563 return future; 564 } else { 565 CompletableFuture<T> future = futureRef.get(); 566 if (future != null) { 567 return future; 568 } 569 } 570 } 571 } 572 573 static void updateStats(Optional<ServerStatisticTracker> optStats, 574 Optional<MetricsConnection> optMetrics, ServerName serverName, MultiResponse resp) { 575 if (!optStats.isPresent() && !optMetrics.isPresent()) { 576 // ServerStatisticTracker and MetricsConnection are both not present, just return 577 return; 578 } 579 resp.getResults().forEach((regionName, regionResult) -> { 580 ClientProtos.RegionLoadStats stat = regionResult.getStat(); 581 if (stat == null) { 582 if (LOG.isDebugEnabled()) { 583 LOG.debug("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName, 584 Bytes.toStringBinary(regionName)); 585 } 586 return; 587 } 588 RegionLoadStats regionLoadStats = ProtobufUtil.createRegionLoadStats(stat); 589 optStats.ifPresent( 590 stats -> ResultStatsUtil.updateStats(stats, serverName, regionName, regionLoadStats)); 591 optMetrics.ifPresent( 592 metrics -> ResultStatsUtil.updateStats(metrics, serverName, regionName, regionLoadStats)); 593 }); 594 } 595 596 @FunctionalInterface 597 interface Converter<D, I, S> { 598 D convert(I info, S src) throws IOException; 599 } 600 601 @FunctionalInterface 602 interface RpcCall<RESP, REQ> { 603 void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, 604 RpcCallback<RESP> done); 605 } 606 607 static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 608 HRegionLocation loc, ClientService.Interface stub, REQ req, 609 Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, 610 Converter<RESP, HBaseRpcController, PRESP> respConverter) { 611 CompletableFuture<RESP> future = new CompletableFuture<>(); 612 try { 613 rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), 614 new RpcCallback<PRESP>() { 615 616 @Override 617 public void run(PRESP resp) { 618 if (controller.failed()) { 619 future.completeExceptionally(controller.getFailed()); 620 } else { 621 try { 622 future.complete(respConverter.convert(controller, resp)); 623 } catch (IOException e) { 624 future.completeExceptionally(e); 625 } 626 } 627 } 628 }); 629 } catch (IOException e) { 630 future.completeExceptionally(e); 631 } 632 return future; 633 } 634 635 static void shutdownPool(ExecutorService pool) { 636 pool.shutdown(); 637 try { 638 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { 639 pool.shutdownNow(); 640 } 641 } catch (InterruptedException e) { 642 pool.shutdownNow(); 643 } 644 } 645 646 static void setCoprocessorError(RpcController controller, Throwable error) { 647 if (controller == null) { 648 return; 649 } 650 if (controller instanceof ServerRpcController) { 651 if (error instanceof IOException) { 652 ((ServerRpcController) controller).setFailedOn((IOException) error); 653 } else { 654 ((ServerRpcController) controller).setFailedOn(new IOException(error)); 655 } 656 } else if (controller instanceof ClientCoprocessorRpcController) { 657 ((ClientCoprocessorRpcController) controller).setFailed(error); 658 } else { 659 controller.setFailed(error.toString()); 660 } 661 } 662}