001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 022import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; 023import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 024 025import java.io.IOException; 026import java.lang.reflect.UndeclaredThrowableException; 027import java.net.UnknownHostException; 028import java.util.Arrays; 029import java.util.List; 030import java.util.Optional; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.ThreadLocalRandom; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicReference; 036import java.util.function.Function; 037import java.util.function.Predicate; 038import java.util.function.Supplier; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellComparator; 042import org.apache.hadoop.hbase.ExtendedCell; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.HRegionLocation; 045import org.apache.hadoop.hbase.PrivateCellUtil; 046import org.apache.hadoop.hbase.RegionLocations; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 050import org.apache.hadoop.hbase.ipc.FatalConnectionException; 051import org.apache.hadoop.hbase.ipc.HBaseRpcController; 052import org.apache.hadoop.hbase.ipc.ServerRpcController; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.ReflectionUtils; 055import org.apache.hadoop.ipc.RemoteException; 056import org.apache.hadoop.net.DNS; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 062import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 063import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 064import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 065import org.apache.hbase.thirdparty.io.netty.util.Timer; 066 067import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 068import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 072 073/** 074 * Utility used by client connections. 075 */ 076@InterfaceAudience.Private 077public final class ConnectionUtils { 078 079 private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class); 080 081 /** 082 * Key for configuration in Configuration whose value is the class we implement making a new 083 * Connection instance. 084 */ 085 public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl"; 086 087 private ConnectionUtils() { 088 } 089 090 /** 091 * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}. 092 * @param pause time to pause 093 * @param tries amount of tries 094 * @return How long to wait after <code>tries</code> retries 095 */ 096 public static long getPauseTime(final long pause, final int tries) { 097 int ntries = tries; 098 if (ntries >= HConstants.RETRY_BACKOFF.length) { 099 ntries = HConstants.RETRY_BACKOFF.length - 1; 100 } 101 if (ntries < 0) { 102 ntries = 0; 103 } 104 105 long normalPause = pause * HConstants.RETRY_BACKOFF[ntries]; 106 // 1% possible jitter 107 long jitter = (long) (normalPause * ThreadLocalRandom.current().nextFloat() * 0.01f); 108 return normalPause + jitter; 109 } 110 111 /** 112 * Changes the configuration to set the number of retries needed when using Connection internally, 113 * e.g. for updating catalog tables, etc. Call this method before we create any Connections. 114 * @param c The Configuration instance to set the retries into. 115 * @param log Used to log what we set in here. 116 */ 117 public static void setServerSideHConnectionRetriesConfig(final Configuration c, final String sn, 118 final Logger log) { 119 // TODO: Fix this. Not all connections from server side should have 10 times the retries. 120 int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 121 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 122 // Go big. Multiply by 10. If we can't get to meta after this many retries 123 // then something seriously wrong. 124 int serversideMultiplier = c.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 125 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 126 int retries = hcRetries * serversideMultiplier; 127 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 128 log.info(sn + " server-side Connection retries=" + retries); 129 } 130 131 /** 132 * Get a unique key for the rpc stub to the given server. 133 */ 134 static String getStubKey(String serviceName, ServerName serverName) { 135 return String.format("%s@%s", serviceName, serverName); 136 } 137 138 /** 139 * Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE]. 140 */ 141 static int retries2Attempts(int retries) { 142 return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1); 143 } 144 145 static void checkHasFamilies(Mutation mutation) { 146 Preconditions.checkArgument(mutation.numFamilies() > 0, 147 "Invalid arguments to %s, zero columns specified", mutation.toString()); 148 } 149 150 /** Dummy nonce generator for disabled nonces. */ 151 static final NonceGenerator NO_NONCE_GENERATOR = new NonceGenerator() { 152 153 @Override 154 public long newNonce() { 155 return HConstants.NO_NONCE; 156 } 157 158 @Override 159 public long getNonceGroup() { 160 return HConstants.NO_NONCE; 161 } 162 }; 163 164 // A byte array in which all elements are the max byte, and it is used to 165 // construct closest front row 166 static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); 167 168 /** 169 * Create the closest row after the specified row 170 */ 171 static byte[] createClosestRowAfter(byte[] row) { 172 return Arrays.copyOf(row, row.length + 1); 173 } 174 175 /** 176 * Create a row before the specified row and very close to the specified row. 177 */ 178 static byte[] createCloseRowBefore(byte[] row) { 179 if (row.length == 0) { 180 return MAX_BYTE_ARRAY; 181 } 182 if (row[row.length - 1] == 0) { 183 return Arrays.copyOf(row, row.length - 1); 184 } else { 185 byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length]; 186 System.arraycopy(row, 0, nextRow, 0, row.length - 1); 187 nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1); 188 System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length); 189 return nextRow; 190 } 191 } 192 193 static boolean isEmptyStartRow(byte[] row) { 194 return Bytes.equals(row, EMPTY_START_ROW); 195 } 196 197 static boolean isEmptyStopRow(byte[] row) { 198 return Bytes.equals(row, EMPTY_END_ROW); 199 } 200 201 static void resetController(HBaseRpcController controller, long timeoutNs, int priority, 202 TableName tableName) { 203 controller.reset(); 204 if (timeoutNs >= 0) { 205 controller.setCallTimeout( 206 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); 207 } 208 controller.setPriority(priority, tableName); 209 if (tableName != null) { 210 controller.setTableName(tableName); 211 } 212 } 213 214 static Throwable translateException(Throwable t) { 215 if (t instanceof UndeclaredThrowableException && t.getCause() != null) { 216 t = t.getCause(); 217 } 218 if (t instanceof RemoteException) { 219 t = ((RemoteException) t).unwrapRemoteException(); 220 } 221 if (t instanceof ServiceException && t.getCause() != null) { 222 t = translateException(t.getCause()); 223 } 224 return t; 225 } 226 227 static long calcEstimatedSize(Result rs) { 228 long estimatedHeapSizeOfResult = 0; 229 // We don't make Iterator here 230 for (Cell cell : rs.rawCells()) { 231 estimatedHeapSizeOfResult += cell.heapSize(); 232 } 233 return estimatedHeapSizeOfResult; 234 } 235 236 static Result filterCells(Result result, ExtendedCell keepCellsAfter) { 237 if (keepCellsAfter == null) { 238 // do not need to filter 239 return result; 240 } 241 // not the same row 242 if (!PrivateCellUtil.matchingRows(keepCellsAfter, result.getRow(), 0, result.getRow().length)) { 243 return result; 244 } 245 ExtendedCell[] rawCells = result.rawExtendedCells(); 246 int index = Arrays.binarySearch(rawCells, keepCellsAfter, 247 CellComparator.getInstance()::compareWithoutRow); 248 if (index < 0) { 249 index = -index - 1; 250 } else { 251 index++; 252 } 253 if (index == 0) { 254 return result; 255 } 256 if (index == rawCells.length) { 257 return null; 258 } 259 return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null, 260 result.isStale(), result.mayHaveMoreCellsInRow()); 261 } 262 263 // Add a delta to avoid timeout immediately after a retry sleeping. 264 public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1); 265 266 static Get toCheckExistenceOnly(Get get) { 267 if (get.isCheckExistenceOnly()) { 268 return get; 269 } 270 return ReflectionUtils.newInstance(get.getClass(), get).setCheckExistenceOnly(true); 271 } 272 273 static List<Get> toCheckExistenceOnly(List<Get> gets) { 274 return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList()); 275 } 276 277 static RegionLocateType getLocateType(Scan scan) { 278 if (scan.isReversed()) { 279 if (isEmptyStartRow(scan.getStartRow())) { 280 return RegionLocateType.BEFORE; 281 } else { 282 return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.BEFORE; 283 } 284 } else { 285 return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.AFTER; 286 } 287 } 288 289 static boolean noMoreResultsForScan(Scan scan, RegionInfo info) { 290 if (isEmptyStopRow(info.getEndKey())) { 291 return true; 292 } 293 if (isEmptyStopRow(scan.getStopRow())) { 294 return false; 295 } 296 int c = Bytes.compareTo(info.getEndKey(), scan.getStopRow()); 297 // 1. if our stop row is less than the endKey of the region 298 // 2. if our stop row is equal to the endKey of the region and we do not include the stop row 299 // for scan. 300 return c > 0 || (c == 0 && !scan.includeStopRow()); 301 } 302 303 static boolean noMoreResultsForReverseScan(Scan scan, RegionInfo info) { 304 if (isEmptyStartRow(info.getStartKey())) { 305 return true; 306 } 307 if (isEmptyStopRow(scan.getStopRow())) { 308 return false; 309 } 310 // no need to test the inclusive of the stop row as the start key of a region is included in 311 // the region. 312 return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; 313 } 314 315 public static ScanResultCache createScanResultCache(Scan scan) { 316 if (scan.getAllowPartialResults()) { 317 return new AllowPartialScanResultCache(); 318 } else if (scan.getBatch() > 0) { 319 return new BatchScanResultCache(scan.getBatch()); 320 } else { 321 return new CompleteScanResultCache(); 322 } 323 } 324 325 private static final String MY_ADDRESS = getMyAddress(); 326 327 private static String getMyAddress() { 328 try { 329 return DNS.getDefaultHost("default", "default"); 330 } catch (UnknownHostException uhe) { 331 LOG.error("cannot determine my address", uhe); 332 return null; 333 } 334 } 335 336 static boolean isRemote(String host) { 337 return !host.equalsIgnoreCase(MY_ADDRESS); 338 } 339 340 static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { 341 if (scanMetrics == null) { 342 return; 343 } 344 scanMetrics.addToCounter(ScanMetrics.RPC_CALLS_METRIC_NAME, 1); 345 if (isRegionServerRemote) { 346 scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME, 1); 347 } 348 } 349 350 static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { 351 if (scanMetrics == null) { 352 return; 353 } 354 scanMetrics.addToCounter(ScanMetrics.RPC_RETRIES_METRIC_NAME, 1); 355 if (isRegionServerRemote) { 356 scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_RETRIES_METRIC_NAME, 1); 357 } 358 } 359 360 static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs, 361 boolean isRegionServerRemote) { 362 if (scanMetrics == null || rrs == null || rrs.length == 0) { 363 return; 364 } 365 long resultSize = 0; 366 for (Result rr : rrs) { 367 for (Cell cell : rr.rawCells()) { 368 resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell); 369 } 370 } 371 scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, resultSize); 372 if (isRegionServerRemote) { 373 scanMetrics.addToCounter(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME, resultSize); 374 } 375 } 376 377 /** 378 * Use the scan metrics returned by the server to add to the identically named counters in the 379 * client side metrics. If a counter does not exist with the same name as the server side metric, 380 * the attempt to increase the counter will fail. 381 */ 382 static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) { 383 if (scanMetrics == null || response == null || !response.hasScanMetrics()) { 384 return; 385 } 386 ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter); 387 } 388 389 static void incRegionCountMetrics(ScanMetrics scanMetrics) { 390 if (scanMetrics == null) { 391 return; 392 } 393 scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1); 394 } 395 396 static void incMillisBetweenNextsMetrics(ScanMetrics scanMetrics, long millis) { 397 if (scanMetrics == null) { 398 return; 399 } 400 scanMetrics.addToCounter(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME, millis); 401 } 402 403 /** 404 * Connect the two futures, if the src future is done, then mark the dst future as done. And if 405 * the dst future is done, then cancel the src future. This is used for timeline consistent read. 406 * <p/> 407 * Pass empty metrics if you want to link the primary future and the dst future so we will not 408 * increase the hedge read related metrics. 409 */ 410 private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture, 411 Optional<MetricsConnection> metrics) { 412 addListener(srcFuture, (r, e) -> { 413 if (e != null) { 414 dstFuture.completeExceptionally(e); 415 } else { 416 if (dstFuture.complete(r)) { 417 metrics.ifPresent(MetricsConnection::incrHedgedReadWin); 418 } 419 } 420 }); 421 // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture. 422 // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst 423 // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in 424 // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the 425 // tie. 426 addListener(dstFuture, (r, e) -> srcFuture.cancel(false)); 427 } 428 429 private static <T> void sendRequestsToSecondaryReplicas( 430 Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs, 431 CompletableFuture<T> future, Optional<MetricsConnection> metrics) { 432 if (future.isDone()) { 433 // do not send requests to secondary replicas if the future is done, i.e, the primary request 434 // has already been finished. 435 return; 436 } 437 for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) { 438 CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId); 439 metrics.ifPresent(MetricsConnection::incrHedgedReadOps); 440 connect(secondaryFuture, future, metrics); 441 } 442 } 443 444 static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator, 445 TableName tableName, Query query, byte[] row, RegionLocateType locateType, 446 Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs, 447 long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection> metrics) { 448 if (query.getConsistency() != Consistency.TIMELINE) { 449 return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); 450 } 451 // user specifies a replica id explicitly, just send request to the specific replica 452 if (query.getReplicaId() >= 0) { 453 return requestReplica.apply(query.getReplicaId()); 454 } 455 // Timeline consistent read, where we may send requests to other region replicas 456 CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); 457 CompletableFuture<T> future = new CompletableFuture<>(); 458 connect(primaryFuture, future, Optional.empty()); 459 long startNs = System.nanoTime(); 460 // after the getRegionLocations, all the locations for the replicas of this region should have 461 // been cached, so it is not big deal to locate them again when actually sending requests to 462 // these replicas. 463 addListener(locator.getRegionLocations(tableName, row, locateType, false, rpcTimeoutNs), 464 (locs, error) -> { 465 if (error != null) { 466 LOG.warn( 467 "Failed to locate all the replicas for table={}, row='{}', locateType={}" 468 + " give up timeline consistent read", 469 tableName, Bytes.toStringBinary(row), locateType, error); 470 return; 471 } 472 if (locs.size() <= 1) { 473 LOG.warn( 474 "There are no secondary replicas for region {}, give up timeline consistent read", 475 locs.getDefaultRegionLocation().getRegion()); 476 return; 477 } 478 long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs); 479 if (delayNs <= 0) { 480 sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics); 481 } else { 482 retryTimer.newTimeout( 483 timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics), 484 delayNs, TimeUnit.NANOSECONDS); 485 } 486 }); 487 return future; 488 } 489 490 // Validate individual Mutation 491 static void validateMutation(Mutation mutation, int maxKeyValueSize) { 492 // Skip Delete 493 if (mutation instanceof Delete) return; 494 495 // 1. Check if empty 496 if (mutation.isEmpty()) { 497 throw new IllegalArgumentException( 498 "No columns to " + mutation.getClass().getSimpleName().toLowerCase()); 499 } 500 501 // 2. Check if size exceeds maxKeyValueSize 502 if (maxKeyValueSize > 0) { 503 for (List<Cell> list : mutation.getFamilyCellMap().values()) { 504 for (Cell cell : list) { 505 if (cell.getSerializedSize() > maxKeyValueSize) { 506 throw new IllegalArgumentException("KeyValue size too large"); 507 } 508 } 509 } 510 } 511 } 512 513 // Validate RowMutations 514 static void validateRowMutations(RowMutations rowMutations, int maxKeyValueSize) { 515 for (Mutation mutation : rowMutations.getMutations()) { 516 validateMutation(mutation, maxKeyValueSize); 517 } 518 } 519 520 static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef, 521 AtomicReference<CompletableFuture<T>> futureRef, boolean reload, 522 Supplier<CompletableFuture<T>> fetch, Predicate<T> validator, String type) { 523 for (;;) { 524 if (!reload) { 525 T value = cacheRef.get(); 526 if (value != null && validator.test(value)) { 527 return CompletableFuture.completedFuture(value); 528 } 529 } 530 LOG.trace("{} cache is null, try fetching from registry", type); 531 if (futureRef.compareAndSet(null, new CompletableFuture<>())) { 532 LOG.debug("Start fetching {} from registry", type); 533 CompletableFuture<T> future = futureRef.get(); 534 addListener(fetch.get(), (value, error) -> { 535 if (error != null) { 536 LOG.debug("Failed to fetch {} from registry", type, error); 537 futureRef.getAndSet(null).completeExceptionally(error); 538 return; 539 } 540 LOG.debug("The fetched {} is {}", type, value); 541 // Here we update cache before reset future, so it is possible that someone can get a 542 // stale value. Consider this: 543 // 1. update cacheRef 544 // 2. someone clears the cache and relocates again 545 // 3. the futureRef is not null so the old future is used. 546 // 4. we clear futureRef and complete the future in it with the value being 547 // cleared in step 2. 548 // But we do not think it is a big deal as it rarely happens, and even if it happens, the 549 // caller will retry again later, no correctness problems. 550 cacheRef.set(value); 551 futureRef.set(null); 552 future.complete(value); 553 }); 554 return future; 555 } else { 556 CompletableFuture<T> future = futureRef.get(); 557 if (future != null) { 558 return future; 559 } 560 } 561 } 562 } 563 564 static void updateStats(Optional<ServerStatisticTracker> optStats, 565 Optional<MetricsConnection> optMetrics, ServerName serverName, MultiResponse resp) { 566 if (!optStats.isPresent() && !optMetrics.isPresent()) { 567 // ServerStatisticTracker and MetricsConnection are both not present, just return 568 return; 569 } 570 resp.getResults().forEach((regionName, regionResult) -> { 571 ClientProtos.RegionLoadStats stat = regionResult.getStat(); 572 if (stat == null) { 573 if (LOG.isDebugEnabled()) { 574 LOG.debug("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName, 575 Bytes.toStringBinary(regionName)); 576 } 577 return; 578 } 579 RegionLoadStats regionLoadStats = ProtobufUtil.createRegionLoadStats(stat); 580 optStats.ifPresent( 581 stats -> ResultStatsUtil.updateStats(stats, serverName, regionName, regionLoadStats)); 582 optMetrics.ifPresent( 583 metrics -> ResultStatsUtil.updateStats(metrics, serverName, regionName, regionLoadStats)); 584 }); 585 } 586 587 @FunctionalInterface 588 interface Converter<D, I, S> { 589 D convert(I info, S src) throws IOException; 590 } 591 592 @FunctionalInterface 593 interface RpcCall<RESP, REQ> { 594 void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, 595 RpcCallback<RESP> done); 596 } 597 598 static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 599 HRegionLocation loc, ClientService.Interface stub, REQ req, 600 Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, 601 Converter<RESP, HBaseRpcController, PRESP> respConverter) { 602 CompletableFuture<RESP> future = new CompletableFuture<>(); 603 try { 604 rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), 605 new RpcCallback<PRESP>() { 606 607 @Override 608 public void run(PRESP resp) { 609 if (controller.failed()) { 610 future.completeExceptionally(controller.getFailed()); 611 } else { 612 try { 613 future.complete(respConverter.convert(controller, resp)); 614 } catch (IOException e) { 615 future.completeExceptionally(e); 616 } 617 } 618 } 619 }); 620 } catch (IOException e) { 621 future.completeExceptionally(e); 622 } 623 return future; 624 } 625 626 static void shutdownPool(ExecutorService pool) { 627 pool.shutdown(); 628 try { 629 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { 630 pool.shutdownNow(); 631 } 632 } catch (InterruptedException e) { 633 pool.shutdownNow(); 634 } 635 } 636 637 static void setCoprocessorError(RpcController controller, Throwable error) { 638 if (controller == null) { 639 return; 640 } 641 if (controller instanceof ServerRpcController) { 642 if (error instanceof IOException) { 643 ((ServerRpcController) controller).setFailedOn((IOException) error); 644 } else { 645 ((ServerRpcController) controller).setFailedOn(new IOException(error)); 646 } 647 } else if (controller instanceof ClientCoprocessorRpcController) { 648 ((ClientCoprocessorRpcController) controller).setFailed(error); 649 } else { 650 controller.setFailed(error.toString()); 651 } 652 } 653 654 public static boolean isUnexpectedPreambleHeaderException(IOException e) { 655 if (!(e instanceof RemoteException)) { 656 return false; 657 } 658 RemoteException re = (RemoteException) e; 659 return FatalConnectionException.class.getName().equals(re.getClassName()) 660 && re.getMessage().startsWith("Expected HEADER="); 661 } 662}