001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 022import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; 023import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 024 025import java.io.IOException; 026import java.lang.reflect.UndeclaredThrowableException; 027import java.net.UnknownHostException; 028import java.util.Arrays; 029import java.util.List; 030import java.util.Optional; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.ThreadLocalRandom; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicReference; 036import java.util.function.Function; 037import java.util.function.Predicate; 038import java.util.function.Supplier; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellComparator; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.HRegionLocation; 044import org.apache.hadoop.hbase.PrivateCellUtil; 045import org.apache.hadoop.hbase.RegionLocations; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 049import org.apache.hadoop.hbase.ipc.FatalConnectionException; 050import org.apache.hadoop.hbase.ipc.HBaseRpcController; 051import org.apache.hadoop.hbase.ipc.ServerRpcController; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.ReflectionUtils; 054import org.apache.hadoop.ipc.RemoteException; 055import org.apache.hadoop.net.DNS; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 061import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 062import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 063import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 064import org.apache.hbase.thirdparty.io.netty.util.Timer; 065 066import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 067import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 071 072/** 073 * Utility used by client connections. 074 */ 075@InterfaceAudience.Private 076public final class ConnectionUtils { 077 078 private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class); 079 080 /** 081 * Key for configuration in Configuration whose value is the class we implement making a new 082 * Connection instance. 083 */ 084 public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl"; 085 086 private ConnectionUtils() { 087 } 088 089 /** 090 * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}. 091 * @param pause time to pause 092 * @param tries amount of tries 093 * @return How long to wait after <code>tries</code> retries 094 */ 095 public static long getPauseTime(final long pause, final int tries) { 096 int ntries = tries; 097 if (ntries >= HConstants.RETRY_BACKOFF.length) { 098 ntries = HConstants.RETRY_BACKOFF.length - 1; 099 } 100 if (ntries < 0) { 101 ntries = 0; 102 } 103 104 long normalPause = pause * HConstants.RETRY_BACKOFF[ntries]; 105 // 1% possible jitter 106 long jitter = (long) (normalPause * ThreadLocalRandom.current().nextFloat() * 0.01f); 107 return normalPause + jitter; 108 } 109 110 /** 111 * Changes the configuration to set the number of retries needed when using Connection internally, 112 * e.g. for updating catalog tables, etc. Call this method before we create any Connections. 113 * @param c The Configuration instance to set the retries into. 114 * @param log Used to log what we set in here. 115 */ 116 public static void setServerSideHConnectionRetriesConfig(final Configuration c, final String sn, 117 final Logger log) { 118 // TODO: Fix this. Not all connections from server side should have 10 times the retries. 119 int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 120 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 121 // Go big. Multiply by 10. If we can't get to meta after this many retries 122 // then something seriously wrong. 123 int serversideMultiplier = c.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 124 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 125 int retries = hcRetries * serversideMultiplier; 126 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 127 log.info(sn + " server-side Connection retries=" + retries); 128 } 129 130 /** 131 * Get a unique key for the rpc stub to the given server. 132 */ 133 static String getStubKey(String serviceName, ServerName serverName) { 134 return String.format("%s@%s", serviceName, serverName); 135 } 136 137 /** 138 * Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE]. 139 */ 140 static int retries2Attempts(int retries) { 141 return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1); 142 } 143 144 static void checkHasFamilies(Mutation mutation) { 145 Preconditions.checkArgument(mutation.numFamilies() > 0, 146 "Invalid arguments to %s, zero columns specified", mutation.toString()); 147 } 148 149 /** Dummy nonce generator for disabled nonces. */ 150 static final NonceGenerator NO_NONCE_GENERATOR = new NonceGenerator() { 151 152 @Override 153 public long newNonce() { 154 return HConstants.NO_NONCE; 155 } 156 157 @Override 158 public long getNonceGroup() { 159 return HConstants.NO_NONCE; 160 } 161 }; 162 163 // A byte array in which all elements are the max byte, and it is used to 164 // construct closest front row 165 static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); 166 167 /** 168 * Create the closest row after the specified row 169 */ 170 static byte[] createClosestRowAfter(byte[] row) { 171 return Arrays.copyOf(row, row.length + 1); 172 } 173 174 /** 175 * Create a row before the specified row and very close to the specified row. 176 */ 177 static byte[] createCloseRowBefore(byte[] row) { 178 if (row.length == 0) { 179 return MAX_BYTE_ARRAY; 180 } 181 if (row[row.length - 1] == 0) { 182 return Arrays.copyOf(row, row.length - 1); 183 } else { 184 byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length]; 185 System.arraycopy(row, 0, nextRow, 0, row.length - 1); 186 nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1); 187 System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length); 188 return nextRow; 189 } 190 } 191 192 static boolean isEmptyStartRow(byte[] row) { 193 return Bytes.equals(row, EMPTY_START_ROW); 194 } 195 196 static boolean isEmptyStopRow(byte[] row) { 197 return Bytes.equals(row, EMPTY_END_ROW); 198 } 199 200 static void resetController(HBaseRpcController controller, long timeoutNs, int priority, 201 TableName tableName) { 202 controller.reset(); 203 if (timeoutNs >= 0) { 204 controller.setCallTimeout( 205 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); 206 } 207 controller.setPriority(priority); 208 if (tableName != null) { 209 controller.setTableName(tableName); 210 } 211 } 212 213 static Throwable translateException(Throwable t) { 214 if (t instanceof UndeclaredThrowableException && t.getCause() != null) { 215 t = t.getCause(); 216 } 217 if (t instanceof RemoteException) { 218 t = ((RemoteException) t).unwrapRemoteException(); 219 } 220 if (t instanceof ServiceException && t.getCause() != null) { 221 t = translateException(t.getCause()); 222 } 223 return t; 224 } 225 226 static long calcEstimatedSize(Result rs) { 227 long estimatedHeapSizeOfResult = 0; 228 // We don't make Iterator here 229 for (Cell cell : rs.rawCells()) { 230 estimatedHeapSizeOfResult += cell.heapSize(); 231 } 232 return estimatedHeapSizeOfResult; 233 } 234 235 static Result filterCells(Result result, Cell keepCellsAfter) { 236 if (keepCellsAfter == null) { 237 // do not need to filter 238 return result; 239 } 240 // not the same row 241 if (!PrivateCellUtil.matchingRows(keepCellsAfter, result.getRow(), 0, result.getRow().length)) { 242 return result; 243 } 244 Cell[] rawCells = result.rawCells(); 245 int index = Arrays.binarySearch(rawCells, keepCellsAfter, 246 CellComparator.getInstance()::compareWithoutRow); 247 if (index < 0) { 248 index = -index - 1; 249 } else { 250 index++; 251 } 252 if (index == 0) { 253 return result; 254 } 255 if (index == rawCells.length) { 256 return null; 257 } 258 return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null, 259 result.isStale(), result.mayHaveMoreCellsInRow()); 260 } 261 262 // Add a delta to avoid timeout immediately after a retry sleeping. 263 public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1); 264 265 static Get toCheckExistenceOnly(Get get) { 266 if (get.isCheckExistenceOnly()) { 267 return get; 268 } 269 return ReflectionUtils.newInstance(get.getClass(), get).setCheckExistenceOnly(true); 270 } 271 272 static List<Get> toCheckExistenceOnly(List<Get> gets) { 273 return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList()); 274 } 275 276 static RegionLocateType getLocateType(Scan scan) { 277 if (scan.isReversed()) { 278 if (isEmptyStartRow(scan.getStartRow())) { 279 return RegionLocateType.BEFORE; 280 } else { 281 return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.BEFORE; 282 } 283 } else { 284 return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.AFTER; 285 } 286 } 287 288 static boolean noMoreResultsForScan(Scan scan, RegionInfo info) { 289 if (isEmptyStopRow(info.getEndKey())) { 290 return true; 291 } 292 if (isEmptyStopRow(scan.getStopRow())) { 293 return false; 294 } 295 int c = Bytes.compareTo(info.getEndKey(), scan.getStopRow()); 296 // 1. if our stop row is less than the endKey of the region 297 // 2. if our stop row is equal to the endKey of the region and we do not include the stop row 298 // for scan. 299 return c > 0 || (c == 0 && !scan.includeStopRow()); 300 } 301 302 static boolean noMoreResultsForReverseScan(Scan scan, RegionInfo info) { 303 if (isEmptyStartRow(info.getStartKey())) { 304 return true; 305 } 306 if (isEmptyStopRow(scan.getStopRow())) { 307 return false; 308 } 309 // no need to test the inclusive of the stop row as the start key of a region is included in 310 // the region. 311 return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; 312 } 313 314 public static ScanResultCache createScanResultCache(Scan scan) { 315 if (scan.getAllowPartialResults()) { 316 return new AllowPartialScanResultCache(); 317 } else if (scan.getBatch() > 0) { 318 return new BatchScanResultCache(scan.getBatch()); 319 } else { 320 return new CompleteScanResultCache(); 321 } 322 } 323 324 private static final String MY_ADDRESS = getMyAddress(); 325 326 private static String getMyAddress() { 327 try { 328 return DNS.getDefaultHost("default", "default"); 329 } catch (UnknownHostException uhe) { 330 LOG.error("cannot determine my address", uhe); 331 return null; 332 } 333 } 334 335 static boolean isRemote(String host) { 336 return !host.equalsIgnoreCase(MY_ADDRESS); 337 } 338 339 static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { 340 if (scanMetrics == null) { 341 return; 342 } 343 scanMetrics.countOfRPCcalls.incrementAndGet(); 344 if (isRegionServerRemote) { 345 scanMetrics.countOfRemoteRPCcalls.incrementAndGet(); 346 } 347 } 348 349 static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { 350 if (scanMetrics == null) { 351 return; 352 } 353 scanMetrics.countOfRPCRetries.incrementAndGet(); 354 if (isRegionServerRemote) { 355 scanMetrics.countOfRemoteRPCRetries.incrementAndGet(); 356 } 357 } 358 359 static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs, 360 boolean isRegionServerRemote) { 361 if (scanMetrics == null || rrs == null || rrs.length == 0) { 362 return; 363 } 364 long resultSize = 0; 365 for (Result rr : rrs) { 366 for (Cell cell : rr.rawCells()) { 367 resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell); 368 } 369 } 370 scanMetrics.countOfBytesInResults.addAndGet(resultSize); 371 if (isRegionServerRemote) { 372 scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize); 373 } 374 } 375 376 /** 377 * Use the scan metrics returned by the server to add to the identically named counters in the 378 * client side metrics. If a counter does not exist with the same name as the server side metric, 379 * the attempt to increase the counter will fail. 380 */ 381 static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) { 382 if (scanMetrics == null || response == null || !response.hasScanMetrics()) { 383 return; 384 } 385 ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter); 386 } 387 388 static void incRegionCountMetrics(ScanMetrics scanMetrics) { 389 if (scanMetrics == null) { 390 return; 391 } 392 scanMetrics.countOfRegions.incrementAndGet(); 393 } 394 395 /** 396 * Connect the two futures, if the src future is done, then mark the dst future as done. And if 397 * the dst future is done, then cancel the src future. This is used for timeline consistent read. 398 * <p/> 399 * Pass empty metrics if you want to link the primary future and the dst future so we will not 400 * increase the hedge read related metrics. 401 */ 402 private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture, 403 Optional<MetricsConnection> metrics) { 404 addListener(srcFuture, (r, e) -> { 405 if (e != null) { 406 dstFuture.completeExceptionally(e); 407 } else { 408 if (dstFuture.complete(r)) { 409 metrics.ifPresent(MetricsConnection::incrHedgedReadWin); 410 } 411 } 412 }); 413 // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture. 414 // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst 415 // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in 416 // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the 417 // tie. 418 addListener(dstFuture, (r, e) -> srcFuture.cancel(false)); 419 } 420 421 private static <T> void sendRequestsToSecondaryReplicas( 422 Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs, 423 CompletableFuture<T> future, Optional<MetricsConnection> metrics) { 424 if (future.isDone()) { 425 // do not send requests to secondary replicas if the future is done, i.e, the primary request 426 // has already been finished. 427 return; 428 } 429 for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) { 430 CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId); 431 metrics.ifPresent(MetricsConnection::incrHedgedReadOps); 432 connect(secondaryFuture, future, metrics); 433 } 434 } 435 436 static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator, 437 TableName tableName, Query query, byte[] row, RegionLocateType locateType, 438 Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs, 439 long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection> metrics) { 440 if (query.getConsistency() != Consistency.TIMELINE) { 441 return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); 442 } 443 // user specifies a replica id explicitly, just send request to the specific replica 444 if (query.getReplicaId() >= 0) { 445 return requestReplica.apply(query.getReplicaId()); 446 } 447 // Timeline consistent read, where we may send requests to other region replicas 448 CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); 449 CompletableFuture<T> future = new CompletableFuture<>(); 450 connect(primaryFuture, future, Optional.empty()); 451 long startNs = System.nanoTime(); 452 // after the getRegionLocations, all the locations for the replicas of this region should have 453 // been cached, so it is not big deal to locate them again when actually sending requests to 454 // these replicas. 455 addListener(locator.getRegionLocations(tableName, row, locateType, false, rpcTimeoutNs), 456 (locs, error) -> { 457 if (error != null) { 458 LOG.warn( 459 "Failed to locate all the replicas for table={}, row='{}', locateType={}" 460 + " give up timeline consistent read", 461 tableName, Bytes.toStringBinary(row), locateType, error); 462 return; 463 } 464 if (locs.size() <= 1) { 465 LOG.warn( 466 "There are no secondary replicas for region {}, give up timeline consistent read", 467 locs.getDefaultRegionLocation().getRegion()); 468 return; 469 } 470 long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs); 471 if (delayNs <= 0) { 472 sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics); 473 } else { 474 retryTimer.newTimeout( 475 timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics), 476 delayNs, TimeUnit.NANOSECONDS); 477 } 478 }); 479 return future; 480 } 481 482 // validate for well-formedness 483 static void validatePut(Put put, int maxKeyValueSize) { 484 if (put.isEmpty()) { 485 throw new IllegalArgumentException("No columns to insert"); 486 } 487 if (maxKeyValueSize > 0) { 488 for (List<Cell> list : put.getFamilyCellMap().values()) { 489 for (Cell cell : list) { 490 if (cell.getSerializedSize() > maxKeyValueSize) { 491 throw new IllegalArgumentException("KeyValue size too large"); 492 } 493 } 494 } 495 } 496 } 497 498 static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValueSize) { 499 for (Mutation mutation : rowMutations.getMutations()) { 500 if (mutation instanceof Put) { 501 validatePut((Put) mutation, maxKeyValueSize); 502 } 503 } 504 } 505 506 /** 507 * Select the priority for the rpc call. 508 * <p/> 509 * The rules are: 510 * <ol> 511 * <li>If user set a priority explicitly, then just use it.</li> 512 * <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li> 513 * <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li> 514 * </ol> 515 * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}. 516 * @param tableName the table we operate on 517 */ 518 static int calcPriority(int priority, TableName tableName) { 519 if (priority != HConstants.PRIORITY_UNSET) { 520 return priority; 521 } else { 522 return getPriority(tableName); 523 } 524 } 525 526 static int getPriority(TableName tableName) { 527 if (tableName.isSystemTable()) { 528 return HConstants.SYSTEMTABLE_QOS; 529 } else { 530 return HConstants.NORMAL_QOS; 531 } 532 } 533 534 static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef, 535 AtomicReference<CompletableFuture<T>> futureRef, boolean reload, 536 Supplier<CompletableFuture<T>> fetch, Predicate<T> validator, String type) { 537 for (;;) { 538 if (!reload) { 539 T value = cacheRef.get(); 540 if (value != null && validator.test(value)) { 541 return CompletableFuture.completedFuture(value); 542 } 543 } 544 LOG.trace("{} cache is null, try fetching from registry", type); 545 if (futureRef.compareAndSet(null, new CompletableFuture<>())) { 546 LOG.debug("Start fetching {} from registry", type); 547 CompletableFuture<T> future = futureRef.get(); 548 addListener(fetch.get(), (value, error) -> { 549 if (error != null) { 550 LOG.debug("Failed to fetch {} from registry", type, error); 551 futureRef.getAndSet(null).completeExceptionally(error); 552 return; 553 } 554 LOG.debug("The fetched {} is {}", type, value); 555 // Here we update cache before reset future, so it is possible that someone can get a 556 // stale value. Consider this: 557 // 1. update cacheRef 558 // 2. someone clears the cache and relocates again 559 // 3. the futureRef is not null so the old future is used. 560 // 4. we clear futureRef and complete the future in it with the value being 561 // cleared in step 2. 562 // But we do not think it is a big deal as it rarely happens, and even if it happens, the 563 // caller will retry again later, no correctness problems. 564 cacheRef.set(value); 565 futureRef.set(null); 566 future.complete(value); 567 }); 568 return future; 569 } else { 570 CompletableFuture<T> future = futureRef.get(); 571 if (future != null) { 572 return future; 573 } 574 } 575 } 576 } 577 578 static void updateStats(Optional<ServerStatisticTracker> optStats, 579 Optional<MetricsConnection> optMetrics, ServerName serverName, MultiResponse resp) { 580 if (!optStats.isPresent() && !optMetrics.isPresent()) { 581 // ServerStatisticTracker and MetricsConnection are both not present, just return 582 return; 583 } 584 resp.getResults().forEach((regionName, regionResult) -> { 585 ClientProtos.RegionLoadStats stat = regionResult.getStat(); 586 if (stat == null) { 587 if (LOG.isDebugEnabled()) { 588 LOG.debug("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName, 589 Bytes.toStringBinary(regionName)); 590 } 591 return; 592 } 593 RegionLoadStats regionLoadStats = ProtobufUtil.createRegionLoadStats(stat); 594 optStats.ifPresent( 595 stats -> ResultStatsUtil.updateStats(stats, serverName, regionName, regionLoadStats)); 596 optMetrics.ifPresent( 597 metrics -> ResultStatsUtil.updateStats(metrics, serverName, regionName, regionLoadStats)); 598 }); 599 } 600 601 @FunctionalInterface 602 interface Converter<D, I, S> { 603 D convert(I info, S src) throws IOException; 604 } 605 606 @FunctionalInterface 607 interface RpcCall<RESP, REQ> { 608 void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, 609 RpcCallback<RESP> done); 610 } 611 612 static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 613 HRegionLocation loc, ClientService.Interface stub, REQ req, 614 Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, 615 Converter<RESP, HBaseRpcController, PRESP> respConverter) { 616 CompletableFuture<RESP> future = new CompletableFuture<>(); 617 try { 618 rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), 619 new RpcCallback<PRESP>() { 620 621 @Override 622 public void run(PRESP resp) { 623 if (controller.failed()) { 624 future.completeExceptionally(controller.getFailed()); 625 } else { 626 try { 627 future.complete(respConverter.convert(controller, resp)); 628 } catch (IOException e) { 629 future.completeExceptionally(e); 630 } 631 } 632 } 633 }); 634 } catch (IOException e) { 635 future.completeExceptionally(e); 636 } 637 return future; 638 } 639 640 static void shutdownPool(ExecutorService pool) { 641 pool.shutdown(); 642 try { 643 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { 644 pool.shutdownNow(); 645 } 646 } catch (InterruptedException e) { 647 pool.shutdownNow(); 648 } 649 } 650 651 static void setCoprocessorError(RpcController controller, Throwable error) { 652 if (controller == null) { 653 return; 654 } 655 if (controller instanceof ServerRpcController) { 656 if (error instanceof IOException) { 657 ((ServerRpcController) controller).setFailedOn((IOException) error); 658 } else { 659 ((ServerRpcController) controller).setFailedOn(new IOException(error)); 660 } 661 } else if (controller instanceof ClientCoprocessorRpcController) { 662 ((ClientCoprocessorRpcController) controller).setFailed(error); 663 } else { 664 controller.setFailed(error.toString()); 665 } 666 } 667 668 public static boolean isUnexpectedPreambleHeaderException(IOException e) { 669 if (!(e instanceof RemoteException)) { 670 return false; 671 } 672 RemoteException re = (RemoteException) e; 673 return FatalConnectionException.class.getName().equals(re.getClassName()) 674 && re.getMessage().startsWith("Expected HEADER="); 675 } 676}