/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The context, and return value, for a single submit/submitAll call. Note on how this class (one AP
 * submit) works: initially, all requests are split into groups by server; a request is sent to each
 * server in parallel; the RPC calls are not async, so a thread per server is used. Every time some
 * actions fail, regions/locations might have changed, so we re-group them by server and region
 * again and send these groups in parallel too. The result, in case of retries, is a "tree" of
 * threads, with the parent exiting after scheduling its children. This is why much of the code does
 * not require any synchronization.
 */
@InterfaceAudience.Private
class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
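  /*
   * Rough flow of one submit (a sketch; see the methods below for details): groupAndSendMultiAction
   * resolves locations and groups actions by server, sendMultiAction hands each per-server group to
   * a SingleServerRequestRunnable on the pool, and responses come back through receiveMultiAction
   * or receiveGlobalFailure, which may resubmit failed actions. Callers block in waitUntilDone()
   * until actionsInProgress drops to zero.
   */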

  private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class);

  private RetryingTimeTracker tracker;

  /**
   * Runnable (that can be submitted to a thread pool) that waits until it's time to issue replica
   * calls, finds region replicas, groups the requests by replica and issues the calls (on separate
   * threads, via sendMultiAction). This is done on a separate thread because we don't want to wait
   * on the user thread for our asynchronous call, and usually we have to wait before making replica
   * calls.
   */
  private final class ReplicaCallIssuingRunnable implements Runnable {
    private final long startTime;
    private final List<Action> initialActions;

    public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) {
      this.initialActions = initialActions;
      this.startTime = startTime;
    }

    @Override
    public void run() {
      boolean done = false;
      if (asyncProcess.primaryCallTimeoutMicroseconds > 0) {
        try {
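          // startTime is in milliseconds while waitUntilDone expects a cutoff in microseconds,
          // hence the * 1000L below.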
          done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds);
        } catch (InterruptedException ex) {
          LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage());
          return;
        }
      }
      if (done) return; // Done within primary timeout
      Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
      List<Action> unknownLocActions = new ArrayList<>();
      if (replicaGetIndices == null) {
        for (int i = 0; i < results.length; ++i) {
          addReplicaActions(i, actionsByServer, unknownLocActions);
        }
      } else {
        for (int replicaGetIndice : replicaGetIndices) {
          addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
        }
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
      }
      if (!unknownLocActions.isEmpty()) {
        actionsByServer = new HashMap<>();
        for (Action action : unknownLocActions) {
          addReplicaActionsAgain(action, actionsByServer);
        }
        // Some actions may have completely failed; they are handled inside addReplicaActionsAgain.
        if (!actionsByServer.isEmpty()) {
          sendMultiAction(actionsByServer, 1, null, true);
        }
      }
    }

    /**
     * Add replica actions to the action map, grouped by server.
     * @param index                 Index of the original action.
     * @param actionsByServer       The map by server to add it to.
     * @param unknownReplicaActions Output list of replica actions whose location could not be
     *                              found; they are handled separately by the caller.
     */
    private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer,
      List<Action> unknownReplicaActions) {
      if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
      Action action = initialActions.get(index);
      RegionLocations loc = findAllLocationsOrFail(action, true);
      if (loc == null) return;
      HRegionLocation[] locs = loc.getRegionLocations();
      if (locs.length == 1) {
        LOG.warn("No replicas found for {}", action.getAction());
        return;
      }
      synchronized (replicaResultLock) {
        // Don't run replica calls if the original has finished. We could do it e.g. if
        // original has already failed before first replica call (unlikely given retries),
        // but that would require additional synchronization w.r.t. returning to caller.
        if (results[index] != null) return;
        // We set the number of calls here. After that any path must call setResult/setError.
        // True even for replicas that are not found - if we refuse to send we MUST set error.
        updateResult(index, new ReplicaResultState(locs.length));
      }
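      // Start at 1: index 0 is the default (primary) replica, which already has a call in flight.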
      for (int i = 1; i < locs.length; ++i) {
        Action replicaAction = new Action(action, i);
        if (locs[i] != null) {
          asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
            replicaAction, actionsByServer, nonceGroup);
        } else {
          unknownReplicaActions.add(replicaAction);
        }
      }
    }

    private void addReplicaActionsAgain(Action action,
      Map<ServerName, MultiAction> actionsByServer) {
      if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
        throw new AssertionError("Cannot have default replica here");
      }
      HRegionLocation loc = getReplicaLocationOrFail(action);
      if (loc == null) return;
      asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), action,
        actionsByServer, nonceGroup);
    }
  }

  /**
   * Runnable (that can be submitted to thread pool) that submits MultiAction to a single server.
   * The server call is synchronous, therefore we do it on a thread pool.
   */
  final class SingleServerRequestRunnable implements Runnable {
    private final MultiAction multiAction;
    private final int numAttempt;
    private final ServerName server;
    private final Set<CancellableRegionServerCallable> callsInProgress;

    SingleServerRequestRunnable(MultiAction multiAction, int numAttempt, ServerName server,
      Set<CancellableRegionServerCallable> callsInProgress) {
      this.multiAction = multiAction;
      this.numAttempt = numAttempt;
      this.server = server;
      this.callsInProgress = callsInProgress;
    }

    @Override
    public void run() {
      AbstractResponse res = null;
      CancellableRegionServerCallable callable = currentCallable;
      try {
        // setup the callable based on the actions, if we don't have one already from the request
        if (callable == null) {
          callable = createCallable(server, tableName, multiAction);
        }
        RpcRetryingCaller<AbstractResponse> caller =
          asyncProcess.createCaller(callable, rpcTimeout);
        try {
          if (callsInProgress != null) {
            callsInProgress.add(callable);
          }
          res = caller.callWithoutRetries(callable, operationTimeout);
          if (res == null) {
            // Cancelled
            return;
          }
        } catch (IOException e) {
          // The service itself failed. It may be an error coming from the communication
          // layer, but it may also be a functional error raised by the server.
          receiveGlobalFailure(multiAction, server, numAttempt, e);
          return;
        } catch (Throwable t) {
          // This should not happen. Let's log & retry anyway.
          LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected."
            + " Retrying. Server=" + server + ", tableName=" + tableName, t);
          receiveGlobalFailure(multiAction, server, numAttempt, t);
          return;
        }
        if (res.type() == AbstractResponse.ResponseType.MULTI) {
          // Normal case: we received an answer from the server, and it's not an exception.
          receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
        } else {
          if (results != null) {
            SingleResponse singleResponse = (SingleResponse) res;
            updateResult(0, singleResponse.getEntry());
          }
          decActionCounter(1);
        }
      } catch (Throwable t) {
        // Something really bad happened. We are on the send thread that will now die.
        LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t);
        throw new RuntimeException(t);
      } finally {
        asyncProcess.decTaskCounters(multiAction.getRegions(), server);
        if (callsInProgress != null && callable != null && res != null) {
          callsInProgress.remove(callable);
        }
      }
    }
  }

  private final Batch.Callback<CResult> callback;
  private final BatchErrors errors;
  private final ConnectionImplementation.ServerErrorTracker errorsByServer;
  private final ExecutorService pool;
  private final Set<CancellableRegionServerCallable> callsInProgress;

  private final TableName tableName;
  private final AtomicLong actionsInProgress = new AtomicLong(-1);
  /**
   * The lock controls access to results. It is only held when populating results where there might
   * be several callers (eventual consistency gets). For other requests, there's one unique call
   * going on per result index.
   */
  private final Object replicaResultLock = new Object();
  /**
   * Result array. Null if results are not needed. Otherwise, each index corresponds to the action
   * index in the initial actions submitted. For most request types, it has nulls for requests that
   * are not done, and a result/exception for those that are done. For eventual-consistency gets,
   * initially the same applies; at some point, replica calls might be started, and
   * ReplicaResultState is put at the corresponding indices. The returning calls check the type to
   * detect when this is the case. After all calls are done, ReplicaResultState objects are replaced
   * with results for the user.
   */
  private final Object[] results;
  /**
   * Indices of replica gets in results. If null, all or no actions are replica-gets.
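   * For example, for actions [replica-get, put, replica-get] this would be {0, 2}; if every action
   * is a replica get (or none is), this stays null and hasAnyReplicaGets disambiguates.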
   */
  private final int[] replicaGetIndices;
  private final boolean hasAnyReplicaGets;
  private final long nonceGroup;
  private final CancellableRegionServerCallable currentCallable;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final AsyncProcess asyncProcess;

  /**
   * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
   * used to make logging clearer; we don't actually care why we don't retry.
   */
  public enum Retry {
    YES,
    NO_LOCATION_PROBLEM,
    NO_NOT_RETRIABLE,
    NO_RETRIES_EXHAUSTED,
    NO_OTHER_SUCCEEDED
  }

  /**
   * Sync point for calls to multiple replicas for the same user request (Get). Created and put in
   * the results array (we assume replica calls require results) when the replica calls are
   * launched. See results for details of this process. POJO, all fields are public. To modify them,
   * the object itself is locked.
   */
  private static class ReplicaResultState {
    public ReplicaResultState(int callCount) {
      this.callCount = callCount;
    }

    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
    int callCount;
    /**
     * Errors for which it is not decided whether we will report them to user. If one of the calls
     * succeeds, we will discard the errors that may have happened in the other calls.
     */
    BatchErrors replicaErrors = null;

    @Override
    public String toString() {
      return "[call count " + callCount + "; errors " + replicaErrors + "]";
    }
  }

  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup,
    AsyncProcess asyncProcess) {
    this.pool = task.getPool();
    this.callback = task.getCallback();
    this.nonceGroup = nonceGroup;
    this.tableName = task.getTableName();
    this.actionsInProgress.set(actions.size());
    if (task.getResults() == null) {
      results = task.getNeedResults() ? new Object[actions.size()] : null;
    } else {
      if (task.getResults().length != actions.size()) {
        throw new AssertionError("results.length");
      }
      this.results = task.getResults();
      for (int i = 0; i != this.results.length; ++i) {
        results[i] = null;
      }
    }
    List<Integer> replicaGetIndices = null;
    boolean hasAnyReplicaGets = false;
    if (results != null) {
      // Check to see if any requests might require replica calls.
      // We expect that many requests will consist of all or no multi-replica gets; in such
      // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
      // store the list of action indexes for which replica gets are possible, and set
      // hasAnyReplicaGets to true.
      boolean hasAnyNonReplicaReqs = false;
      int posInList = 0;
      for (Action action : actions) {
        boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
        if (isReplicaGet) {
          hasAnyReplicaGets = true;
          if (hasAnyNonReplicaReqs) { // Mixed case
            if (replicaGetIndices == null) {
              replicaGetIndices = new ArrayList<>(actions.size() - 1);
            }
            replicaGetIndices.add(posInList);
          }
        } else if (!hasAnyNonReplicaReqs) {
          // The first non-multi-replica request in the action list.
          hasAnyNonReplicaReqs = true;
          if (posInList > 0) {
            // Add all the previous requests to the index lists. We know they are all
            // replica-gets because this is the first non-multi-replica request in the list.
            replicaGetIndices = new ArrayList<>(actions.size() - 1);
            for (int i = 0; i < posInList; ++i) {
              replicaGetIndices.add(i);
            }
          }
        }
        ++posInList;
      }
    }
    this.hasAnyReplicaGets = hasAnyReplicaGets;
    if (replicaGetIndices != null) {
      this.replicaGetIndices = new int[replicaGetIndices.size()];
      int i = 0;
      for (Integer el : replicaGetIndices) {
        this.replicaGetIndices[i++] = el;
      }
    } else {
      this.replicaGetIndices = null;
    }
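    // The in-progress call set lets waitUntilDone() cancel outstanding (replica) calls; it is
    // only created when replica gets are possible.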
    this.callsInProgress = !hasAnyReplicaGets
      ? null
      : Collections
        .newSetFromMap(new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
    this.asyncProcess = asyncProcess;
    this.errorsByServer = createServerErrorTracker();
    this.errors = new BatchErrors();
    this.operationTimeout = task.getOperationTimeout();
    this.rpcTimeout = task.getRpcTimeout();
    this.currentCallable = task.getCallable();
    if (task.getCallable() == null) {
      tracker = new RetryingTimeTracker().start();
    }
  }

  protected Set<CancellableRegionServerCallable> getCallsInProgress() {
    return callsInProgress;
  }

  SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt,
    ServerName server, Set<CancellableRegionServerCallable> callsInProgress) {
    return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
  }

  /**
   * Group a list of actions per region server, and send them.
   * @param currentActions - the list of rows to submit
   * @param numAttempt     - the current numAttempt (first attempt is 1)
   */
  void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();

    boolean isReplica = false;
    List<Action> unknownReplicaActions = null;
    for (Action action : currentActions) {
      RegionLocations locs = findAllLocationsOrFail(action, true);
      if (locs == null) continue;
      boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
      if (isReplica && !isReplicaAction) {
        // This is the property of the current implementation, not a requirement.
        throw new AssertionError("Replica and non-replica actions in the same retry");
      }
      isReplica = isReplicaAction;
      HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
      if (loc == null || loc.getServerName() == null) {
        if (isReplica) {
          if (unknownReplicaActions == null) {
            unknownReplicaActions = new ArrayList<>(1);
          }
          unknownReplicaActions.add(action);
        } else {
          // TODO: relies on primary location always being fetched
          manageLocationError(action, null);
        }
      } else {
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
          nonceGroup);
      }
    }
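    // Replica calls are only kicked off on the first attempt for primary actions; retries (and
    // replica-only retries) never start another replica thread.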
    boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
    boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();

    if (!actionsByServer.isEmpty()) {
      // If this is the first attempt to group and send, with no replica actions, we need the
      // replica thread.
      sendMultiAction(actionsByServer, numAttempt,
        (doStartReplica && !hasUnknown) ? currentActions : null, numAttempt > 1 && !hasUnknown);
    }

    if (hasUnknown) {
      actionsByServer = new HashMap<>();
      for (Action action : unknownReplicaActions) {
        HRegionLocation loc = getReplicaLocationOrFail(action);
        if (loc == null) continue;
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
          nonceGroup);
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
      }
    }
  }

  private HRegionLocation getReplicaLocationOrFail(Action action) {
    // We are going to try to get the location once again. For each action, we'll do it once
    // from the cache, because the previous calls in the loop might have populated it.
    int replicaId = action.getReplicaId();
    RegionLocations locs = findAllLocationsOrFail(action, true);
    if (locs == null) return null; // manageError already called
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null || loc.getServerName() == null) {
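      // The cached lookup did not know this replica; retry the lookup bypassing the cache before
      // giving up.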
      locs = findAllLocationsOrFail(action, false);
      if (locs == null) return null; // manageError already called
      loc = locs.getRegionLocation(replicaId);
    }
    if (loc == null || loc.getServerName() == null) {
      manageLocationError(action, null);
      return null;
    }
    return loc;
  }

  private void manageLocationError(Action action, Exception ex) {
    String msg =
      "Cannot get replica " + action.getReplicaId() + " location for " + action.getAction();
    LOG.error(msg);
    if (ex == null) {
      ex = new IOException(msg);
    }
    manageError(action.getOriginalIndex(), action.getAction(), Retry.NO_LOCATION_PROBLEM, ex, null);
  }

  private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
    if (action.getAction() == null)
      throw new IllegalArgumentException("#" + asyncProcess.id + ", row cannot be null");
    RegionLocations loc = null;
    try {
      loc = asyncProcess.connection.locateRegion(tableName, action.getAction().getRow(), useCache,
        true, action.getReplicaId());
    } catch (IOException ex) {
      manageLocationError(action, ex);
    }
    return loc;
  }

  /**
   * Send a multi action structure to the servers, after a delay depending on the attempt number.
   * Asynchronous.
   * @param actionsByServer         the actions grouped by server
   * @param numAttempt              the attempt number.
   * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
   */
  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt,
    List<Action> actionsForReplicaThread, boolean reuseThread) {
    // Run the last item on the same thread if we are already on a send thread.
    // We hope most of the time it will be the only item, so we can cut down on threads.
    int actionsRemaining = actionsByServer.size();
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
      ServerName server = e.getKey();
      MultiAction multiAction = e.getValue();
      Collection<? extends Runnable> runnables =
        getNewMultiActionRunnable(server, multiAction, numAttempt);
      // make sure we correctly count the number of runnables before we try to reuse the send
      // thread, in case we had to split the request into different runnables because of backoff
      if (runnables.size() > actionsRemaining) {
        actionsRemaining = runnables.size();
      }

      // run all the runnables
      // HBASE-17475: Do not reuse the thread after the stack reaches a certain depth, to prevent
      // stack overflow.
      // For now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to control the depth
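      // (every DEFAULT_HBASE_CLIENT_RETRIES_NUMBER-th attempt is pushed to the pool instead of
      // being run inline on the current send thread).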
      for (Runnable runnable : runnables) {
        if (
          (--actionsRemaining == 0) && reuseThread
            && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0
        ) {
          runnable.run();
        } else {
          try {
            pool.execute(runnable);
          } catch (Throwable t) {
            if (t instanceof RejectedExecutionException) {
              // This should never happen. But as the pool is provided by the end user,
              // let's secure this a little.
              LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + " Server="
                + server.getServerName(), t);
            } else {
              // see #HBASE-14359 for more details
              LOG.warn("Caught unexpected exception/error: ", t);
            }
            asyncProcess.decTaskCounters(multiAction.getRegions(), server);
            // We're likely to fail again, but this will increment the attempt counter,
            // so it will finish.
            receiveGlobalFailure(multiAction, server, numAttempt, t);
          }
        }
      }
    }

    if (actionsForReplicaThread != null) {
      startWaitingForReplicaCalls(actionsForReplicaThread);
    }
  }

  @SuppressWarnings("MixedMutabilityReturnType")
  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
    MultiAction multiAction, int numAttempt) {
    // no stats to manage, just do the standard action
    if (asyncProcess.connection.getStatisticsTracker() == null) {
      if (asyncProcess.connection.getConnectionMetrics() != null) {
        asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
      }
      asyncProcess.incTaskCounters(multiAction.getRegions(), server);
      SingleServerRequestRunnable runnable =
        createSingleServerRequest(multiAction, numAttempt, server, callsInProgress);

      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
      return Collections.singletonList(runnable);
    }

    // group the actions by the amount of delay
    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());

    // split up the actions
    for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
      Long backoff = getBackoff(server, e.getKey());
      DelayingRunner runner = actions.get(backoff);
      if (runner == null) {
        actions.put(backoff, new DelayingRunner(backoff, e));
      } else {
        runner.add(e);
      }
    }

    List<Runnable> toReturn = new ArrayList<>(actions.size());
    for (DelayingRunner runner : actions.values()) {
      asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
      Runnable runnable =
        createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
      // use a delay runner only if we need to sleep for some time
      if (runner.getSleepTime() > 0) {
        runner.setRunner(runnable);
        runnable = runner;
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics()
            .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
        }
      } else {
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
        }
      }
      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
      toReturn.add(runnable);
    }
    return toReturn;
  }

  /**
   * @param server     server location where the target region is hosted
   * @param regionName name of the region to which we are going to write some data
   * @return the amount of time the client should wait until it submits a request to the specified
   *         server and region
   */
  private Long getBackoff(ServerName server, byte[] regionName) {
    ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker();
    ServerStatistics stats = tracker.getStats(server);
    return asyncProcess.connection.getBackoffPolicy().getBackoffTime(server, regionName, stats);
  }

  /**
   * Starts waiting to issue replica calls on a different thread; or issues them immediately.
   */
  private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
    long startTime = EnvironmentEdgeManager.currentTime();
    ReplicaCallIssuingRunnable replicaRunnable =
      new ReplicaCallIssuingRunnable(actionsForReplicaThread, startTime);
    if (asyncProcess.primaryCallTimeoutMicroseconds == 0) {
      // Start replica calls immediately.
      replicaRunnable.run();
    } else {
      // Start the thread that may kick off replica gets.
      // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
      try {
        pool.execute(replicaRunnable);
      } catch (RejectedExecutionException ree) {
        LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree);
      }
    }
  }

  /**
   * Checks whether we can retry and acts accordingly: logs and sets the error status.
   * @param originalIndex the position in the list sent
   * @param row           the row
   * @param canRetry      if not YES, we won't retry regardless of the settings.
   * @param throwable     the throwable, if any (can be null)
   * @param server        the location, if any (can be null)
   * @return Retry.YES if the action can be retried, otherwise the reason it cannot.
   */
  Retry manageError(int originalIndex, Row row, Retry canRetry, Throwable throwable,
    ServerName server) {
    if (canRetry == Retry.YES && throwable != null && throwable instanceof DoNotRetryIOException) {
      canRetry = Retry.NO_NOT_RETRIABLE;
    }

    if (canRetry != Retry.YES) {
      // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
      setError(originalIndex, row, throwable, server);
    } else if (isActionComplete(originalIndex, row)) {
      canRetry = Retry.NO_OTHER_SUCCEEDED;
    }
    return canRetry;
  }

  /**
   * Resubmit all the actions from this multiaction after a failure.
   * @param rsActions  the actions still to do from the initial list
   * @param server     the destination
   * @param numAttempt the number of attempts so far
   * @param t          the throwable (if any) that caused the resubmit
   */
  private void receiveGlobalFailure(MultiAction rsActions, ServerName server, int numAttempt,
    Throwable t) {
    errorsByServer.reportServerError(server);
    Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;

    cleanServerCache(server, t);
    int failed = 0;
    int stopped = 0;
    List<Action> toReplay = new ArrayList<>();
    for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
      byte[] regionName = e.getKey();
      byte[] row = e.getValue().get(0).getAction().getRow();
      // Do not use the exception for updating cache because it might be coming from
      // any of the regions in the MultiAction.
      updateCachedLocations(server, regionName, row,
        ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
      for (Action action : e.getValue()) {
        Retry retry =
          manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server);
        if (retry == Retry.YES) {
          toReplay.add(action);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      }
    }

    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, rsActions.size(), t);
    }
  }

  /**
   * Log as much info as possible, and, if there is something to replay, submit it again after a
   * backoff sleep.
   */
  private void resubmit(ServerName oldServer, List<Action> toReplay, int numAttempt,
    int failureCount, Throwable throwable) {
    // We have something to replay. We're going to sleep a little before that.

    // We have contradicting needs here:
    // 1) We want to get the new location after having slept, as it may change.
    // 2) We want to take into account the location when calculating the sleep time.
    // 3) If all this is just because the response needed to be chunked, try again FAST.
    // It should be possible to have some heuristics to take the right decision. Short term,
    // we go with option 1.
    boolean retryImmediately = throwable instanceof RetryImmediatelyException;
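    // An immediate retry (e.g. a multi response that had to be chunked) is resent right away and
    // does not consume an attempt.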
    int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
    long backOffTime;
    if (retryImmediately) {
      backOffTime = 0;
    } else if (HBaseServerException.isServerOverloaded(throwable)) {
      // Give a special check when encountering an exception indicating the server is overloaded.
      // see #HBASE-17114 and HBASE-26807
      backOffTime = errorsByServer.calculateBackoffTime(oldServer,
        asyncProcess.connectionConfiguration.getPauseMillisForServerOverloaded());
    } else {
      backOffTime = errorsByServer.calculateBackoffTime(oldServer,
        asyncProcess.connectionConfiguration.getPauseMillis());
    }
    if (numAttempt > asyncProcess.startLogErrorsCnt) {
      // We use this value to have some logs when we have multiple failures, but not too many
      // logs, as errors are to be expected when a region moves, splits and so on
      LOG.info(createLog(numAttempt, failureCount, toReplay.size(), oldServer, throwable,
        backOffTime, true, null, -1, -1));
    }

    try {
      if (backOffTime > 0) {
        Thread.sleep(backOffTime);
      }
    } catch (InterruptedException e) {
      LOG.warn(
        "#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
      Thread.currentThread().interrupt();
      return;
    }

    groupAndSendMultiAction(toReplay, nextAttemptNumber);
  }

  private void logNoResubmit(ServerName oldServer, int numAttempt, int failureCount,
    Throwable throwable, int failed, int stopped) {
    if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) {
      @SuppressWarnings("JavaUtilDate")
      String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
      String logMessage = createLog(numAttempt, failureCount, 0, oldServer, throwable, -1, false,
        timeStr, failed, stopped);
      if (failed != 0) {
        // Only log final failures as warning
        LOG.warn(logMessage);
      } else {
        LOG.info(logMessage);
      }
    }
  }

  /**
   * Called when we receive the result of a server query.
   * @param multiAction - the multiAction we sent
   * @param server      - the location. It's used as a server name.
   * @param responses   - the response, if any
   * @param numAttempt  - the attempt
   */
  private void receiveMultiAction(MultiAction multiAction, ServerName server,
    MultiResponse responses, int numAttempt) {
    assert responses != null;
    updateStats(server, responses);
    // Success or partial success.
    // Analyze detailed results. We can still have individual failures to be redone.
    // Two specific throwables are managed:
    // - DoNotRetryIOException: we continue to retry for other actions
    // - RegionMovedException: we update the cache with the new region location
    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
    List<Action> toReplay = new ArrayList<>();
    Throwable lastException = null;
    int failureCount = 0;
    int failed = 0;
    int stopped = 0;
    Retry retry = null;
    // Go by original action.
    for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
      byte[] regionName = regionEntry.getKey();

      Throwable regionException = responses.getExceptions().get(regionName);
      if (regionException != null) {
        cleanServerCache(server, regionException);
      }

      Map<Integer, Object> regionResults =
        results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
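      // Per-region results are keyed by each action's original index in the submitted list.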
      boolean regionFailureRegistered = false;
      for (Action sentAction : regionEntry.getValue()) {
        Object result = regionResults.get(sentAction.getOriginalIndex());
        if (result == null) {
          if (regionException == null) {
            LOG.error("Server sent us neither results nor exceptions for "
              + Bytes.toStringBinary(regionName) + ", numAttempt:" + numAttempt);
            regionException = new RuntimeException("Invalid response");
          }
          // If the row operation encountered a region-level error, the per-action exception may be
          // null.
          result = regionException;
        }
        // Failure: retry if it makes sense, else update the error lists.
        if (result instanceof Throwable) {
          Throwable actionException = (Throwable) result;
          Row row = sentAction.getAction();
          lastException = regionException != null
            ? regionException
            : ClientExceptionsUtil.findException(actionException);
          // Register corresponding failures once per server/once per region.
          if (!regionFailureRegistered) {
            regionFailureRegistered = true;
            updateCachedLocations(server, regionName, row.getRow(), actionException);
          }
          if (retry == null) {
            errorsByServer.reportServerError(server);
            // We determine canRetry only once for all calls, after reporting server failure.
            retry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
          }
          ++failureCount;
          switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException, server)) {
            case YES:
              toReplay.add(sentAction);
              break;
            case NO_OTHER_SUCCEEDED:
              ++stopped;
              break;
            default:
              ++failed;
              break;
          }
        } else {
          invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
          setResult(sentAction, result);
        }
      }
    }
    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, failureCount, lastException);
    }
  }

  private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
    Throwable rowException) {
    if (tableName == null) {
      return;
    }
    try {
      asyncProcess.connection.updateCachedLocations(tableName, regionName, row, rowException,
        server);
    } catch (Throwable ex) {
      // That should never happen, but if it did, we want to make sure
      // we still process errors
      LOG.error("Couldn't update cached region locations: " + ex);
    }
  }

  private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
    if (callback != null) {
      try {
        // noinspection unchecked
        // TODO: would callback expect a replica region name if it gets one?
        this.callback.update(regionName, row, result);
      } catch (Throwable t) {
        LOG.error(
          "User callback threw an exception for " + Bytes.toStringBinary(regionName) + ", ignoring",
          t);
      }
    }
  }

  private void cleanServerCache(ServerName server, Throwable regionException) {
    if (ClientExceptionsUtil.isMetaClearingException(regionException)) {
      // We want to make sure to clear the cache in case there were location-related exceptions.
      // We don't want to clear the cache for every possible exception that comes through, however.
      asyncProcess.connection.clearCaches(server);
    }
  }

  protected void updateStats(ServerName server, MultiResponse resp) {
    ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()),
      Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp);
  }

  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
    Throwable error, long backOffTime, boolean willRetry, String startTime, int failed,
    int stopped) {
    StringBuilder sb = new StringBuilder();
    sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName)
      .append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries)
      .append(", ");

    if (failureCount > 0 || error != null) {
      sb.append("failureCount=").append(failureCount).append("ops").append(", last exception=")
        .append(error);
    } else {
      sb.append("succeeded");
    }

    sb.append(" on ").append(sn).append(", tracking started ").append(startTime);

    if (willRetry) {
      sb.append(", retrying after=").append(backOffTime).append("ms")
        .append(", operationsToReplay=").append(replaySize);
    } else if (failureCount > 0) {
      if (stopped > 0) {
        sb.append("; NOT retrying, stopped=").append(stopped)
          .append(" because successful operation on other replica");
      }
      if (failed > 0) {
        sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!");
      }
    }

    return sb.toString();
  }

  /**
   * Sets the non-error result from a particular action.
   * @param action Action (request) that the server responded to.
   * @param result The result.
   */
  private void setResult(Action action, Object result) {
    if (result == null) {
      throw new RuntimeException("Result cannot be null");
    }
    boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
    int index = action.getOriginalIndex();
    if (results == null) {
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    ReplicaResultState state =
      trySetResultSimple(index, action.getAction(), false, result, null, isStale);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    // At this point we know that state is set to the replica tracking class.
    // It could be that someone else is also looking at it; however, we know there can
    // only be one state object, and only one thread can set callCount to 0. Other threads
    // will either see state with callCount 0 after locking it, or will not see state at all
    // because we will have replaced it with the result.
    synchronized (state) {
      if (state.callCount == 0) {
        return; // someone already set the result
      }
      state.callCount = 0;
    }
    synchronized (replicaResultLock) {
      if (results[index] != state) {
        throw new AssertionError("We set the callCount but someone else replaced the result");
      }
      updateResult(index, result);
    }

    decActionCounter(index);
  }

  /**
   * Sets the error from a particular action.
   * @param index     Original action index.
   * @param row       Original request.
   * @param throwable The resulting error.
   * @param server    The source server.
   */
  private void setError(int index, Row row, Throwable throwable, ServerName server) {
    if (results == null) {
      // Note that we currently cannot have replica requests with null results. So it shouldn't
      // happen that multiple replica calls will call decActionCounter for the same action with
      // results == null. Only one call per action should be present in this case.
      errors.add(throwable, row, server);
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    ReplicaResultState state = trySetResultSimple(index, row, true, throwable, server, false);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
    boolean isActionDone = false;
    synchronized (state) {
      switch (state.callCount) {
        case 0:
          return; // someone already set the result
        case 1: { // All calls failed, we are the last error.
          target = errors;
          isActionDone = true;
          break;
        }
        default: {
          assert state.callCount > 1;
          if (state.replicaErrors == null) {
            state.replicaErrors = new BatchErrors();
          }
          target = state.replicaErrors;
          break;
        }
      }
      --state.callCount;
    }
    target.add(throwable, row, server);
    if (isActionDone) {
      if (state.replicaErrors != null) { // last call, no need to lock
        errors.merge(state.replicaErrors);
      }
      // See setResult for explanations.
      synchronized (replicaResultLock) {
        if (results[index] != state) {
          throw new AssertionError("We set the callCount but someone else replaced the result");
        }
        updateResult(index, throwable);
      }
      decActionCounter(index);
    }
  }

  /**
   * Checks if the action is complete; used on error to prevent needless retries. Does not
   * synchronize, assuming element index/field accesses are atomic. This is an opportunistic
   * optimization check, doesn't have to be strict.
   * @param index Original action index.
   * @param row   Original request.
   */
  private boolean isActionComplete(int index, Row row) {
    if (!AsyncProcess.isReplicaGet(row)) return false;
    Object resObj = results[index];
    return (resObj != null)
      && (!(resObj instanceof ReplicaResultState) || ((ReplicaResultState) resObj).callCount == 0);
  }

  /**
   * Tries to set the result or error for a particular action as if there were no replica calls.
   * @return null if successful; replica state if there were in fact replica calls.
   */
  private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, Object result,
    ServerName server, boolean isFromReplica) {
    Object resObj = null;
    if (!AsyncProcess.isReplicaGet(row)) {
      if (isFromReplica) {
        throw new AssertionError("Unexpected stale result for " + row);
      }
      updateResult(index, result);
    } else {
      synchronized (replicaResultLock) {
        resObj = results[index];
        if (resObj == null) {
          if (isFromReplica) {
            throw new AssertionError("Unexpected stale result for " + row);
          }
          updateResult(index, result);
        }
      }
    }

    ReplicaResultState rrs =
      (resObj instanceof ReplicaResultState) ? (ReplicaResultState) resObj : null;
    if (rrs == null && isError) {
      // The resObj is not replica state (null or already set).
      errors.add((Throwable) result, row, server);
    }

    if (resObj == null) {
      // resObj is null - no replica calls were made.
      decActionCounter(index);
      return null;
    }
    return rrs;
  }

  private void decActionCounter(int index) {
    long actionsRemaining = actionsInProgress.decrementAndGet();
    if (actionsRemaining < 0) {
      String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
      throw new AssertionError(error);
    } else if (actionsRemaining == 0) {
      synchronized (actionsInProgress) {
        actionsInProgress.notifyAll();
      }
    }
  }

  private String buildDetailedErrorMsg(String string, int index) {
    StringBuilder error = new StringBuilder(128);
    error.append(string).append("; called for ").append(index).append(", actionsInProgress ")
      .append(actionsInProgress.get()).append("; replica gets: ");
    if (replicaGetIndices != null) {
      for (int i = 0; i < replicaGetIndices.length; ++i) {
        error.append(replicaGetIndices[i]).append(", ");
      }
    } else {
      error.append(hasAnyReplicaGets ? "all" : "none");
    }
    error.append("; results ");
    if (results != null) {
      for (int i = 0; i < results.length; ++i) {
        Object o = results[i];
        error.append(((o == null) ? "null" : o.toString())).append(", ");
      }
    }
    return error.toString();
  }

  @Override
  public void waitUntilDone() throws InterruptedIOException {
    try {
      if (this.operationTimeout > 0) {
        // A worker thread may have died from an exception without decrementing actionsInProgress;
        // the operationTimeout guarantee would then be broken, so we set a cutoff to avoid being
        // stuck here forever.
        long cutoff = (EnvironmentEdgeManager.currentTime() + this.operationTimeout) * 1000L;
        if (!waitUntilDone(cutoff)) {
          throw new SocketTimeoutException("time out before the actionsInProgress changed to zero");
        }
      } else {
        waitUntilDone(Long.MAX_VALUE);
      }
    } catch (InterruptedException iex) {
      throw new InterruptedIOException(iex.getMessage());
    } finally {
      if (callsInProgress != null) {
        for (CancellableRegionServerCallable clb : callsInProgress) {
          clb.cancel();
        }
      }
    }
  }

  private boolean waitUntilDone(long cutoff) throws InterruptedException {
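    // cutoff is in microseconds (callers pass milliseconds * 1000L, matching
    // primaryCallTimeoutMicroseconds); Long.MAX_VALUE means wait without a deadline.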
    boolean hasWait = cutoff != Long.MAX_VALUE;
    long lastLog = EnvironmentEdgeManager.currentTime();
    long currentInProgress;
    while (0 != (currentInProgress = actionsInProgress.get())) {
      long now = EnvironmentEdgeManager.currentTime();
      if (hasWait && (now * 1000L) > cutoff) {
        return false;
      }
      if (!hasWait) { // Only log if wait is infinite.
        if (now > lastLog + 10000) {
          lastLog = now;
          LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
            + "  actions to finish on table: " + tableName);
        }
      }
      synchronized (actionsInProgress) {
        if (actionsInProgress.get() == 0) break;
        if (!hasWait) {
          actionsInProgress.wait(10);
        } else {
          long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
          TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
        }
      }
    }
    return true;
  }

  @Override
  public boolean hasError() {
    return errors.hasErrors();
  }

  @Override
  public List<? extends Row> getFailedOperations() {
    return errors.actions;
  }

  @Override
  public RetriesExhaustedWithDetailsException getErrors() {
    return errors.makeException(asyncProcess.logBatchErrorDetails);
  }

  @Override
  public Object[] getResults() throws InterruptedIOException {
    waitUntilDone();
    return results;
  }

  /**
   * Creates the server error tracker to use inside the process. Currently, to preserve the main
   * assumption about current retries, and to work well with the retry-limit-based calculation, the
   * calculation is local per process object. We may benefit from connection-wide tracking of server
   * errors.
   * @return the ServerErrorTracker to use for this request
   */
  private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(asyncProcess.serverTrackerTimeout,
      asyncProcess.numTries);
  }

  /**
   * Create a callable. Isolated to be easily overridden in the tests.
   */
  private MultiServerCallable createCallable(final ServerName server, TableName tableName,
    final MultiAction multi) {
    return new MultiServerCallable(asyncProcess.connection, tableName, server, multi,
      asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
  }

  private void updateResult(int index, Object result) {
    Object current = results[index];
    if (current != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("The result is assigned repeatedly! current:" + current + ", new:" + result);
      }
    }
    results[index] = result;
  }

  long getNumberOfActionsInProgress() {
    return actionsInProgress.get();
  }
}