001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 022import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; 023import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 024 025import java.io.IOException; 026import java.lang.reflect.UndeclaredThrowableException; 027import java.net.UnknownHostException; 028import java.util.Arrays; 029import java.util.List; 030import java.util.Optional; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.ThreadLocalRandom; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicReference; 036import java.util.function.Function; 037import java.util.function.Predicate; 038import java.util.function.Supplier; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellComparator; 042import org.apache.hadoop.hbase.ExtendedCell; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.HRegionLocation; 045import org.apache.hadoop.hbase.PrivateCellUtil; 046import org.apache.hadoop.hbase.RegionLocations; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 050import org.apache.hadoop.hbase.ipc.FatalConnectionException; 051import org.apache.hadoop.hbase.ipc.HBaseRpcController; 052import org.apache.hadoop.hbase.ipc.ServerRpcController; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.ReflectionUtils; 055import org.apache.hadoop.ipc.RemoteException; 056import org.apache.hadoop.net.DNS; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 062import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 063import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 064import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 065import org.apache.hbase.thirdparty.io.netty.util.Timer; 066 067import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 068import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 072 073/** 074 * Utility used by client connections. 075 */ 076@InterfaceAudience.Private 077public final class ConnectionUtils { 078 079 private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class); 080 081 /** 082 * Key for configuration in Configuration whose value is the class we implement making a new 083 * Connection instance. 084 */ 085 public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl"; 086 087 private ConnectionUtils() { 088 } 089 090 /** 091 * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}. 092 * @param pause time to pause 093 * @param tries amount of tries 094 * @return How long to wait after <code>tries</code> retries 095 */ 096 public static long getPauseTime(final long pause, final int tries) { 097 int ntries = tries; 098 if (ntries >= HConstants.RETRY_BACKOFF.length) { 099 ntries = HConstants.RETRY_BACKOFF.length - 1; 100 } 101 if (ntries < 0) { 102 ntries = 0; 103 } 104 105 long normalPause = pause * HConstants.RETRY_BACKOFF[ntries]; 106 // 1% possible jitter 107 long jitter = (long) (normalPause * ThreadLocalRandom.current().nextFloat() * 0.01f); 108 return normalPause + jitter; 109 } 110 111 /** 112 * Changes the configuration to set the number of retries needed when using Connection internally, 113 * e.g. for updating catalog tables, etc. Call this method before we create any Connections. 114 * @param c The Configuration instance to set the retries into. 115 * @param log Used to log what we set in here. 116 */ 117 public static void setServerSideHConnectionRetriesConfig(final Configuration c, final String sn, 118 final Logger log) { 119 // TODO: Fix this. Not all connections from server side should have 10 times the retries. 120 int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 121 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 122 // Go big. Multiply by 10. If we can't get to meta after this many retries 123 // then something seriously wrong. 124 int serversideMultiplier = c.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 125 HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); 126 int retries = hcRetries * serversideMultiplier; 127 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 128 log.info(sn + " server-side Connection retries=" + retries); 129 } 130 131 /** 132 * Get a unique key for the rpc stub to the given server. 133 */ 134 static String getStubKey(String serviceName, ServerName serverName) { 135 return String.format("%s@%s", serviceName, serverName); 136 } 137 138 /** 139 * Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE]. 140 */ 141 static int retries2Attempts(int retries) { 142 return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1); 143 } 144 145 static void checkHasFamilies(Mutation mutation) { 146 Preconditions.checkArgument(mutation.numFamilies() > 0, 147 "Invalid arguments to %s, zero columns specified", mutation.toString()); 148 } 149 150 /** Dummy nonce generator for disabled nonces. */ 151 static final NonceGenerator NO_NONCE_GENERATOR = new NonceGenerator() { 152 153 @Override 154 public long newNonce() { 155 return HConstants.NO_NONCE; 156 } 157 158 @Override 159 public long getNonceGroup() { 160 return HConstants.NO_NONCE; 161 } 162 }; 163 164 // A byte array in which all elements are the max byte, and it is used to 165 // construct closest front row 166 static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); 167 168 /** 169 * Create the closest row after the specified row 170 */ 171 static byte[] createClosestRowAfter(byte[] row) { 172 return Arrays.copyOf(row, row.length + 1); 173 } 174 175 /** 176 * Create a row before the specified row and very close to the specified row. 177 */ 178 static byte[] createCloseRowBefore(byte[] row) { 179 if (row.length == 0) { 180 return MAX_BYTE_ARRAY; 181 } 182 if (row[row.length - 1] == 0) { 183 return Arrays.copyOf(row, row.length - 1); 184 } else { 185 byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length]; 186 System.arraycopy(row, 0, nextRow, 0, row.length - 1); 187 nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1); 188 System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length); 189 return nextRow; 190 } 191 } 192 193 static boolean isEmptyStartRow(byte[] row) { 194 return Bytes.equals(row, EMPTY_START_ROW); 195 } 196 197 static boolean isEmptyStopRow(byte[] row) { 198 return Bytes.equals(row, EMPTY_END_ROW); 199 } 200 201 static void resetController(HBaseRpcController controller, long timeoutNs, int priority, 202 TableName tableName) { 203 controller.reset(); 204 if (timeoutNs >= 0) { 205 controller.setCallTimeout( 206 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); 207 } 208 controller.setPriority(priority); 209 if (tableName != null) { 210 controller.setTableName(tableName); 211 } 212 } 213 214 static Throwable translateException(Throwable t) { 215 if (t instanceof UndeclaredThrowableException && t.getCause() != null) { 216 t = t.getCause(); 217 } 218 if (t instanceof RemoteException) { 219 t = ((RemoteException) t).unwrapRemoteException(); 220 } 221 if (t instanceof ServiceException && t.getCause() != null) { 222 t = translateException(t.getCause()); 223 } 224 return t; 225 } 226 227 static long calcEstimatedSize(Result rs) { 228 long estimatedHeapSizeOfResult = 0; 229 // We don't make Iterator here 230 for (Cell cell : rs.rawCells()) { 231 estimatedHeapSizeOfResult += cell.heapSize(); 232 } 233 return estimatedHeapSizeOfResult; 234 } 235 236 static Result filterCells(Result result, ExtendedCell keepCellsAfter) { 237 if (keepCellsAfter == null) { 238 // do not need to filter 239 return result; 240 } 241 // not the same row 242 if (!PrivateCellUtil.matchingRows(keepCellsAfter, result.getRow(), 0, result.getRow().length)) { 243 return result; 244 } 245 ExtendedCell[] rawCells = result.rawExtendedCells(); 246 int index = Arrays.binarySearch(rawCells, keepCellsAfter, 247 CellComparator.getInstance()::compareWithoutRow); 248 if (index < 0) { 249 index = -index - 1; 250 } else { 251 index++; 252 } 253 if (index == 0) { 254 return result; 255 } 256 if (index == rawCells.length) { 257 return null; 258 } 259 return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null, 260 result.isStale(), result.mayHaveMoreCellsInRow()); 261 } 262 263 // Add a delta to avoid timeout immediately after a retry sleeping. 264 public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1); 265 266 static Get toCheckExistenceOnly(Get get) { 267 if (get.isCheckExistenceOnly()) { 268 return get; 269 } 270 return ReflectionUtils.newInstance(get.getClass(), get).setCheckExistenceOnly(true); 271 } 272 273 static List<Get> toCheckExistenceOnly(List<Get> gets) { 274 return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList()); 275 } 276 277 static RegionLocateType getLocateType(Scan scan) { 278 if (scan.isReversed()) { 279 if (isEmptyStartRow(scan.getStartRow())) { 280 return RegionLocateType.BEFORE; 281 } else { 282 return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.BEFORE; 283 } 284 } else { 285 return scan.includeStartRow() ? RegionLocateType.CURRENT : RegionLocateType.AFTER; 286 } 287 } 288 289 static boolean noMoreResultsForScan(Scan scan, RegionInfo info) { 290 if (isEmptyStopRow(info.getEndKey())) { 291 return true; 292 } 293 if (isEmptyStopRow(scan.getStopRow())) { 294 return false; 295 } 296 int c = Bytes.compareTo(info.getEndKey(), scan.getStopRow()); 297 // 1. if our stop row is less than the endKey of the region 298 // 2. if our stop row is equal to the endKey of the region and we do not include the stop row 299 // for scan. 300 return c > 0 || (c == 0 && !scan.includeStopRow()); 301 } 302 303 static boolean noMoreResultsForReverseScan(Scan scan, RegionInfo info) { 304 if (isEmptyStartRow(info.getStartKey())) { 305 return true; 306 } 307 if (isEmptyStopRow(scan.getStopRow())) { 308 return false; 309 } 310 // no need to test the inclusive of the stop row as the start key of a region is included in 311 // the region. 312 return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; 313 } 314 315 public static ScanResultCache createScanResultCache(Scan scan) { 316 if (scan.getAllowPartialResults()) { 317 return new AllowPartialScanResultCache(); 318 } else if (scan.getBatch() > 0) { 319 return new BatchScanResultCache(scan.getBatch()); 320 } else { 321 return new CompleteScanResultCache(); 322 } 323 } 324 325 private static final String MY_ADDRESS = getMyAddress(); 326 327 private static String getMyAddress() { 328 try { 329 return DNS.getDefaultHost("default", "default"); 330 } catch (UnknownHostException uhe) { 331 LOG.error("cannot determine my address", uhe); 332 return null; 333 } 334 } 335 336 static boolean isRemote(String host) { 337 return !host.equalsIgnoreCase(MY_ADDRESS); 338 } 339 340 static void incRPCCallsMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { 341 if (scanMetrics == null) { 342 return; 343 } 344 scanMetrics.countOfRPCcalls.incrementAndGet(); 345 if (isRegionServerRemote) { 346 scanMetrics.countOfRemoteRPCcalls.incrementAndGet(); 347 } 348 } 349 350 static void incRPCRetriesMetrics(ScanMetrics scanMetrics, boolean isRegionServerRemote) { 351 if (scanMetrics == null) { 352 return; 353 } 354 scanMetrics.countOfRPCRetries.incrementAndGet(); 355 if (isRegionServerRemote) { 356 scanMetrics.countOfRemoteRPCRetries.incrementAndGet(); 357 } 358 } 359 360 static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs, 361 boolean isRegionServerRemote) { 362 if (scanMetrics == null || rrs == null || rrs.length == 0) { 363 return; 364 } 365 long resultSize = 0; 366 for (Result rr : rrs) { 367 for (Cell cell : rr.rawCells()) { 368 resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell); 369 } 370 } 371 scanMetrics.countOfBytesInResults.addAndGet(resultSize); 372 if (isRegionServerRemote) { 373 scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize); 374 } 375 } 376 377 /** 378 * Use the scan metrics returned by the server to add to the identically named counters in the 379 * client side metrics. If a counter does not exist with the same name as the server side metric, 380 * the attempt to increase the counter will fail. 381 */ 382 static void updateServerSideMetrics(ScanMetrics scanMetrics, ScanResponse response) { 383 if (scanMetrics == null || response == null || !response.hasScanMetrics()) { 384 return; 385 } 386 ResponseConverter.getScanMetrics(response).forEach(scanMetrics::addToCounter); 387 } 388 389 static void incRegionCountMetrics(ScanMetrics scanMetrics) { 390 if (scanMetrics == null) { 391 return; 392 } 393 scanMetrics.countOfRegions.incrementAndGet(); 394 } 395 396 /** 397 * Connect the two futures, if the src future is done, then mark the dst future as done. And if 398 * the dst future is done, then cancel the src future. This is used for timeline consistent read. 399 * <p/> 400 * Pass empty metrics if you want to link the primary future and the dst future so we will not 401 * increase the hedge read related metrics. 402 */ 403 private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture, 404 Optional<MetricsConnection> metrics) { 405 addListener(srcFuture, (r, e) -> { 406 if (e != null) { 407 dstFuture.completeExceptionally(e); 408 } else { 409 if (dstFuture.complete(r)) { 410 metrics.ifPresent(MetricsConnection::incrHedgedReadWin); 411 } 412 } 413 }); 414 // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture. 415 // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst 416 // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in 417 // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the 418 // tie. 419 addListener(dstFuture, (r, e) -> srcFuture.cancel(false)); 420 } 421 422 private static <T> void sendRequestsToSecondaryReplicas( 423 Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs, 424 CompletableFuture<T> future, Optional<MetricsConnection> metrics) { 425 if (future.isDone()) { 426 // do not send requests to secondary replicas if the future is done, i.e, the primary request 427 // has already been finished. 428 return; 429 } 430 for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) { 431 CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId); 432 metrics.ifPresent(MetricsConnection::incrHedgedReadOps); 433 connect(secondaryFuture, future, metrics); 434 } 435 } 436 437 static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator, 438 TableName tableName, Query query, byte[] row, RegionLocateType locateType, 439 Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs, 440 long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection> metrics) { 441 if (query.getConsistency() != Consistency.TIMELINE) { 442 return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); 443 } 444 // user specifies a replica id explicitly, just send request to the specific replica 445 if (query.getReplicaId() >= 0) { 446 return requestReplica.apply(query.getReplicaId()); 447 } 448 // Timeline consistent read, where we may send requests to other region replicas 449 CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); 450 CompletableFuture<T> future = new CompletableFuture<>(); 451 connect(primaryFuture, future, Optional.empty()); 452 long startNs = System.nanoTime(); 453 // after the getRegionLocations, all the locations for the replicas of this region should have 454 // been cached, so it is not big deal to locate them again when actually sending requests to 455 // these replicas. 456 addListener(locator.getRegionLocations(tableName, row, locateType, false, rpcTimeoutNs), 457 (locs, error) -> { 458 if (error != null) { 459 LOG.warn( 460 "Failed to locate all the replicas for table={}, row='{}', locateType={}" 461 + " give up timeline consistent read", 462 tableName, Bytes.toStringBinary(row), locateType, error); 463 return; 464 } 465 if (locs.size() <= 1) { 466 LOG.warn( 467 "There are no secondary replicas for region {}, give up timeline consistent read", 468 locs.getDefaultRegionLocation().getRegion()); 469 return; 470 } 471 long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs); 472 if (delayNs <= 0) { 473 sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics); 474 } else { 475 retryTimer.newTimeout( 476 timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics), 477 delayNs, TimeUnit.NANOSECONDS); 478 } 479 }); 480 return future; 481 } 482 483 // validate for well-formedness 484 static void validatePut(Put put, int maxKeyValueSize) { 485 if (put.isEmpty()) { 486 throw new IllegalArgumentException("No columns to insert"); 487 } 488 if (maxKeyValueSize > 0) { 489 for (List<Cell> list : put.getFamilyCellMap().values()) { 490 for (Cell cell : list) { 491 if (cell.getSerializedSize() > maxKeyValueSize) { 492 throw new IllegalArgumentException("KeyValue size too large"); 493 } 494 } 495 } 496 } 497 } 498 499 static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValueSize) { 500 for (Mutation mutation : rowMutations.getMutations()) { 501 if (mutation instanceof Put) { 502 validatePut((Put) mutation, maxKeyValueSize); 503 } 504 } 505 } 506 507 /** 508 * Select the priority for the rpc call. 509 * <p/> 510 * The rules are: 511 * <ol> 512 * <li>If user set a priority explicitly, then just use it.</li> 513 * <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li> 514 * <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li> 515 * </ol> 516 * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}. 517 * @param tableName the table we operate on 518 */ 519 static int calcPriority(int priority, TableName tableName) { 520 if (priority != HConstants.PRIORITY_UNSET) { 521 return priority; 522 } else { 523 return getPriority(tableName); 524 } 525 } 526 527 static int getPriority(TableName tableName) { 528 if (tableName.isSystemTable()) { 529 return HConstants.SYSTEMTABLE_QOS; 530 } else { 531 return HConstants.NORMAL_QOS; 532 } 533 } 534 535 static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef, 536 AtomicReference<CompletableFuture<T>> futureRef, boolean reload, 537 Supplier<CompletableFuture<T>> fetch, Predicate<T> validator, String type) { 538 for (;;) { 539 if (!reload) { 540 T value = cacheRef.get(); 541 if (value != null && validator.test(value)) { 542 return CompletableFuture.completedFuture(value); 543 } 544 } 545 LOG.trace("{} cache is null, try fetching from registry", type); 546 if (futureRef.compareAndSet(null, new CompletableFuture<>())) { 547 LOG.debug("Start fetching {} from registry", type); 548 CompletableFuture<T> future = futureRef.get(); 549 addListener(fetch.get(), (value, error) -> { 550 if (error != null) { 551 LOG.debug("Failed to fetch {} from registry", type, error); 552 futureRef.getAndSet(null).completeExceptionally(error); 553 return; 554 } 555 LOG.debug("The fetched {} is {}", type, value); 556 // Here we update cache before reset future, so it is possible that someone can get a 557 // stale value. Consider this: 558 // 1. update cacheRef 559 // 2. someone clears the cache and relocates again 560 // 3. the futureRef is not null so the old future is used. 561 // 4. we clear futureRef and complete the future in it with the value being 562 // cleared in step 2. 563 // But we do not think it is a big deal as it rarely happens, and even if it happens, the 564 // caller will retry again later, no correctness problems. 565 cacheRef.set(value); 566 futureRef.set(null); 567 future.complete(value); 568 }); 569 return future; 570 } else { 571 CompletableFuture<T> future = futureRef.get(); 572 if (future != null) { 573 return future; 574 } 575 } 576 } 577 } 578 579 static void updateStats(Optional<ServerStatisticTracker> optStats, 580 Optional<MetricsConnection> optMetrics, ServerName serverName, MultiResponse resp) { 581 if (!optStats.isPresent() && !optMetrics.isPresent()) { 582 // ServerStatisticTracker and MetricsConnection are both not present, just return 583 return; 584 } 585 resp.getResults().forEach((regionName, regionResult) -> { 586 ClientProtos.RegionLoadStats stat = regionResult.getStat(); 587 if (stat == null) { 588 if (LOG.isDebugEnabled()) { 589 LOG.debug("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName, 590 Bytes.toStringBinary(regionName)); 591 } 592 return; 593 } 594 RegionLoadStats regionLoadStats = ProtobufUtil.createRegionLoadStats(stat); 595 optStats.ifPresent( 596 stats -> ResultStatsUtil.updateStats(stats, serverName, regionName, regionLoadStats)); 597 optMetrics.ifPresent( 598 metrics -> ResultStatsUtil.updateStats(metrics, serverName, regionName, regionLoadStats)); 599 }); 600 } 601 602 @FunctionalInterface 603 interface Converter<D, I, S> { 604 D convert(I info, S src) throws IOException; 605 } 606 607 @FunctionalInterface 608 interface RpcCall<RESP, REQ> { 609 void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, 610 RpcCallback<RESP> done); 611 } 612 613 static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller, 614 HRegionLocation loc, ClientService.Interface stub, REQ req, 615 Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, 616 Converter<RESP, HBaseRpcController, PRESP> respConverter) { 617 CompletableFuture<RESP> future = new CompletableFuture<>(); 618 try { 619 rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), 620 new RpcCallback<PRESP>() { 621 622 @Override 623 public void run(PRESP resp) { 624 if (controller.failed()) { 625 future.completeExceptionally(controller.getFailed()); 626 } else { 627 try { 628 future.complete(respConverter.convert(controller, resp)); 629 } catch (IOException e) { 630 future.completeExceptionally(e); 631 } 632 } 633 } 634 }); 635 } catch (IOException e) { 636 future.completeExceptionally(e); 637 } 638 return future; 639 } 640 641 static void shutdownPool(ExecutorService pool) { 642 pool.shutdown(); 643 try { 644 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { 645 pool.shutdownNow(); 646 } 647 } catch (InterruptedException e) { 648 pool.shutdownNow(); 649 } 650 } 651 652 static void setCoprocessorError(RpcController controller, Throwable error) { 653 if (controller == null) { 654 return; 655 } 656 if (controller instanceof ServerRpcController) { 657 if (error instanceof IOException) { 658 ((ServerRpcController) controller).setFailedOn((IOException) error); 659 } else { 660 ((ServerRpcController) controller).setFailedOn(new IOException(error)); 661 } 662 } else if (controller instanceof ClientCoprocessorRpcController) { 663 ((ClientCoprocessorRpcController) controller).setFailed(error); 664 } else { 665 controller.setFailed(error.toString()); 666 } 667 } 668 669 public static boolean isUnexpectedPreambleHeaderException(IOException e) { 670 if (!(e instanceof RemoteException)) { 671 return false; 672 } 673 RemoteException re = (RemoteException) e; 674 return FatalConnectionException.class.getName().equals(re.getClassName()) 675 && re.getMessage().startsWith("Expected HEADER="); 676 } 677}