001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.client; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Optional; 032import java.util.Set; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.RejectedExecutionException; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicLong; 038import org.apache.hadoop.hbase.CallQueueTooBigException; 039import org.apache.hadoop.hbase.DoNotRetryIOException; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HRegionLocation; 042import org.apache.hadoop.hbase.RegionLocations; 043import org.apache.hadoop.hbase.RetryImmediatelyException; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 047import org.apache.hadoop.hbase.client.coprocessor.Batch; 048import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 049import org.apache.hadoop.hbase.trace.TraceUtil; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.htrace.core.Tracer; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 058 059/** 060 * The context, and return value, for a single submit/submitAll call. 061 * Note on how this class (one AP submit) works. Initially, all requests are split into groups 062 * by server; request is sent to each server in parallel; the RPC calls are not async so a 063 * thread per server is used. Every time some actions fail, regions/locations might have 064 * changed, so we re-group them by server and region again and send these groups in parallel 065 * too. The result, in case of retries, is a "tree" of threads, with parent exiting after 066 * scheduling children. This is why lots of code doesn't require any synchronization. 067 */ 068@InterfaceAudience.Private 069class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { 070 071 private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class); 072 073 private RetryingTimeTracker tracker; 074 075 /** 076 * Runnable (that can be submitted to thread pool) that waits for when it's time 077 * to issue replica calls, finds region replicas, groups the requests by replica and 078 * issues the calls (on separate threads, via sendMultiAction). 079 * This is done on a separate thread because we don't want to wait on user thread for 080 * our asynchronous call, and usually we have to wait before making replica calls. 081 */ 082 private final class ReplicaCallIssuingRunnable implements Runnable { 083 private final long startTime; 084 private final List<Action> initialActions; 085 086 public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) { 087 this.initialActions = initialActions; 088 this.startTime = startTime; 089 } 090 091 @Override 092 public void run() { 093 boolean done = false; 094 if (asyncProcess.primaryCallTimeoutMicroseconds > 0) { 095 try { 096 done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds); 097 } catch (InterruptedException ex) { 098 LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage()); 099 return; 100 } 101 } 102 if (done) return; // Done within primary timeout 103 Map<ServerName, MultiAction> actionsByServer = new HashMap<>(); 104 List<Action> unknownLocActions = new ArrayList<>(); 105 if (replicaGetIndices == null) { 106 for (int i = 0; i < results.length; ++i) { 107 addReplicaActions(i, actionsByServer, unknownLocActions); 108 } 109 } else { 110 for (int replicaGetIndice : replicaGetIndices) { 111 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); 112 } 113 } 114 if (!actionsByServer.isEmpty()) { 115 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); 116 } 117 if (!unknownLocActions.isEmpty()) { 118 actionsByServer = new HashMap<>(); 119 for (Action action : unknownLocActions) { 120 addReplicaActionsAgain(action, actionsByServer); 121 } 122 // Some actions may have completely failed, they are handled inside addAgain. 123 if (!actionsByServer.isEmpty()) { 124 sendMultiAction(actionsByServer, 1, null, true); 125 } 126 } 127 } 128 129 /** 130 * Add replica actions to action map by server. 131 * @param index Index of the original action. 132 * @param actionsByServer The map by server to add it to. 133 */ 134 private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer, 135 List<Action> unknownReplicaActions) { 136 if (results[index] != null) return; // opportunistic. Never goes from non-null to null. 137 Action action = initialActions.get(index); 138 RegionLocations loc = findAllLocationsOrFail(action, true); 139 if (loc == null) return; 140 HRegionLocation[] locs = loc.getRegionLocations(); 141 if (locs.length == 1) { 142 LOG.warn("No replicas found for {}", action.getAction()); 143 return; 144 } 145 synchronized (replicaResultLock) { 146 // Don't run replica calls if the original has finished. We could do it e.g. if 147 // original has already failed before first replica call (unlikely given retries), 148 // but that would require additional synchronization w.r.t. returning to caller. 149 if (results[index] != null) return; 150 // We set the number of calls here. After that any path must call setResult/setError. 151 // True even for replicas that are not found - if we refuse to send we MUST set error. 152 updateResult(index, new ReplicaResultState(locs.length)); 153 } 154 for (int i = 1; i < locs.length; ++i) { 155 Action replicaAction = new Action(action, i); 156 if (locs[i] != null) { 157 asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), 158 replicaAction, actionsByServer, nonceGroup); 159 } else { 160 unknownReplicaActions.add(replicaAction); 161 } 162 } 163 } 164 165 private void addReplicaActionsAgain( 166 Action action, Map<ServerName, MultiAction> actionsByServer) { 167 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { 168 throw new AssertionError("Cannot have default replica here"); 169 } 170 HRegionLocation loc = getReplicaLocationOrFail(action); 171 if (loc == null) return; 172 asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), 173 action, actionsByServer, nonceGroup); 174 } 175 } 176 177 /** 178 * Runnable (that can be submitted to thread pool) that submits MultiAction to a 179 * single server. The server call is synchronous, therefore we do it on a thread pool. 180 */ 181 @VisibleForTesting 182 final class SingleServerRequestRunnable implements Runnable { 183 private final MultiAction multiAction; 184 private final int numAttempt; 185 private final ServerName server; 186 private final Set<CancellableRegionServerCallable> callsInProgress; 187 @VisibleForTesting 188 SingleServerRequestRunnable( 189 MultiAction multiAction, int numAttempt, ServerName server, 190 Set<CancellableRegionServerCallable> callsInProgress) { 191 this.multiAction = multiAction; 192 this.numAttempt = numAttempt; 193 this.server = server; 194 this.callsInProgress = callsInProgress; 195 } 196 197 @Override 198 public void run() { 199 AbstractResponse res = null; 200 CancellableRegionServerCallable callable = currentCallable; 201 try { 202 // setup the callable based on the actions, if we don't have one already from the request 203 if (callable == null) { 204 callable = createCallable(server, tableName, multiAction); 205 } 206 RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable,rpcTimeout); 207 try { 208 if (callsInProgress != null) { 209 callsInProgress.add(callable); 210 } 211 res = caller.callWithoutRetries(callable, operationTimeout); 212 if (res == null) { 213 // Cancelled 214 return; 215 } 216 } catch (IOException e) { 217 // The service itself failed . It may be an error coming from the communication 218 // layer, but, as well, a functional error raised by the server. 219 receiveGlobalFailure(multiAction, server, numAttempt, e); 220 return; 221 } catch (Throwable t) { 222 // This should not happen. Let's log & retry anyway. 223 LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected." + 224 " Retrying. Server=" + server + ", tableName=" + tableName, t); 225 receiveGlobalFailure(multiAction, server, numAttempt, t); 226 return; 227 } 228 if (res.type() == AbstractResponse.ResponseType.MULTI) { 229 // Normal case: we received an answer from the server, and it's not an exception. 230 receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt); 231 } else { 232 if (results != null) { 233 SingleResponse singleResponse = (SingleResponse) res; 234 updateResult(0, singleResponse.getEntry()); 235 } 236 decActionCounter(1); 237 } 238 } catch (Throwable t) { 239 // Something really bad happened. We are on the send thread that will now die. 240 LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t); 241 throw new RuntimeException(t); 242 } finally { 243 asyncProcess.decTaskCounters(multiAction.getRegions(), server); 244 if (callsInProgress != null && callable != null && res != null) { 245 callsInProgress.remove(callable); 246 } 247 } 248 } 249 } 250 251 private final Batch.Callback<CResult> callback; 252 private final BatchErrors errors; 253 private final ConnectionImplementation.ServerErrorTracker errorsByServer; 254 private final ExecutorService pool; 255 private final Set<CancellableRegionServerCallable> callsInProgress; 256 257 258 private final TableName tableName; 259 private final AtomicLong actionsInProgress = new AtomicLong(-1); 260 /** 261 * The lock controls access to results. It is only held when populating results where 262 * there might be several callers (eventual consistency gets). For other requests, 263 * there's one unique call going on per result index. 264 */ 265 private final Object replicaResultLock = new Object(); 266 /** 267 * Result array. Null if results are not needed. Otherwise, each index corresponds to 268 * the action index in initial actions submitted. For most request types, has null-s for 269 * requests that are not done, and result/exception for those that are done. 270 * For eventual-consistency gets, initially the same applies; at some point, replica calls 271 * might be started, and ReplicaResultState is put at the corresponding indices. The 272 * returning calls check the type to detect when this is the case. After all calls are done, 273 * ReplicaResultState-s are replaced with results for the user. 274 */ 275 private final Object[] results; 276 /** 277 * Indices of replica gets in results. If null, all or no actions are replica-gets. 278 */ 279 private final int[] replicaGetIndices; 280 private final boolean hasAnyReplicaGets; 281 private final long nonceGroup; 282 private final CancellableRegionServerCallable currentCallable; 283 private final int operationTimeout; 284 private final int rpcTimeout; 285 private final AsyncProcess asyncProcess; 286 287 /** 288 * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only 289 * used to make logging more clear, we don't actually care why we don't retry. 290 */ 291 public enum Retry { 292 YES, 293 NO_LOCATION_PROBLEM, 294 NO_NOT_RETRIABLE, 295 NO_RETRIES_EXHAUSTED, 296 NO_OTHER_SUCCEEDED 297 } 298 299 /** Sync point for calls to multiple replicas for the same user request (Get). 300 * Created and put in the results array (we assume replica calls require results) when 301 * the replica calls are launched. See results for details of this process. 302 * POJO, all fields are public. To modify them, the object itself is locked. */ 303 private static class ReplicaResultState { 304 public ReplicaResultState(int callCount) { 305 this.callCount = callCount; 306 } 307 308 /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ 309 int callCount; 310 /** Errors for which it is not decided whether we will report them to user. If one of the 311 * calls succeeds, we will discard the errors that may have happened in the other calls. */ 312 BatchErrors replicaErrors = null; 313 314 @Override 315 public String toString() { 316 return "[call count " + callCount + "; errors " + replicaErrors + "]"; 317 } 318 } 319 320 public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, 321 long nonceGroup, AsyncProcess asyncProcess) { 322 this.pool = task.getPool(); 323 this.callback = task.getCallback(); 324 this.nonceGroup = nonceGroup; 325 this.tableName = task.getTableName(); 326 this.actionsInProgress.set(actions.size()); 327 if (task.getResults() == null) { 328 results = task.getNeedResults() ? new Object[actions.size()] : null; 329 } else { 330 if (task.getResults().length != actions.size()) { 331 throw new AssertionError("results.length"); 332 } 333 this.results = task.getResults(); 334 for (int i = 0; i != this.results.length; ++i) { 335 results[i] = null; 336 } 337 } 338 List<Integer> replicaGetIndices = null; 339 boolean hasAnyReplicaGets = false; 340 if (results != null) { 341 // Check to see if any requests might require replica calls. 342 // We expect that many requests will consist of all or no multi-replica gets; in such 343 // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will 344 // store the list of action indexes for which replica gets are possible, and set 345 // hasAnyReplicaGets to true. 346 boolean hasAnyNonReplicaReqs = false; 347 int posInList = 0; 348 for (Action action : actions) { 349 boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction()); 350 if (isReplicaGet) { 351 hasAnyReplicaGets = true; 352 if (hasAnyNonReplicaReqs) { // Mixed case 353 if (replicaGetIndices == null) { 354 replicaGetIndices = new ArrayList<>(actions.size() - 1); 355 } 356 replicaGetIndices.add(posInList); 357 } 358 } else if (!hasAnyNonReplicaReqs) { 359 // The first non-multi-replica request in the action list. 360 hasAnyNonReplicaReqs = true; 361 if (posInList > 0) { 362 // Add all the previous requests to the index lists. We know they are all 363 // replica-gets because this is the first non-multi-replica request in the list. 364 replicaGetIndices = new ArrayList<>(actions.size() - 1); 365 for (int i = 0; i < posInList; ++i) { 366 replicaGetIndices.add(i); 367 } 368 } 369 } 370 ++posInList; 371 } 372 } 373 this.hasAnyReplicaGets = hasAnyReplicaGets; 374 if (replicaGetIndices != null) { 375 this.replicaGetIndices = new int[replicaGetIndices.size()]; 376 int i = 0; 377 for (Integer el : replicaGetIndices) { 378 this.replicaGetIndices[i++] = el; 379 } 380 } else { 381 this.replicaGetIndices = null; 382 } 383 this.callsInProgress = !hasAnyReplicaGets ? null : 384 Collections.newSetFromMap( 385 new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>()); 386 this.asyncProcess = asyncProcess; 387 this.errorsByServer = createServerErrorTracker(); 388 this.errors = new BatchErrors(); 389 this.operationTimeout = task.getOperationTimeout(); 390 this.rpcTimeout = task.getRpcTimeout(); 391 this.currentCallable = task.getCallable(); 392 if (task.getCallable() == null) { 393 tracker = new RetryingTimeTracker().start(); 394 } 395 } 396 397 @VisibleForTesting 398 protected Set<CancellableRegionServerCallable> getCallsInProgress() { 399 return callsInProgress; 400 } 401 402 @VisibleForTesting 403 SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server, 404 Set<CancellableRegionServerCallable> callsInProgress) { 405 return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); 406 } 407 408 /** 409 * Group a list of actions per region servers, and send them. 410 * 411 * @param currentActions - the list of row to submit 412 * @param numAttempt - the current numAttempt (first attempt is 1) 413 */ 414 void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) { 415 Map<ServerName, MultiAction> actionsByServer = new HashMap<>(); 416 417 boolean isReplica = false; 418 List<Action> unknownReplicaActions = null; 419 for (Action action : currentActions) { 420 RegionLocations locs = findAllLocationsOrFail(action, true); 421 if (locs == null) continue; 422 boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); 423 if (isReplica && !isReplicaAction) { 424 // This is the property of the current implementation, not a requirement. 425 throw new AssertionError("Replica and non-replica actions in the same retry"); 426 } 427 isReplica = isReplicaAction; 428 HRegionLocation loc = locs.getRegionLocation(action.getReplicaId()); 429 if (loc == null || loc.getServerName() == null) { 430 if (isReplica) { 431 if (unknownReplicaActions == null) { 432 unknownReplicaActions = new ArrayList<>(1); 433 } 434 unknownReplicaActions.add(action); 435 } else { 436 // TODO: relies on primary location always being fetched 437 manageLocationError(action, null); 438 } 439 } else { 440 byte[] regionName = loc.getRegionInfo().getRegionName(); 441 AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); 442 } 443 } 444 boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets); 445 boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty(); 446 447 if (!actionsByServer.isEmpty()) { 448 // If this is a first attempt to group and send, no replicas, we need replica thread. 449 sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown) 450 ? currentActions : null, numAttempt > 1 && !hasUnknown); 451 } 452 453 if (hasUnknown) { 454 actionsByServer = new HashMap<>(); 455 for (Action action : unknownReplicaActions) { 456 HRegionLocation loc = getReplicaLocationOrFail(action); 457 if (loc == null) continue; 458 byte[] regionName = loc.getRegionInfo().getRegionName(); 459 AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); 460 } 461 if (!actionsByServer.isEmpty()) { 462 sendMultiAction( 463 actionsByServer, numAttempt, doStartReplica ? currentActions : null, true); 464 } 465 } 466 } 467 468 private HRegionLocation getReplicaLocationOrFail(Action action) { 469 // We are going to try get location once again. For each action, we'll do it once 470 // from cache, because the previous calls in the loop might populate it. 471 int replicaId = action.getReplicaId(); 472 RegionLocations locs = findAllLocationsOrFail(action, true); 473 if (locs == null) return null; // manageError already called 474 HRegionLocation loc = locs.getRegionLocation(replicaId); 475 if (loc == null || loc.getServerName() == null) { 476 locs = findAllLocationsOrFail(action, false); 477 if (locs == null) return null; // manageError already called 478 loc = locs.getRegionLocation(replicaId); 479 } 480 if (loc == null || loc.getServerName() == null) { 481 manageLocationError(action, null); 482 return null; 483 } 484 return loc; 485 } 486 487 private void manageLocationError(Action action, Exception ex) { 488 String msg = "Cannot get replica " + action.getReplicaId() 489 + " location for " + action.getAction(); 490 LOG.error(msg); 491 if (ex == null) { 492 ex = new IOException(msg); 493 } 494 manageError(action.getOriginalIndex(), action.getAction(), 495 Retry.NO_LOCATION_PROBLEM, ex, null); 496 } 497 498 private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) { 499 if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id + 500 ", row cannot be null"); 501 RegionLocations loc = null; 502 try { 503 loc = asyncProcess.connection.locateRegion( 504 tableName, action.getAction().getRow(), useCache, true, action.getReplicaId()); 505 } catch (IOException ex) { 506 manageLocationError(action, ex); 507 } 508 return loc; 509 } 510 511 /** 512 * Send a multi action structure to the servers, after a delay depending on the attempt 513 * number. Asynchronous. 514 * 515 * @param actionsByServer the actions structured by regions 516 * @param numAttempt the attempt number. 517 * @param actionsForReplicaThread original actions for replica thread; null on non-first call. 518 */ 519 void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, 520 int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) { 521 // Run the last item on the same thread if we are already on a send thread. 522 // We hope most of the time it will be the only item, so we can cut down on threads. 523 int actionsRemaining = actionsByServer.size(); 524 // This iteration is by server (the HRegionLocation comparator is by server portion only). 525 for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) { 526 ServerName server = e.getKey(); 527 MultiAction multiAction = e.getValue(); 528 Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction, 529 numAttempt); 530 // make sure we correctly count the number of runnables before we try to reuse the send 531 // thread, in case we had to split the request into different runnables because of backoff 532 if (runnables.size() > actionsRemaining) { 533 actionsRemaining = runnables.size(); 534 } 535 536 // run all the runnables 537 // HBASE-17475: Do not reuse the thread after stack reach a certain depth to prevent stack overflow 538 // for now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to control the depth 539 for (Runnable runnable : runnables) { 540 if ((--actionsRemaining == 0) && reuseThread 541 && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) { 542 runnable.run(); 543 } else { 544 try { 545 pool.submit(runnable); 546 } catch (Throwable t) { 547 if (t instanceof RejectedExecutionException) { 548 // This should never happen. But as the pool is provided by the end user, 549 // let's secure this a little. 550 LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + 551 " Server=" + server.getServerName(), t); 552 } else { 553 // see #HBASE-14359 for more details 554 LOG.warn("Caught unexpected exception/error: ", t); 555 } 556 asyncProcess.decTaskCounters(multiAction.getRegions(), server); 557 // We're likely to fail again, but this will increment the attempt counter, 558 // so it will finish. 559 receiveGlobalFailure(multiAction, server, numAttempt, t); 560 } 561 } 562 } 563 } 564 565 if (actionsForReplicaThread != null) { 566 startWaitingForReplicaCalls(actionsForReplicaThread); 567 } 568 } 569 570 private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server, 571 MultiAction multiAction, 572 int numAttempt) { 573 // no stats to manage, just do the standard action 574 if (asyncProcess.connection.getStatisticsTracker() == null) { 575 if (asyncProcess.connection.getConnectionMetrics() != null) { 576 asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); 577 } 578 asyncProcess.incTaskCounters(multiAction.getRegions(), server); 579 SingleServerRequestRunnable runnable = createSingleServerRequest( 580 multiAction, numAttempt, server, callsInProgress); 581 Tracer tracer = Tracer.curThreadTracer(); 582 583 if (tracer == null) { 584 return Collections.singletonList(runnable); 585 } else { 586 return Collections.singletonList(tracer.wrap(runnable, "AsyncProcess.sendMultiAction")); 587 } 588 } 589 590 // group the actions by the amount of delay 591 Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size()); 592 593 // split up the actions 594 for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) { 595 Long backoff = getBackoff(server, e.getKey()); 596 DelayingRunner runner = actions.get(backoff); 597 if (runner == null) { 598 actions.put(backoff, new DelayingRunner(backoff, e)); 599 } else { 600 runner.add(e); 601 } 602 } 603 604 List<Runnable> toReturn = new ArrayList<>(actions.size()); 605 for (DelayingRunner runner : actions.values()) { 606 asyncProcess.incTaskCounters(runner.getActions().getRegions(), server); 607 String traceText = "AsyncProcess.sendMultiAction"; 608 Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress); 609 // use a delay runner only if we need to sleep for some time 610 if (runner.getSleepTime() > 0) { 611 runner.setRunner(runnable); 612 traceText = "AsyncProcess.clientBackoff.sendMultiAction"; 613 runnable = runner; 614 if (asyncProcess.connection.getConnectionMetrics() != null) { 615 asyncProcess.connection.getConnectionMetrics() 616 .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime()); 617 } 618 } else { 619 if (asyncProcess.connection.getConnectionMetrics() != null) { 620 asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); 621 } 622 } 623 runnable = TraceUtil.wrap(runnable, traceText); 624 toReturn.add(runnable); 625 626 } 627 return toReturn; 628 } 629 630 /** 631 * @param server server location where the target region is hosted 632 * @param regionName name of the region which we are going to write some data 633 * @return the amount of time the client should wait until it submit a request to the 634 * specified server and region 635 */ 636 private Long getBackoff(ServerName server, byte[] regionName) { 637 ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker(); 638 ServerStatistics stats = tracker.getStats(server); 639 return asyncProcess.connection.getBackoffPolicy() 640 .getBackoffTime(server, regionName, stats); 641 } 642 643 /** 644 * Starts waiting to issue replica calls on a different thread; or issues them immediately. 645 */ 646 private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) { 647 long startTime = EnvironmentEdgeManager.currentTime(); 648 ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable( 649 actionsForReplicaThread, startTime); 650 if (asyncProcess.primaryCallTimeoutMicroseconds == 0) { 651 // Start replica calls immediately. 652 replicaRunnable.run(); 653 } else { 654 // Start the thread that may kick off replica gets. 655 // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea. 656 try { 657 pool.submit(replicaRunnable); 658 } catch (RejectedExecutionException ree) { 659 LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree); 660 } 661 } 662 } 663 664 /** 665 * Check that we can retry acts accordingly: logs, set the error status. 666 * 667 * @param originalIndex the position in the list sent 668 * @param row the row 669 * @param canRetry if false, we won't retry whatever the settings. 670 * @param throwable the throwable, if any (can be null) 671 * @param server the location, if any (can be null) 672 * @return true if the action can be retried, false otherwise. 673 */ 674 Retry manageError(int originalIndex, Row row, Retry canRetry, 675 Throwable throwable, ServerName server) { 676 if (canRetry == Retry.YES 677 && throwable != null && throwable instanceof DoNotRetryIOException) { 678 canRetry = Retry.NO_NOT_RETRIABLE; 679 } 680 681 if (canRetry != Retry.YES) { 682 // Batch.Callback<Res> was not called on failure in 0.94. We keep this. 683 setError(originalIndex, row, throwable, server); 684 } else if (isActionComplete(originalIndex, row)) { 685 canRetry = Retry.NO_OTHER_SUCCEEDED; 686 } 687 return canRetry; 688 } 689 690 /** 691 * Resubmit all the actions from this multiaction after a failure. 692 * 693 * @param rsActions the actions still to do from the initial list 694 * @param server the destination 695 * @param numAttempt the number of attempts so far 696 * @param t the throwable (if any) that caused the resubmit 697 */ 698 private void receiveGlobalFailure( 699 MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { 700 errorsByServer.reportServerError(server); 701 Retry canRetry = errorsByServer.canTryMore(numAttempt) 702 ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; 703 704 cleanServerCache(server, t); 705 int failed = 0; 706 int stopped = 0; 707 List<Action> toReplay = new ArrayList<>(); 708 for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) { 709 byte[] regionName = e.getKey(); 710 byte[] row = e.getValue().get(0).getAction().getRow(); 711 // Do not use the exception for updating cache because it might be coming from 712 // any of the regions in the MultiAction. 713 updateCachedLocations(server, regionName, row, 714 ClientExceptionsUtil.isMetaClearingException(t) ? null : t); 715 for (Action action : e.getValue()) { 716 Retry retry = manageError( 717 action.getOriginalIndex(), action.getAction(), canRetry, t, server); 718 if (retry == Retry.YES) { 719 toReplay.add(action); 720 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { 721 ++stopped; 722 } else { 723 ++failed; 724 } 725 } 726 } 727 728 if (toReplay.isEmpty()) { 729 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped); 730 } else { 731 resubmit(server, toReplay, numAttempt, rsActions.size(), t); 732 } 733 } 734 735 /** 736 * Log as much info as possible, and, if there is something to replay, 737 * submit it again after a back off sleep. 738 */ 739 private void resubmit(ServerName oldServer, List<Action> toReplay, 740 int numAttempt, int failureCount, Throwable throwable) { 741 // We have something to replay. We're going to sleep a little before. 742 743 // We have two contradicting needs here: 744 // 1) We want to get the new location after having slept, as it may change. 745 // 2) We want to take into account the location when calculating the sleep time. 746 // 3) If all this is just because the response needed to be chunked try again FAST. 747 // It should be possible to have some heuristics to take the right decision. Short term, 748 // we go for one. 749 boolean retryImmediately = throwable instanceof RetryImmediatelyException; 750 int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; 751 long backOffTime; 752 if (retryImmediately) { 753 backOffTime = 0; 754 } else if (throwable instanceof CallQueueTooBigException) { 755 // Give a special check on CQTBE, see #HBASE-17114 756 backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE); 757 } else { 758 backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause); 759 } 760 if (numAttempt > asyncProcess.startLogErrorsCnt) { 761 // We use this value to have some logs when we have multiple failures, but not too many 762 // logs, as errors are to be expected when a region moves, splits and so on 763 LOG.info(createLog(numAttempt, failureCount, toReplay.size(), 764 oldServer, throwable, backOffTime, true, null, -1, -1)); 765 } 766 767 try { 768 if (backOffTime > 0) { 769 Thread.sleep(backOffTime); 770 } 771 } catch (InterruptedException e) { 772 LOG.warn("#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e); 773 Thread.currentThread().interrupt(); 774 return; 775 } 776 777 groupAndSendMultiAction(toReplay, nextAttemptNumber); 778 } 779 780 private void logNoResubmit(ServerName oldServer, int numAttempt, 781 int failureCount, Throwable throwable, int failed, int stopped) { 782 if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) { 783 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString(); 784 String logMessage = createLog(numAttempt, failureCount, 0, oldServer, 785 throwable, -1, false, timeStr, failed, stopped); 786 if (failed != 0) { 787 // Only log final failures as warning 788 LOG.warn(logMessage); 789 } else { 790 LOG.info(logMessage); 791 } 792 } 793 } 794 795 /** 796 * Called when we receive the result of a server query. 797 * 798 * @param multiAction - the multiAction we sent 799 * @param server - the location. It's used as a server name. 800 * @param responses - the response, if any 801 * @param numAttempt - the attempt 802 */ 803 private void receiveMultiAction(MultiAction multiAction, ServerName server, 804 MultiResponse responses, int numAttempt) { 805 assert responses != null; 806 updateStats(server, responses); 807 // Success or partial success 808 // Analyze detailed results. We can still have individual failures to be redo. 809 // two specific throwables are managed: 810 // - DoNotRetryIOException: we continue to retry for other actions 811 // - RegionMovedException: we update the cache with the new region location 812 Map<byte[], MultiResponse.RegionResult> results = responses.getResults(); 813 List<Action> toReplay = new ArrayList<>(); 814 Throwable lastException = null; 815 int failureCount = 0; 816 int failed = 0; 817 int stopped = 0; 818 Retry retry = null; 819 // Go by original action. 820 for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) { 821 byte[] regionName = regionEntry.getKey(); 822 823 Throwable regionException = responses.getExceptions().get(regionName); 824 if (regionException != null) { 825 cleanServerCache(server, regionException); 826 } 827 828 Map<Integer, Object> regionResults = 829 results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap(); 830 boolean regionFailureRegistered = false; 831 for (Action sentAction : regionEntry.getValue()) { 832 Object result = regionResults.get(sentAction.getOriginalIndex()); 833 if (result == null) { 834 if (regionException == null) { 835 LOG.error("Server sent us neither results nor exceptions for " 836 + Bytes.toStringBinary(regionName) 837 + ", numAttempt:" + numAttempt); 838 regionException = new RuntimeException("Invalid response"); 839 } 840 // If the row operation encounters the region-lever error, the exception of action may be 841 // null. 842 result = regionException; 843 } 844 // Failure: retry if it's make sense else update the errors lists 845 if (result instanceof Throwable) { 846 Throwable actionException = (Throwable) result; 847 Row row = sentAction.getAction(); 848 lastException = regionException != null ? regionException 849 : ClientExceptionsUtil.findException(actionException); 850 // Register corresponding failures once per server/once per region. 851 if (!regionFailureRegistered) { 852 regionFailureRegistered = true; 853 updateCachedLocations(server, regionName, row.getRow(), actionException); 854 } 855 if (retry == null) { 856 errorsByServer.reportServerError(server); 857 // We determine canRetry only once for all calls, after reporting server failure. 858 retry = errorsByServer.canTryMore(numAttempt) ? 859 Retry.YES : Retry.NO_RETRIES_EXHAUSTED; 860 } 861 ++failureCount; 862 switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException, 863 server)) { 864 case YES: 865 toReplay.add(sentAction); 866 break; 867 case NO_OTHER_SUCCEEDED: 868 ++stopped; 869 break; 870 default: 871 ++failed; 872 break; 873 } 874 } else { 875 invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result); 876 setResult(sentAction, result); 877 } 878 } 879 } 880 if (toReplay.isEmpty()) { 881 logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped); 882 } else { 883 resubmit(server, toReplay, numAttempt, failureCount, lastException); 884 } 885 } 886 887 private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row, 888 Throwable rowException) { 889 if (tableName == null) { 890 return; 891 } 892 try { 893 asyncProcess.connection 894 .updateCachedLocations(tableName, regionName, row, rowException, server); 895 } catch (Throwable ex) { 896 // That should never happen, but if it did, we want to make sure 897 // we still process errors 898 LOG.error("Couldn't update cached region locations: " + ex); 899 } 900 } 901 902 private void invokeCallBack(byte[] regionName, byte[] row, CResult result) { 903 if (callback != null) { 904 try { 905 //noinspection unchecked 906 // TODO: would callback expect a replica region name if it gets one? 907 this.callback.update(regionName, row, result); 908 } catch (Throwable t) { 909 LOG.error("User callback threw an exception for " 910 + Bytes.toStringBinary(regionName) + ", ignoring", t); 911 } 912 } 913 } 914 915 private void cleanServerCache(ServerName server, Throwable regionException) { 916 if (ClientExceptionsUtil.isMetaClearingException(regionException)) { 917 // We want to make sure to clear the cache in case there were location-related exceptions. 918 // We don't to clear the cache for every possible exception that comes through, however. 919 asyncProcess.connection.clearCaches(server); 920 } 921 } 922 923 @VisibleForTesting 924 protected void updateStats(ServerName server, MultiResponse resp) { 925 ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()), 926 Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp); 927 } 928 929 930 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, 931 Throwable error, long backOffTime, boolean willRetry, String startTime, 932 int failed, int stopped) { 933 StringBuilder sb = new StringBuilder(); 934 sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName). 935 append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries). 936 append(", "); 937 938 if (failureCount > 0 || error != null){ 939 sb.append("failureCount=").append(failureCount).append("ops").append(", last exception="). 940 append(error); 941 } else { 942 sb.append("succeeded"); 943 } 944 945 sb.append(" on ").append(sn).append(", tracking started ").append(startTime); 946 947 if (willRetry) { 948 sb.append(", retrying after=").append(backOffTime).append("ms"). 949 append(", operationsToReplay=").append(replaySize); 950 } else if (failureCount > 0) { 951 if (stopped > 0) { 952 sb.append("; NOT retrying, stopped=").append(stopped). 953 append(" because successful operation on other replica"); 954 } 955 if (failed > 0) { 956 sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!"); 957 } 958 } 959 960 return sb.toString(); 961 } 962 963 /** 964 * Sets the non-error result from a particular action. 965 * @param action Action (request) that the server responded to. 966 * @param result The result. 967 */ 968 private void setResult(Action action, Object result) { 969 if (result == null) { 970 throw new RuntimeException("Result cannot be null"); 971 } 972 ReplicaResultState state = null; 973 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); 974 int index = action.getOriginalIndex(); 975 if (results == null) { 976 decActionCounter(index); 977 return; // Simple case, no replica requests. 978 } 979 state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); 980 if (state == null) { 981 return; // Simple case, no replica requests. 982 } 983 // At this point we know that state is set to replica tracking class. 984 // It could be that someone else is also looking at it; however, we know there can 985 // only be one state object, and only one thread can set callCount to 0. Other threads 986 // will either see state with callCount 0 after locking it; or will not see state at all 987 // we will replace it with the result. 988 synchronized (state) { 989 if (state.callCount == 0) { 990 return; // someone already set the result 991 } 992 state.callCount = 0; 993 } 994 synchronized (replicaResultLock) { 995 if (results[index] != state) { 996 throw new AssertionError("We set the callCount but someone else replaced the result"); 997 } 998 updateResult(index, result); 999 } 1000 1001 decActionCounter(index); 1002 } 1003 1004 /** 1005 * Sets the error from a particular action. 1006 * @param index Original action index. 1007 * @param row Original request. 1008 * @param throwable The resulting error. 1009 * @param server The source server. 1010 */ 1011 private void setError(int index, Row row, Throwable throwable, ServerName server) { 1012 ReplicaResultState state = null; 1013 if (results == null) { 1014 // Note that we currently cannot have replica requests with null results. So it shouldn't 1015 // happen that multiple replica calls will call dAC for same actions with results == null. 1016 // Only one call per action should be present in this case. 1017 errors.add(throwable, row, server); 1018 decActionCounter(index); 1019 return; // Simple case, no replica requests. 1020 } 1021 state = trySetResultSimple(index, row, true, throwable, server, false); 1022 if (state == null) { 1023 return; // Simple case, no replica requests. 1024 } 1025 BatchErrors target = null; // Error will be added to final errors, or temp replica errors. 1026 boolean isActionDone = false; 1027 synchronized (state) { 1028 switch (state.callCount) { 1029 case 0: return; // someone already set the result 1030 case 1: { // All calls failed, we are the last error. 1031 target = errors; 1032 isActionDone = true; 1033 break; 1034 } 1035 default: { 1036 assert state.callCount > 1; 1037 if (state.replicaErrors == null) { 1038 state.replicaErrors = new BatchErrors(); 1039 } 1040 target = state.replicaErrors; 1041 break; 1042 } 1043 } 1044 --state.callCount; 1045 } 1046 target.add(throwable, row, server); 1047 if (isActionDone) { 1048 if (state.replicaErrors != null) { // last call, no need to lock 1049 errors.merge(state.replicaErrors); 1050 } 1051 // See setResult for explanations. 1052 synchronized (replicaResultLock) { 1053 if (results[index] != state) { 1054 throw new AssertionError("We set the callCount but someone else replaced the result"); 1055 } 1056 updateResult(index, throwable); 1057 } 1058 decActionCounter(index); 1059 } 1060 } 1061 1062 /** 1063 * Checks if the action is complete; used on error to prevent needless retries. 1064 * Does not synchronize, assuming element index/field accesses are atomic. 1065 * This is an opportunistic optimization check, doesn't have to be strict. 1066 * @param index Original action index. 1067 * @param row Original request. 1068 */ 1069 private boolean isActionComplete(int index, Row row) { 1070 if (!AsyncProcess.isReplicaGet(row)) return false; 1071 Object resObj = results[index]; 1072 return (resObj != null) && (!(resObj instanceof ReplicaResultState) 1073 || ((ReplicaResultState)resObj).callCount == 0); 1074 } 1075 1076 /** 1077 * Tries to set the result or error for a particular action as if there were no replica calls. 1078 * @return null if successful; replica state if there were in fact replica calls. 1079 */ 1080 private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, 1081 Object result, ServerName server, boolean isFromReplica) { 1082 Object resObj = null; 1083 if (!AsyncProcess.isReplicaGet(row)) { 1084 if (isFromReplica) { 1085 throw new AssertionError("Unexpected stale result for " + row); 1086 } 1087 updateResult(index, result); 1088 } else { 1089 synchronized (replicaResultLock) { 1090 resObj = results[index]; 1091 if (resObj == null) { 1092 if (isFromReplica) { 1093 throw new AssertionError("Unexpected stale result for " + row); 1094 } 1095 updateResult(index, result); 1096 } 1097 } 1098 } 1099 1100 ReplicaResultState rrs = 1101 (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; 1102 if (rrs == null && isError) { 1103 // The resObj is not replica state (null or already set). 1104 errors.add((Throwable)result, row, server); 1105 } 1106 1107 if (resObj == null) { 1108 // resObj is null - no replica calls were made. 1109 decActionCounter(index); 1110 return null; 1111 } 1112 return rrs; 1113 } 1114 1115 private void decActionCounter(int index) { 1116 long actionsRemaining = actionsInProgress.decrementAndGet(); 1117 if (actionsRemaining < 0) { 1118 String error = buildDetailedErrorMsg("Incorrect actions in progress", index); 1119 throw new AssertionError(error); 1120 } else if (actionsRemaining == 0) { 1121 synchronized (actionsInProgress) { 1122 actionsInProgress.notifyAll(); 1123 } 1124 } 1125 } 1126 1127 private String buildDetailedErrorMsg(String string, int index) { 1128 StringBuilder error = new StringBuilder(128); 1129 error.append(string).append("; called for ").append(index).append(", actionsInProgress ") 1130 .append(actionsInProgress.get()).append("; replica gets: "); 1131 if (replicaGetIndices != null) { 1132 for (int i = 0; i < replicaGetIndices.length; ++i) { 1133 error.append(replicaGetIndices[i]).append(", "); 1134 } 1135 } else { 1136 error.append(hasAnyReplicaGets ? "all" : "none"); 1137 } 1138 error.append("; results "); 1139 if (results != null) { 1140 for (int i = 0; i < results.length; ++i) { 1141 Object o = results[i]; 1142 error.append(((o == null) ? "null" : o.toString())).append(", "); 1143 } 1144 } 1145 return error.toString(); 1146 } 1147 1148 @Override 1149 public void waitUntilDone() throws InterruptedIOException { 1150 try { 1151 waitUntilDone(Long.MAX_VALUE); 1152 } catch (InterruptedException iex) { 1153 throw new InterruptedIOException(iex.getMessage()); 1154 } finally { 1155 if (callsInProgress != null) { 1156 for (CancellableRegionServerCallable clb : callsInProgress) { 1157 clb.cancel(); 1158 } 1159 } 1160 } 1161 } 1162 1163 private boolean waitUntilDone(long cutoff) throws InterruptedException { 1164 boolean hasWait = cutoff != Long.MAX_VALUE; 1165 long lastLog = EnvironmentEdgeManager.currentTime(); 1166 long currentInProgress; 1167 while (0 != (currentInProgress = actionsInProgress.get())) { 1168 long now = EnvironmentEdgeManager.currentTime(); 1169 if (hasWait && (now * 1000L) > cutoff) { 1170 return false; 1171 } 1172 if (!hasWait) { // Only log if wait is infinite. 1173 if (now > lastLog + 10000) { 1174 lastLog = now; 1175 LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress 1176 + " actions to finish on table: " + tableName); 1177 } 1178 } 1179 synchronized (actionsInProgress) { 1180 if (actionsInProgress.get() == 0) break; 1181 if (!hasWait) { 1182 actionsInProgress.wait(10); 1183 } else { 1184 long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); 1185 TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); 1186 } 1187 } 1188 } 1189 return true; 1190 } 1191 1192 @Override 1193 public boolean hasError() { 1194 return errors.hasErrors(); 1195 } 1196 1197 @Override 1198 public List<? extends Row> getFailedOperations() { 1199 return errors.actions; 1200 } 1201 1202 @Override 1203 public RetriesExhaustedWithDetailsException getErrors() { 1204 return errors.makeException(asyncProcess.logBatchErrorDetails); 1205 } 1206 1207 @Override 1208 public Object[] getResults() throws InterruptedIOException { 1209 waitUntilDone(); 1210 return results; 1211 } 1212 1213 /** 1214 * Creates the server error tracker to use inside process. 1215 * Currently, to preserve the main assumption about current retries, and to work well with 1216 * the retry-limit-based calculation, the calculation is local per Process object. 1217 * We may benefit from connection-wide tracking of server errors. 1218 * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection 1219 */ 1220 private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { 1221 return new ConnectionImplementation.ServerErrorTracker( 1222 asyncProcess.serverTrackerTimeout, asyncProcess.numTries); 1223 } 1224 1225 /** 1226 * Create a callable. Isolated to be easily overridden in the tests. 1227 */ 1228 private MultiServerCallable createCallable(final ServerName server, TableName tableName, 1229 final MultiAction multi) { 1230 return new MultiServerCallable(asyncProcess.connection, tableName, server, 1231 multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority()); 1232 } 1233 1234 private void updateResult(int index, Object result) { 1235 Object current = results[index]; 1236 if (current != null) { 1237 if (LOG.isDebugEnabled()) { 1238 LOG.debug("The result is assigned repeatedly! current:" + current 1239 + ", new:" + result); 1240 } 1241 } 1242 results[index] = result; 1243 } 1244 1245 @VisibleForTesting 1246 long getNumberOfActionsInProgress() { 1247 return actionsInProgress.get(); 1248 } 1249}