001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 022import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 023import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 024import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 025 026import com.google.errorprone.annotations.RestrictedApi; 027import java.io.IOException; 028import java.io.PrintWriter; 029import java.io.StringWriter; 030import java.util.ArrayList; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.IdentityHashMap; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Optional; 038import java.util.OptionalLong; 039import java.util.concurrent.CompletableFuture; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentLinkedQueue; 042import java.util.concurrent.ConcurrentMap; 043import java.util.concurrent.ConcurrentSkipListMap; 044import java.util.concurrent.TimeUnit; 045import java.util.function.Supplier; 046import java.util.stream.Collectors; 047import java.util.stream.Stream; 048import org.apache.commons.lang3.mutable.MutableBoolean; 049import org.apache.hadoop.hbase.DoNotRetryIOException; 050import org.apache.hadoop.hbase.ExtendedCellScannable; 051import org.apache.hadoop.hbase.HBaseServerException; 052import org.apache.hadoop.hbase.HConstants; 053import org.apache.hadoop.hbase.HRegionLocation; 054import org.apache.hadoop.hbase.PrivateCellUtil; 055import org.apache.hadoop.hbase.RetryImmediatelyException; 056import org.apache.hadoop.hbase.ServerName; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; 059import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; 060import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 061import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager; 062import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 063import org.apache.hadoop.hbase.ipc.HBaseRpcController; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.yetus.audience.InterfaceAudience; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070import org.apache.hbase.thirdparty.io.netty.util.Timer; 071 072import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 073import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 076 077/** 078 * Retry caller for batch. 079 * <p> 080 * Notice that, the {@link #operationTimeoutNs} is the total time limit now which is the same with 081 * other single operations 082 * <p> 083 * And the {@link #maxAttempts} is a limit for each single operation in the batch logically. In the 084 * implementation, we will record a {@code tries} parameter for each operation group, and if it is 085 * split to several groups when retrying, the sub groups will inherit the {@code tries}. You can 086 * imagine that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of 087 * the depth of the tree. 088 */ 089@InterfaceAudience.Private 090class AsyncBatchRpcRetryingCaller<T> { 091 092 private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class); 093 094 private final Timer retryTimer; 095 096 private final AsyncConnectionImpl conn; 097 098 private final TableName tableName; 099 100 private final List<Action> actions; 101 102 private final List<CompletableFuture<T>> futures; 103 104 private final IdentityHashMap<Action, CompletableFuture<T>> action2Future; 105 106 private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors; 107 108 private final int maxAttempts; 109 110 private final long operationTimeoutNs; 111 112 private final long rpcTimeoutNs; 113 114 private final int startLogErrorsCnt; 115 116 private final long startNs; 117 118 private final HBaseServerExceptionPauseManager pauseManager; 119 120 private final Map<String, byte[]> requestAttributes; 121 122 // we can not use HRegionLocation as the map key because the hashCode and equals method of 123 // HRegionLocation only consider serverName. 124 // package private for testing log output 125 static final class RegionRequest { 126 127 public final HRegionLocation loc; 128 129 public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>(); 130 131 public RegionRequest(HRegionLocation loc) { 132 this.loc = loc; 133 } 134 } 135 136 private static final class ServerRequest { 137 138 public final ConcurrentMap<byte[], RegionRequest> actionsByRegion = 139 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 140 141 public void addAction(HRegionLocation loc, Action action) { 142 computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(), 143 () -> new RegionRequest(loc)).actions.add(action); 144 } 145 146 public void setRegionRequest(byte[] regionName, RegionRequest regionReq) { 147 actionsByRegion.put(regionName, regionReq); 148 } 149 150 public int getPriority() { 151 return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream()) 152 .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET); 153 } 154 } 155 156 public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, 157 TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded, 158 int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, 159 Map<String, byte[]> requestAttributes) { 160 this.retryTimer = retryTimer; 161 this.conn = conn; 162 this.tableName = tableName; 163 this.maxAttempts = maxAttempts; 164 this.operationTimeoutNs = operationTimeoutNs; 165 this.rpcTimeoutNs = rpcTimeoutNs; 166 this.startLogErrorsCnt = startLogErrorsCnt; 167 this.actions = new ArrayList<>(actions.size()); 168 this.futures = new ArrayList<>(actions.size()); 169 this.action2Future = new IdentityHashMap<>(actions.size()); 170 for (int i = 0, n = actions.size(); i < n; i++) { 171 Row rawAction = actions.get(i); 172 Action action; 173 if (rawAction instanceof OperationWithAttributes) { 174 action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority()); 175 } else { 176 action = new Action(rawAction, i); 177 } 178 if (hasIncrementOrAppend(rawAction)) { 179 action.setNonce(conn.getNonceGenerator().newNonce()); 180 } 181 this.actions.add(action); 182 CompletableFuture<T> future = new CompletableFuture<>(); 183 futures.add(future); 184 action2Future.put(action, future); 185 } 186 this.action2Errors = new IdentityHashMap<>(); 187 this.startNs = System.nanoTime(); 188 this.pauseManager = 189 new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs); 190 this.requestAttributes = requestAttributes; 191 } 192 193 private static boolean hasIncrementOrAppend(Row action) { 194 if (action instanceof Append || action instanceof Increment) { 195 return true; 196 } else if (action instanceof RowMutations) { 197 return hasIncrementOrAppend((RowMutations) action); 198 } else if (action instanceof CheckAndMutate) { 199 return hasIncrementOrAppend(((CheckAndMutate) action).getAction()); 200 } 201 return false; 202 } 203 204 private static boolean hasIncrementOrAppend(RowMutations mutations) { 205 for (Mutation mutation : mutations.getMutations()) { 206 if (mutation instanceof Append || mutation instanceof Increment) { 207 return true; 208 } 209 } 210 return false; 211 } 212 213 private List<ThrowableWithExtraContext> removeErrors(Action action) { 214 synchronized (action2Errors) { 215 return action2Errors.remove(action); 216 } 217 } 218 219 private void logRegionsException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier, 220 Throwable error, ServerName serverName) { 221 if (tries > startLogErrorsCnt) { 222 String regions = 223 regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'") 224 .collect(Collectors.joining(",", "[", "]")); 225 LOG.warn("Process batch for {} from {} failed, tries={}", regions, serverName, tries, error); 226 } 227 } 228 229 private static final int MAX_SAMPLED_ERRORS = 3; 230 231 @RestrictedApi(explanation = "Should only be called in tests", link = "", 232 allowedOnPath = ".*/(src/test/|AsyncBatchRpcRetryingCaller).*") 233 static void logActionsException(int tries, int startLogErrorsCnt, RegionRequest regionReq, 234 IdentityHashMap<Action, Throwable> action2Error, ServerName serverName) { 235 if (tries <= startLogErrorsCnt || action2Error.isEmpty()) { 236 return; 237 } 238 if (LOG.isWarnEnabled()) { 239 StringWriter sw = new StringWriter(); 240 PrintWriter action2ErrorWriter = new PrintWriter(sw); 241 action2ErrorWriter.println(); 242 Iterator<Map.Entry<Action, Throwable>> iter = action2Error.entrySet().iterator(); 243 for (int i = 0; i < MAX_SAMPLED_ERRORS && iter.hasNext(); i++) { 244 Map.Entry<Action, Throwable> entry = iter.next(); 245 action2ErrorWriter.print(entry.getKey().getAction()); 246 action2ErrorWriter.print(" => "); 247 entry.getValue().printStackTrace(action2ErrorWriter); 248 } 249 action2ErrorWriter.flush(); 250 LOG.warn("Process batch for {} on {}, {}/{} actions failed, tries={}, sampled {} errors: {}", 251 regionReq.loc.getRegion().getRegionNameAsString(), serverName, action2Error.size(), 252 regionReq.actions.size(), tries, Math.min(MAX_SAMPLED_ERRORS, action2Error.size()), 253 sw.toString()); 254 } 255 // if trace is enabled, we log all the details 256 if (LOG.isTraceEnabled()) { 257 action2Error.forEach((action, error) -> LOG.trace( 258 "Process action {} in batch for {} on {} failed, tries={}", action.getAction(), 259 regionReq.loc.getRegion().getRegionNameAsString(), serverName, tries, error)); 260 } 261 } 262 263 private String getExtraContextForError(ServerName serverName) { 264 return serverName != null ? serverName.getServerName() : ""; 265 } 266 267 private void addError(Action action, Throwable error, ServerName serverName) { 268 List<ThrowableWithExtraContext> errors; 269 synchronized (action2Errors) { 270 errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>()); 271 } 272 errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), 273 getExtraContextForError(serverName))); 274 } 275 276 private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) { 277 actions.forEach(action -> addError(action, error, serverName)); 278 } 279 280 private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) { 281 CompletableFuture<T> future = action2Future.get(action); 282 if (future.isDone()) { 283 return; 284 } 285 ThrowableWithExtraContext errorWithCtx = 286 new ThrowableWithExtraContext(error, currentTime, extras); 287 List<ThrowableWithExtraContext> errors = removeErrors(action); 288 if (errors == null) { 289 errors = Collections.singletonList(errorWithCtx); 290 } else { 291 errors.add(errorWithCtx); 292 } 293 future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors)); 294 } 295 296 private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) { 297 long currentTime = EnvironmentEdgeManager.currentTime(); 298 String extras = getExtraContextForError(serverName); 299 actions.forEach(action -> failOne(action, tries, error, currentTime, extras)); 300 } 301 302 private void failAll(Stream<Action> actions, int tries) { 303 actions.forEach(action -> { 304 CompletableFuture<T> future = action2Future.get(action); 305 if (future.isDone()) { 306 return; 307 } 308 future.completeExceptionally(new RetriesExhaustedException(tries, 309 Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList()))); 310 }); 311 } 312 313 private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion, 314 List<ExtendedCellScannable> cells, Map<Integer, Integer> indexMap) throws IOException { 315 ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder(); 316 ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder(); 317 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); 318 ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); 319 for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) { 320 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 321 // multiRequestBuilder will be populated with region actions. 322 // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the 323 // action list. 324 RequestConverter.buildNoDataRegionActions(entry.getKey(), 325 entry.getValue().actions.stream() 326 .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex())) 327 .collect(Collectors.toList()), 328 cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, 329 indexMap); 330 } 331 return multiRequestBuilder.build(); 332 } 333 334 @SuppressWarnings("unchecked") 335 private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName, 336 RegionResult regionResult, List<Action> failedActions, Throwable regionException, 337 MutableBoolean retryImmediately, IdentityHashMap<Action, Throwable> action2Error) { 338 Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException); 339 if (result == null) { 340 LOG.error("Server " + serverName + " sent us neither result nor exception for row '" 341 + Bytes.toStringBinary(action.getAction().getRow()) + "' of " 342 + regionReq.loc.getRegion().getRegionNameAsString()); 343 addError(action, new RuntimeException("Invalid response"), serverName); 344 failedActions.add(action); 345 } else if (result instanceof Throwable) { 346 Throwable error = translateException((Throwable) result); 347 action2Error.put(action, error); 348 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); 349 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 350 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), 351 getExtraContextForError(serverName)); 352 } else { 353 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { 354 retryImmediately.setTrue(); 355 } 356 failedActions.add(action); 357 } 358 } else { 359 action2Future.get(action).complete((T) result); 360 } 361 } 362 363 private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries, 364 ServerName serverName, MultiResponse resp) { 365 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 366 serverName, resp); 367 List<Action> failedActions = new ArrayList<>(); 368 MutableBoolean retryImmediately = new MutableBoolean(false); 369 actionsByRegion.forEach((rn, regionReq) -> { 370 RegionResult regionResult = resp.getResults().get(rn); 371 Throwable regionException = resp.getException(rn); 372 if (regionResult != null) { 373 // Here we record the exceptions and log it at once, to avoid flooding log output if lots of 374 // actions are failed. For example, if the region's memstore is full, all actions will 375 // received a RegionTooBusyException, see HBASE-29390. 376 IdentityHashMap<Action, Throwable> action2Error = new IdentityHashMap<>(); 377 regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName, 378 regionResult, failedActions, regionException, retryImmediately, action2Error)); 379 logActionsException(tries, startLogErrorsCnt, regionReq, action2Error, serverName); 380 } else { 381 Throwable error; 382 if (regionException == null) { 383 LOG.error("Server sent us neither results nor exceptions for {}", 384 Bytes.toStringBinary(rn)); 385 error = new RuntimeException("Invalid response"); 386 } else { 387 error = translateException(regionException); 388 } 389 logRegionsException(tries, () -> Stream.of(regionReq), error, serverName); 390 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); 391 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 392 failAll(regionReq.actions.stream(), tries, error, serverName); 393 return; 394 } 395 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { 396 retryImmediately.setTrue(); 397 } 398 addError(regionReq.actions, error, serverName); 399 failedActions.addAll(regionReq.actions); 400 } 401 }); 402 if (!failedActions.isEmpty()) { 403 tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null); 404 } 405 } 406 407 private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) { 408 long remainingNs; 409 if (operationTimeoutNs > 0) { 410 remainingNs = pauseManager.remainingTimeNs(startNs); 411 if (remainingNs <= 0) { 412 failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), 413 tries); 414 return; 415 } 416 } else { 417 remainingNs = Long.MAX_VALUE; 418 } 419 ClientService.Interface stub; 420 try { 421 stub = conn.getRegionServerStub(serverName); 422 } catch (IOException e) { 423 onError(serverReq.actionsByRegion, tries, e, serverName); 424 return; 425 } 426 ClientProtos.MultiRequest req; 427 List<ExtendedCellScannable> cells = new ArrayList<>(); 428 // Map from a created RegionAction to the original index for a RowMutations within 429 // the original list of actions. This will be used to process the results when there 430 // is RowMutations/CheckAndMutate in the action list. 431 Map<Integer, Integer> indexMap = new HashMap<>(); 432 try { 433 req = buildReq(serverReq.actionsByRegion, cells, indexMap); 434 } catch (IOException e) { 435 onError(serverReq.actionsByRegion, tries, e, serverName); 436 return; 437 } 438 HBaseRpcController controller = conn.rpcControllerFactory.newController(); 439 resetController(controller, Math.min(rpcTimeoutNs, remainingNs), serverReq.getPriority(), 440 tableName); 441 controller.setRequestAttributes(requestAttributes); 442 if (!cells.isEmpty()) { 443 controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cells)); 444 } 445 stub.multi(controller, req, resp -> { 446 if (controller.failed()) { 447 onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName); 448 } else { 449 try { 450 onComplete(serverReq.actionsByRegion, tries, serverName, 451 ResponseConverter.getResults(req, indexMap, resp, controller.cellScanner())); 452 } catch (Exception e) { 453 onError(serverReq.actionsByRegion, tries, e, serverName); 454 return; 455 } 456 } 457 }); 458 } 459 460 // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit, 461 // based on the load of the region server and the region. 462 private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) { 463 Optional<MetricsConnection> metrics = conn.getConnectionMetrics(); 464 Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker(); 465 if (!optStats.isPresent()) { 466 actionsByServer.forEach((serverName, serverReq) -> { 467 metrics.ifPresent(MetricsConnection::incrNormalRunners); 468 sendToServer(serverName, serverReq, tries); 469 }); 470 return; 471 } 472 ServerStatisticTracker stats = optStats.get(); 473 ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy(); 474 actionsByServer.forEach((serverName, serverReq) -> { 475 ServerStatistics serverStats = stats.getStats(serverName); 476 Map<Long, ServerRequest> groupByBackoff = new HashMap<>(); 477 serverReq.actionsByRegion.forEach((regionName, regionReq) -> { 478 long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats); 479 groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest()) 480 .setRegionRequest(regionName, regionReq); 481 }); 482 groupByBackoff.forEach((backoff, sr) -> { 483 if (backoff > 0) { 484 metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff)); 485 retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff, 486 TimeUnit.MILLISECONDS); 487 } else { 488 metrics.ifPresent(MetricsConnection::incrNormalRunners); 489 sendToServer(serverName, sr, tries); 490 } 491 }); 492 }); 493 } 494 495 private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t, 496 ServerName serverName) { 497 Throwable error = translateException(t); 498 logRegionsException(tries, () -> actionsByRegion.values().stream(), error, serverName); 499 actionsByRegion.forEach( 500 (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error)); 501 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 502 failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error, 503 serverName); 504 return; 505 } 506 List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream()) 507 .collect(Collectors.toList()); 508 addError(copiedActions, error, serverName); 509 tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error); 510 } 511 512 private void tryResubmit(Stream<Action> actions, int tries, boolean immediately, 513 Throwable error) { 514 if (immediately) { 515 groupAndSend(actions, tries); 516 return; 517 } 518 519 OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs); 520 if (!maybePauseNsToUse.isPresent()) { 521 failAll(actions, tries); 522 return; 523 } 524 long delayNs = maybePauseNsToUse.getAsLong(); 525 if (HBaseServerException.isServerOverloaded(error)) { 526 Optional<MetricsConnection> metrics = conn.getConnectionMetrics(); 527 metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); 528 } 529 retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); 530 } 531 532 private void groupAndSend(Stream<Action> actions, int tries) { 533 long locateTimeoutNs; 534 if (operationTimeoutNs > 0) { 535 locateTimeoutNs = pauseManager.remainingTimeNs(startNs); 536 if (locateTimeoutNs <= 0) { 537 failAll(actions, tries); 538 return; 539 } 540 } else { 541 locateTimeoutNs = -1L; 542 } 543 ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>(); 544 ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>(); 545 addListener(CompletableFuture.allOf(actions 546 .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), 547 RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { 548 if (error != null) { 549 error = unwrapCompletionException(translateException(error)); 550 if (error instanceof DoNotRetryIOException) { 551 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); 552 return; 553 } 554 addError(action, error, null); 555 locateFailed.add(action); 556 } else { 557 computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc, 558 action); 559 } 560 })) 561 .toArray(CompletableFuture[]::new)), (v, r) -> { 562 if (!actionsByServer.isEmpty()) { 563 sendOrDelay(actionsByServer, tries); 564 } 565 if (!locateFailed.isEmpty()) { 566 tryResubmit(locateFailed.stream(), tries, false, null); 567 } 568 }); 569 } 570 571 public List<CompletableFuture<T>> call() { 572 groupAndSend(actions.stream(), 1); 573 return futures; 574 } 575}