/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Utility used by client connections.
 */
@InterfaceAudience.Private
public final class ConnectionUtils {

  private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class);

  /**
   * Key for configuration in Configuration whose value is the class we implement making a new
   * Connection instance.
   */
  public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl";

  private ConnectionUtils() {
  }

  /**
   * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}.
   * @param pause time to pause
   * @param tries amount of tries; values outside the bounds of the backoff table are clamped to
   *          its first/last entry
   * @return How long to wait after <code>tries</code> retries
   */
  public static long getPauseTime(final long pause, final int tries) {
    int ntries = tries;
    if (ntries >= HConstants.RETRY_BACKOFF.length) {
      ntries = HConstants.RETRY_BACKOFF.length - 1;
    }
    if (ntries < 0) {
      ntries = 0;
    }

    long normalPause = pause * HConstants.RETRY_BACKOFF[ntries];
    // 1% possible jitter
    long jitter = (long) (normalPause * ThreadLocalRandom.current().nextFloat() * 0.01f);
    return normalPause + jitter;
  }

  /**
   * Changes the configuration to set the number of retries needed when using Connection internally,
   * e.g. for updating catalog tables, etc. Call this method before we create any Connections.
   * @param c   The Configuration instance to set the retries into.
   * @param sn  Label prefixed to the log line (NOTE(review): presumably the server name; confirm
   *            against callers).
   * @param log Used to log what we set in here.
   */
  public static void setServerSideHConnectionRetriesConfig(final Configuration c, final String sn,
    final Logger log) {
    // TODO: Fix this. Not all connections from server side should have 10 times the retries.
    int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // Go big. Multiply by the server-side multiplier. If we can't get to meta after this many
    // retries then something seriously wrong.
    int serversideMultiplier = c.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
      HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
    // Multiply in long and saturate: a plain int multiplication silently wraps around (e.g. when
    // retries is configured as Integer.MAX_VALUE) and would store a negative retry count.
    long product = (long) hcRetries * serversideMultiplier;
    int retries = (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, product));
    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
    log.info("{} server-side Connection retries={}", sn, retries);
  }

  /**
   * Get a unique key for the rpc stub to the given server.
   */
  static String getStubKey(String serviceName, ServerName serverName) {
    return String.format("%s@%s", serviceName, serverName);
  }

  /**
   * Return retries + 1. The returned value will be in range [1, Integer.MAX_VALUE].
   */
  static int retries2Attempts(int retries) {
    // Guard the increment so that Integer.MAX_VALUE does not overflow to a negative number.
    return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
  }

  /**
   * Check that the given mutation touches at least one column family.
   * @throws IllegalArgumentException if {@code mutation.numFamilies()} is not positive
   */
  static void checkHasFamilies(Mutation mutation) {
    Preconditions.checkArgument(mutation.numFamilies() > 0,
      "Invalid arguments to %s, zero columns specified", mutation.toString());
  }

  /** Dummy nonce generator for disabled nonces. */
  static final NonceGenerator NO_NONCE_GENERATOR = new NonceGenerator() {

    @Override
    public long newNonce() {
      return HConstants.NO_NONCE;
    }

    @Override
    public long getNonceGroup() {
      return HConstants.NO_NONCE;
    }
  };

  // A byte array in which all elements are the max byte, and it is used to
  // construct closest front row
  static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);

  /**
   * Create the closest row after the specified row
   */
  static byte[] createClosestRowAfter(byte[] row) {
    // Arrays.copyOf pads with a zero byte, i.e. the result is row + 0x00.
    return Arrays.copyOf(row, row.length + 1);
  }

  /**
   * Create a row before the specified row and very close to the specified row.
   * <p>
   * For an empty row the shared {@link #MAX_BYTE_ARRAY} instance itself is returned, so callers
   * must not modify the returned array.
   */
  static byte[] createCloseRowBefore(byte[] row) {
    if (row.length == 0) {
      return MAX_BYTE_ARRAY;
    }
    if (row[row.length - 1] == 0) {
      // Trailing zero byte: dropping it gives the immediate predecessor.
      return Arrays.copyOf(row, row.length - 1);
    } else {
      // Otherwise decrement the last byte and append MAX_BYTE_ARRAY to get as close as we can.
      byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
      System.arraycopy(row, 0, nextRow, 0, row.length - 1);
      nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
      System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length);
      return nextRow;
    }
  }

  /** Return whether the given row equals {@link HConstants#EMPTY_START_ROW}. */
  static boolean isEmptyStartRow(byte[] row) {
    return Bytes.equals(row, EMPTY_START_ROW);
  }

  /** Return whether the given row equals {@link HConstants#EMPTY_END_ROW}. */
  static boolean isEmptyStopRow(byte[] row) {
    return Bytes.equals(row, EMPTY_END_ROW);
  }

  /**
   * Reset the controller and then apply the given call timeout and priority.
   * @param timeoutNs call timeout in nanoseconds; a negative value leaves the timeout untouched
   * @param priority  priority to set on the controller
   */
  static void resetController(HBaseRpcController controller, long timeoutNs, int priority) {
    controller.reset();
    if (timeoutNs >= 0) {
      // The controller takes an int number of milliseconds, so saturate at Integer.MAX_VALUE.
      controller.setCallTimeout(
        (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
    }
    controller.setPriority(priority);
  }

  /**
   * Unwrap the given throwable: an {@link UndeclaredThrowableException} is replaced by its cause, a
   * {@link RemoteException} is unwrapped, and a {@link ServiceException} with a cause is replaced
   * by the (recursively translated) cause.
   */
  static Throwable translateException(Throwable t) {
    if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
      t = t.getCause();
    }
    if (t instanceof RemoteException) {
      t = ((RemoteException) t).unwrapRemoteException();
    }
    if (t instanceof ServiceException && t.getCause() != null) {
      t = translateException(t.getCause());
    }
    return t;
  }

  /** Sum up {@code heapSize()} of all the cells in the given result. */
  static long calcEstimatedSize(Result rs) {
    long estimatedHeapSizeOfResult = 0;
    // We don't make Iterator here
    for (Cell cell : rs.rawCells()) {
      estimatedHeapSizeOfResult += cell.heapSize();
    }
    return estimatedHeapSizeOfResult;
  }

  /**
   * Drop the cells of {@code result} which are not after {@code keepCellsAfter}.
   * @return the original result if nothing needs to be dropped, {@code null} if every cell is
   *         dropped, otherwise a new Result holding only the remaining cells
   */
  static Result filterCells(Result result, Cell keepCellsAfter) {
    if (keepCellsAfter == null) {
      // do not need to filter
      return result;
    }
    // not the same row
    if (!PrivateCellUtil.matchingRows(keepCellsAfter, result.getRow(), 0, result.getRow().length)) {
      return result;
    }
    Cell[] rawCells = result.rawCells();
    int index =
      Arrays.binarySearch(rawCells, keepCellsAfter, CellComparator.getInstance()::compareWithoutRow);
    if (index < 0) {
      // not found, convert the return value to the insertion point
      index = -index - 1;
    } else {
      // found, skip the matching cell itself
      index++;
    }
    if (index == 0) {
      return result;
    }
    if (index == rawCells.length) {
      return null;
    }
    return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
      result.isStale(), result.mayHaveMoreCellsInRow());
  }

  // Add a delta to avoid timeout immediately after a retry sleeping.
  static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);

  /**
   * Return a Get which only checks existence. The given Get is returned as is if it is already
   * check-existence-only, otherwise a new instance is created from it, so the given Get is not
   * modified.
   */
  static Get toCheckExistenceOnly(Get get) {
    if (get.isCheckExistenceOnly()) {
      return get;
    }
    return ReflectionUtils.newInstance(get.getClass(), get).setCheckExistenceOnly(true);
  }

  /** Apply {@link #toCheckExistenceOnly(Get)} to every element of the given list. */
  static List<Get> toCheckExistenceOnly(List<Get> gets) {
    return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList());
  }

  /** Select the locate type to use when locating the region for the start row of the scan. */
  static RegionLocateType getLocateType(Scan scan) {
    if (scan.isReversed()) {
      if (isEmptyStartRow(scan.getStartRow())) {
        return RegionLocateType.BEFORE;
      } else {
        return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.BEFORE;
      }
    } else {
      return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.AFTER;
    }
  }

  /**
   * Return whether a forward scan is finished once it has consumed the given region.
   */
  static boolean noMoreResultsForScan(Scan scan, RegionInfo info) {
    if (isEmptyStopRow(info.getEndKey())) {
      return true;
    }
    if (isEmptyStopRow(scan.getStopRow())) {
      return false;
    }
    int c = Bytes.compareTo(info.getEndKey(), scan.getStopRow());
    // 1. if our stop row is less than the endKey of the region
    // 2. if our stop row is equal to the endKey of the region and we do not include the stop row
    // for scan.
    return c > 0 || (c == 0 && !scan.includeStopRow());
  }

  /**
   * Return whether a reverse scan is finished once it has consumed the given region.
   */
  static boolean noMoreResultsForReverseScan(Scan scan, RegionInfo info) {
    if (isEmptyStartRow(info.getStartKey())) {
      return true;
    }
    if (isEmptyStopRow(scan.getStopRow())) {
      return false;
    }
    // no need to test the inclusive of the stop row as the start key of a region is included in
    // the region.
    return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0;
  }

  /**
   * Return a future which is completed once all the given futures are completed, carrying their
   * results in the same order as the input list.
   */
  static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
      .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
  }

  /**
   * Create the ScanResultCache matching the settings of the scan: allow partial results first, then
   * batch, otherwise complete rows only.
   */
  public static ScanResultCache createScanResultCache(Scan scan) {
    if (scan.getAllowPartialResults()) {
      return new AllowPartialScanResultCache();
    } else if (scan.getBatch() > 0) {
      return new BatchScanResultCache(scan.getBatch());
    } else {
      return new CompleteScanResultCache();
    }
  }

  // May be null if the local address can not be determined, see getMyAddress.
  private static final String MY_ADDRESS = getMyAddress();

  private static String getMyAddress() {
    try {
      return DNS.getDefaultHost("default", "default");
    } catch (UnknownHostException uhe) {
      LOG.error("cannot determine my address", uhe);
      return null;
    }
  }

  /**
   * Return whether the given host is different from the local address. If the local address could
   * not be determined, every host is considered remote.
   */
  static boolean isRemote(String host) {
    return !host.equalsIgnoreCase(MY_ADDRESS);
  }

  /** Count one rpc call, no-op if {@code scanMetrics} is null. */
  static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
    if (scanMetrics == null) {
      return;
    }
    scanMetrics.countOfRPCcalls.incrementAndGet();
    if (isRegionServerRemote) {
      scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
    }
  }

  /** Count one rpc retry, no-op if {@code scanMetrics} is null. */
  static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) {
    if (scanMetrics == null) {
      return;
    }
    scanMetrics.countOfRPCRetries.incrementAndGet();
    if (isRegionServerRemote) {
      scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
    }
  }

  /**
   * Add the estimated serialized size of all the cells in the given results to the bytes counters,
   * no-op if {@code scanMetrics} is null or there are no results.
   */
  static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs,
    boolean isRegionServerRemote) {
    if (scanMetrics == null || rrs == null || rrs.length == 0) {
      return;
    }
    long resultSize = 0;
    for (Result rr : rrs) {
      for (Cell cell : rr.rawCells()) {
        resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
      }
    }
    scanMetrics.countOfBytesInResults.addAndGet(resultSize);
    if (isRegionServerRemote) {
      scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
    }
  }

  /**
   * Use the scan metrics returned by the server to add to the identically named counters in the
   * client side metrics. If a counter does not exist with the same name as the server side metric,
   * the attempt to increase the counter will fail.
   */
  static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) {
    if (scanMetrics == null || response == null || !response.hasScanMetrics()) {
      return;
    }
    ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter);
  }

  /** Count one region, no-op if {@code scanMetrics} is null. */
  static void incRegionCountMetrics(ScanMetrics scanMetrics) {
    if (scanMetrics == null) {
      return;
    }
    scanMetrics.countOfRegions.incrementAndGet();
  }

  /**
   * Connect the two futures, if the src future is done, then mark the dst future as done. And if
   * the dst future is done, then cancel the src future. This is used for timeline consistent read.
   * <p>
   * Pass empty metrics if you want to link the primary future and the dst future so we will not
   * increase the hedge read related metrics.
   */
  private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture,
    Optional<MetricsConnection> metrics) {
    addListener(srcFuture, (r, e) -> {
      if (e != null) {
        dstFuture.completeExceptionally(e);
      } else {
        // complete returns true only for the future which actually wins the race
        if (dstFuture.complete(r)) {
          metrics.ifPresent(MetricsConnection::incrHedgedReadWin);
        }
      }
    });
    // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
    // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst
    // -> cancel src', for now it seems to be fine, as CompletableFuture will use CAS to set the
    // result first. If later this causes problems, we could use whenCompleteAsync to break the
    // tie.
    addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
  }

  /**
   * Send the request to every secondary replica (replica id 1 and above) and connect each of the
   * returned futures to the given future, unless the given future is already done.
   */
  private static <T> void sendRequestsToSecondaryReplicas(
    Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs,
    CompletableFuture<T> future, Optional<MetricsConnection> metrics) {
    if (future.isDone()) {
      // do not send requests to secondary replicas if the future is done, i.e, the primary request
      // has already been finished.
      return;
    }
    for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
      CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId);
      metrics.ifPresent(MetricsConnection::incrHedgedReadOps);
      connect(secondaryFuture, future, metrics);
    }
  }

  /**
   * Execute a read which honors the consistency of the given query.
   * <p>
   * For a non timeline consistent query the request is only sent to the default replica, and if
   * the query specifies a replica id explicitly the request is only sent to that replica.
   * Otherwise the request is sent to the default replica first, and if it has not finished within
   * {@code primaryCallTimeoutNs}, the request is also sent to all the secondary replicas. The
   * returned future is completed by whichever request finishes first.
   */
  static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator,
    TableName tableName, Query query, byte[] row, RegionLocateType locateType,
    Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs,
    long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection> metrics) {
    if (query.getConsistency() != Consistency.TIMELINE) {
      return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
    }
    // user specifies a replica id explicitly, just send request to the specific replica
    if (query.getReplicaId() >= 0) {
      return requestReplica.apply(query.getReplicaId());
    }
    // Timeline consistent read, where we may send requests to other region replicas
    CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
    CompletableFuture<T> future = new CompletableFuture<>();
    connect(primaryFuture, future, Optional.empty());
    long startNs = System.nanoTime();
    // after the getRegionLocations, all the locations for the replicas of this region should have
    // been cached, so it is not big deal to locate them again when actually sending requests to
    // these replicas.
    addListener(locator.getRegionLocations(tableName, row, locateType, false, rpcTimeoutNs),
      (locs, error) -> {
        if (error != null) {
          LOG.warn(
            "Failed to locate all the replicas for table={}, row='{}', locateType={}"
              + " give up timeline consistent read",
            tableName, Bytes.toStringBinary(row), locateType, error);
          return;
        }
        if (locs.size() <= 1) {
          LOG.warn(
            "There are no secondary replicas for region {}, give up timeline consistent read",
            locs.getDefaultRegionLocation().getRegion());
          return;
        }
        // The time spent on locating counts against the primary call timeout.
        long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
        if (delayNs <= 0) {
          sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics);
        } else {
          retryTimer.newTimeout(
            timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics),
            delayNs, TimeUnit.NANOSECONDS);
        }
      });
    return future;
  }

  /**
   * Validate the given Put for well-formedness.
   * @param maxKeyValueSize the max allowed serialized size of a cell, no check is done if it is
   *                        not positive
   * @throws IllegalArgumentException if the Put is empty or one of its cells is too large
   */
  static void validatePut(Put put, int maxKeyValueSize) {
    if (put.isEmpty()) {
      throw new IllegalArgumentException("No columns to insert");
    }
    if (maxKeyValueSize > 0) {
      for (List<Cell> list : put.getFamilyCellMap().values()) {
        for (Cell cell : list) {
          if (cell.getSerializedSize() > maxKeyValueSize) {
            throw new IllegalArgumentException("KeyValue size too large");
          }
        }
      }
    }
  }

  /** Apply {@link #validatePut(Put, int)} to every Put in the given RowMutations. */
  static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValueSize) {
    for (Mutation mutation : rowMutations.getMutations()) {
      if (mutation instanceof Put) {
        validatePut((Put) mutation, maxKeyValueSize);
      }
    }
  }

  /**
   * Select the priority for the rpc call.
   * <p>
   * The rules are:
   * <ol>
   * <li>If user set a priority explicitly, then just use it.</li>
   * <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
   * <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
   * </ol>
   * @param priority  the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.
   * @param tableName the table we operate on
   */
  static int calcPriority(int priority, TableName tableName) {
    if (priority != HConstants.PRIORITY_UNSET) {
      return priority;
    } else {
      return getPriority(tableName);
    }
  }

  /**
   * Return {@link HConstants#SYSTEMTABLE_QOS} for a system table and {@link HConstants#NORMAL_QOS}
   * for any other table.
   */
  static int getPriority(TableName tableName) {
    if (tableName.isSystemTable()) {
      return HConstants.SYSTEMTABLE_QOS;
    } else {
      return HConstants.NORMAL_QOS;
    }
  }

  /**
   * Return the cached value if present and valid, otherwise fetch it. Concurrent callers share a
   * single in-flight fetch through {@code futureRef}.
   * @param cacheRef  holds the cached value
   * @param futureRef holds the future of the in-flight fetch, or null if there is none
   * @param reload    whether to bypass the cached value
   * @param fetch     starts the actual fetch
   * @param validator tests whether a cached value can still be used
   * @param type      description of the value, only used in log messages
   */
  static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
    AtomicReference<CompletableFuture<T>> futureRef, boolean reload,
    Supplier<CompletableFuture<T>> fetch, Predicate<T> validator, String type) {
    for (;;) {
      if (!reload) {
        T value = cacheRef.get();
        if (value != null && validator.test(value)) {
          return CompletableFuture.completedFuture(value);
        }
      }
      LOG.trace("{} cache is null, try fetching from registry", type);
      // Only the caller which wins this CAS starts the fetch.
      if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
        LOG.debug("Start fetching {} from registry", type);
        CompletableFuture<T> future = futureRef.get();
        addListener(fetch.get(), (value, error) -> {
          if (error != null) {
            LOG.debug("Failed to fetch {} from registry", type, error);
            futureRef.getAndSet(null).completeExceptionally(error);
            return;
          }
          LOG.debug("The fetched {} is {}", type, value);
          // Here we update cache before reset future, so it is possible that someone can get a
          // stale value. Consider this:
          // 1. update cacheRef
          // 2. someone clears the cache and relocates again
          // 3. the futureRef is not null so the old future is used.
          // 4. we clear futureRef and complete the future in it with the value being
          // cleared in step 2.
          // But we do not think it is a big deal as it rarely happens, and even if it happens, the
          // caller will retry again later, no correctness problems.
          cacheRef.set(value);
          futureRef.set(null);
          future.complete(value);
        });
        return future;
      } else {
        CompletableFuture<T> future = futureRef.get();
        // The in-flight fetch may have just finished and cleared futureRef, loop again if so.
        if (future != null) {
          return future;
        }
      }
    }
  }

  /**
   * Feed the region load stats carried by the given response into the statistic tracker and the
   * connection metrics, whichever is present.
   */
  static void updateStats(Optional<ServerStatisticTracker> optStats,
    Optional<MetricsConnection> optMetrics, ServerName serverName, MultiResponse resp) {
    if (!optStats.isPresent() && !optMetrics.isPresent()) {
      // ServerStatisticTracker and MetricsConnection are both not present, just return
      return;
    }
    resp.getResults().forEach((regionName, regionResult) -> {
      ClientProtos.RegionLoadStats stat = regionResult.getStat();
      if (stat == null) {
        LOG.error("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName,
          Bytes.toStringBinary(regionName));
        return;
      }
      RegionLoadStats regionLoadStats = ProtobufUtil.createRegionLoadStats(stat);
      optStats.ifPresent(
        stats -> ResultStatsUtil.updateStats(stats, serverName, regionName, regionLoadStats));
      optMetrics.ifPresent(
        metrics -> ResultStatsUtil.updateStats(metrics, serverName, regionName, regionLoadStats));
    });
  }

  /** Converts {@code src} to {@code D} with the help of {@code info}. */
  @FunctionalInterface
  interface Converter<D, I, S> {
    D convert(I info, S src) throws IOException;
  }

  /** Issues an rpc call through the given stub, {@code done} is invoked with the response. */
  @FunctionalInterface
  interface RpcCall<RESP, REQ> {
    void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
      RpcCallback<RESP> done);
  }

  /**
   * Convert the request, issue the rpc call, and convert the response.
   * @return a future which is completed with the converted response, or completed exceptionally
   *         if the request/response conversion throws IOException or the controller reports a
   *         failure
   */
  static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
    HRegionLocation loc, ClientService.Interface stub, REQ req,
    Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
    Converter<RESP, HBaseRpcController, PRESP> respConverter) {
    CompletableFuture<RESP> future = new CompletableFuture<>();
    try {
      rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
        new RpcCallback<PRESP>() {

          @Override
          public void run(PRESP resp) {
            if (controller.failed()) {
              future.completeExceptionally(controller.getFailed());
            } else {
              try {
                future.complete(respConverter.convert(controller, resp));
              } catch (IOException e) {
                future.completeExceptionally(e);
              }
            }
          }
        });
    } catch (IOException e) {
      future.completeExceptionally(e);
    }
    return future;
  }

  /**
   * Shut down the given pool, waiting up to 10 seconds for the running tasks to finish before
   * forcing the shutdown.
   * <p>
   * If the calling thread is interrupted while waiting, the pool is shut down immediately and the
   * interrupt status of the calling thread is restored.
   */
  static void shutdownPool(ExecutorService pool) {
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        pool.shutdownNow();
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
      // Restore the interrupt flag so that the caller can still observe the interruption.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Record the given error on the controller, no-op if the controller is null. How the error is
   * recorded depends on the concrete type of the controller.
   */
  static void setCoprocessorError(RpcController controller, Throwable error) {
    if (controller == null) {
      return;
    }
    if (controller instanceof ServerRpcController) {
      // ServerRpcController only accepts IOException, wrap other errors.
      if (error instanceof IOException) {
        ((ServerRpcController) controller).setFailedOn((IOException) error);
      } else {
        ((ServerRpcController) controller).setFailedOn(new IOException(error));
      }
    } else if (controller instanceof ClientCoprocessorRpcController) {
      ((ClientCoprocessorRpcController) controller).setFailed(error);
    } else {
      controller.setFailed(error.toString());
    }
  }
}