/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * The context, and return value, for a single submit/submitAll call.
 * Note on how this class (one AP submit) works. Initially, all requests are split into groups
 * by server; a request is sent to each server in parallel; the RPC calls are not async so a
 * thread per server is used. Every time some actions fail, regions/locations might have
 * changed, so we re-group them by server and region again and send these groups in parallel
 * too. The result, in case of retries, is a "tree" of threads, with the parent exiting after
 * scheduling children. This is why lots of code doesn't require any synchronization.
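 *
 * <p>A rough usage sketch, for illustration only; in practice instances are created and
 * driven by {@link AsyncProcess} submit methods, which supply the task, actions and nonce
 * group:
 * <pre>
 *   AsyncRequestFutureImpl&lt;Result&gt; future =
 *       new AsyncRequestFutureImpl&lt;&gt;(task, actions, nonceGroup, asyncProcess);
 *   future.groupAndSendMultiAction(actions, 1); // first attempt
 *   Object[] results = future.getResults();     // waits until all actions are done
 * </pre>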
 */
@InterfaceAudience.Private
class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class);

  private RetryingTimeTracker tracker;

  /**
   * Runnable (that can be submitted to thread pool) that waits for when it's time
   * to issue replica calls, finds region replicas, groups the requests by replica and
   * issues the calls (on separate threads, via sendMultiAction).
   * This is done on a separate thread because we don't want to wait on the user thread for
   * our asynchronous call, and usually we have to wait before making replica calls.
   */
  private final class ReplicaCallIssuingRunnable implements Runnable {
    private final long startTime;
    private final List<Action> initialActions;

    public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) {
      this.initialActions = initialActions;
      this.startTime = startTime;
    }

    @Override
    public void run() {
      boolean done = false;
      if (asyncProcess.primaryCallTimeoutMicroseconds > 0) {
        try {
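          // startTime is in milliseconds; scale by 1000 so the cutoff is in microseconds,
          // matching primaryCallTimeoutMicroseconds (see waitUntilDone(long)).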
          done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds);
        } catch (InterruptedException ex) {
          LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage());
          return;
        }
      }
      if (done) return; // Done within primary timeout
      Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
      List<Action> unknownLocActions = new ArrayList<>();
      if (replicaGetIndices == null) {
        for (int i = 0; i < results.length; ++i) {
          addReplicaActions(i, actionsByServer, unknownLocActions);
        }
      } else {
        for (int replicaGetIndex : replicaGetIndices) {
          addReplicaActions(replicaGetIndex, actionsByServer, unknownLocActions);
        }
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
      }
      if (!unknownLocActions.isEmpty()) {
        actionsByServer = new HashMap<>();
        for (Action action : unknownLocActions) {
          addReplicaActionsAgain(action, actionsByServer);
        }
        // Some actions may have completely failed; they are handled inside
        // addReplicaActionsAgain.
        if (!actionsByServer.isEmpty()) {
          sendMultiAction(actionsByServer, 1, null, true);
        }
      }
    }

    /**
     * Add replica actions to action map by server.
     * @param index Index of the original action.
     * @param actionsByServer The map by server to add it to.
     * @param unknownReplicaActions Collects replica actions whose location is still unknown.
     */
    private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer,
        List<Action> unknownReplicaActions) {
      if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
      Action action = initialActions.get(index);
      RegionLocations loc = findAllLocationsOrFail(action, true);
      if (loc == null) return;
      HRegionLocation[] locs = loc.getRegionLocations();
      if (locs.length == 1) {
        LOG.warn("No replicas found for {}", action.getAction());
        return;
      }
      synchronized (replicaResultLock) {
        // Don't run replica calls if the original has finished. We could do it e.g. if
        // original has already failed before first replica call (unlikely given retries),
        // but that would require additional synchronization w.r.t. returning to caller.
        if (results[index] != null) return;
        // We set the number of calls here. After that any path must call setResult/setError.
        // True even for replicas that are not found - if we refuse to send we MUST set error.
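        // Note: locs.length counts the primary call already in flight (replica 0) plus the
        // locs.length - 1 replica calls issued in the loop below.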
        updateResult(index, new ReplicaResultState(locs.length));
      }
      for (int i = 1; i < locs.length; ++i) {
        Action replicaAction = new Action(action, i);
        if (locs[i] != null) {
          asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
              replicaAction, actionsByServer, nonceGroup);
        } else {
          unknownReplicaActions.add(replicaAction);
        }
      }
    }

    private void addReplicaActionsAgain(
        Action action, Map<ServerName, MultiAction> actionsByServer) {
      if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
        throw new AssertionError("Cannot have default replica here");
      }
      HRegionLocation loc = getReplicaLocationOrFail(action);
      if (loc == null) return;
      asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
          action, actionsByServer, nonceGroup);
    }
  }

  /**
   * Runnable (that can be submitted to thread pool) that submits MultiAction to a
   * single server. The server call is synchronous, therefore we do it on a thread pool.
   */
  @VisibleForTesting
  final class SingleServerRequestRunnable implements Runnable {
    private final MultiAction multiAction;
    private final int numAttempt;
    private final ServerName server;
    private final Set<CancellableRegionServerCallable> callsInProgress;
    @VisibleForTesting
    SingleServerRequestRunnable(
        MultiAction multiAction, int numAttempt, ServerName server,
        Set<CancellableRegionServerCallable> callsInProgress) {
      this.multiAction = multiAction;
      this.numAttempt = numAttempt;
      this.server = server;
      this.callsInProgress = callsInProgress;
    }

    @Override
    public void run() {
      AbstractResponse res = null;
      CancellableRegionServerCallable callable = currentCallable;
      try {
        // Set up the callable based on the actions, if we don't have one already from the request.
        if (callable == null) {
          callable = createCallable(server, tableName, multiAction);
        }
        RpcRetryingCaller<AbstractResponse> caller =
            asyncProcess.createCaller(callable, rpcTimeout);
        try {
          if (callsInProgress != null) {
            callsInProgress.add(callable);
          }
          res = caller.callWithoutRetries(callable, operationTimeout);
          if (res == null) {
            // Cancelled
            return;
          }
        } catch (IOException e) {
          // The service itself failed. It may be an error coming from the communication
          //   layer, but, as well, a functional error raised by the server.
          receiveGlobalFailure(multiAction, server, numAttempt, e);
          return;
        } catch (Throwable t) {
          // This should not happen. Let's log & retry anyway.
          LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected." +
              " Retrying. Server=" + server + ", tableName=" + tableName, t);
          receiveGlobalFailure(multiAction, server, numAttempt, t);
          return;
        }
        if (res.type() == AbstractResponse.ResponseType.MULTI) {
          // Normal case: we received an answer from the server, and it's not an exception.
          receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
        } else {
          if (results != null) {
            SingleResponse singleResponse = (SingleResponse) res;
            updateResult(0, singleResponse.getEntry());
          }
          decActionCounter(1);
        }
      } catch (Throwable t) {
        // Something really bad happened. We are on the send thread that will now die.
        LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t);
        throw new RuntimeException(t);
      } finally {
        asyncProcess.decTaskCounters(multiAction.getRegions(), server);
        if (callsInProgress != null && callable != null && res != null) {
          callsInProgress.remove(callable);
        }
      }
    }
  }

  private final Batch.Callback<CResult> callback;
  private final BatchErrors errors;
  private final ConnectionImplementation.ServerErrorTracker errorsByServer;
  private final ExecutorService pool;
  private final Set<CancellableRegionServerCallable> callsInProgress;

  private final TableName tableName;
  private final AtomicLong actionsInProgress = new AtomicLong(-1);
  /**
   * The lock controls access to results. It is only held when populating results where
   * there might be several callers (eventual consistency gets). For other requests,
   * there's one unique call going on per result index.
   */
  private final Object replicaResultLock = new Object();
  /**
   * Result array. Null if results are not needed. Otherwise, each index corresponds to
   * the action index in initial actions submitted. For most request types, has nulls for
   * requests that are not done, and result/exception for those that are done.
   * For eventual-consistency gets, initially the same applies; at some point, replica calls
   * might be started, and ReplicaResultState is put at the corresponding indices. The
   * returning calls check the type to detect when this is the case. After all calls are done,
   * ReplicaResultState objects are replaced with results for the user.
   */
  private final Object[] results;
  /**
   * Indices of replica gets in results. If null, all or no actions are replica-gets.
   */
  private final int[] replicaGetIndices;
  private final boolean hasAnyReplicaGets;
  private final long nonceGroup;
  private final CancellableRegionServerCallable currentCallable;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final AsyncProcess asyncProcess;

  /**
   * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
   * used to make logging clearer; we don't actually care why we don't retry.
   */
  public enum Retry {
    YES,
    NO_LOCATION_PROBLEM,
    NO_NOT_RETRIABLE,
    NO_RETRIES_EXHAUSTED,
    NO_OTHER_SUCCEEDED
  }

  /**
   * Sync point for calls to multiple replicas for the same user request (Get).
   * Created and put in the results array (we assume replica calls require results) when
   * the replica calls are launched. See results for details of this process.
   * POJO, all fields are public. To modify them, the object itself is locked.
   */
  private static class ReplicaResultState {
    public ReplicaResultState(int callCount) {
      this.callCount = callCount;
    }

    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
    int callCount;
    /** Errors for which it is not decided whether we will report them to user. If one of the
     * calls succeeds, we will discard the errors that may have happened in the other calls. */
    BatchErrors replicaErrors = null;

    @Override
    public String toString() {
      return "[call count " + callCount + "; errors " + replicaErrors + "]";
    }
  }

  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
      long nonceGroup, AsyncProcess asyncProcess) {
    this.pool = task.getPool();
    this.callback = task.getCallback();
    this.nonceGroup = nonceGroup;
    this.tableName = task.getTableName();
    this.actionsInProgress.set(actions.size());
    if (task.getResults() == null) {
      results = task.getNeedResults() ? new Object[actions.size()] : null;
    } else {
      if (task.getResults().length != actions.size()) {
        throw new AssertionError("results.length");
      }
      this.results = task.getResults();
      for (int i = 0; i != this.results.length; ++i) {
        results[i] = null;
      }
    }
    List<Integer> replicaGetIndices = null;
    boolean hasAnyReplicaGets = false;
    if (results != null) {
      // Check to see if any requests might require replica calls.
      // We expect that many requests will consist of all or no multi-replica gets; in such
      // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
      // store the list of action indexes for which replica gets are possible, and set
      // hasAnyReplicaGets to true.
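      // For example (illustrative only): for actions [replicaGet, replicaGet, put, replicaGet]
      // the loop ends with hasAnyReplicaGets = true and replicaGetIndices = [0, 1, 3]; for a
      // list of only replica gets, it ends with hasAnyReplicaGets = true and a null list.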
      boolean hasAnyNonReplicaReqs = false;
      int posInList = 0;
      for (Action action : actions) {
        boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
        if (isReplicaGet) {
          hasAnyReplicaGets = true;
          if (hasAnyNonReplicaReqs) { // Mixed case
            if (replicaGetIndices == null) {
              replicaGetIndices = new ArrayList<>(actions.size() - 1);
            }
            replicaGetIndices.add(posInList);
          }
        } else if (!hasAnyNonReplicaReqs) {
          // The first non-multi-replica request in the action list.
          hasAnyNonReplicaReqs = true;
          if (posInList > 0) {
            // Add all the previous requests to the index lists. We know they are all
            // replica-gets because this is the first non-multi-replica request in the list.
            replicaGetIndices = new ArrayList<>(actions.size() - 1);
            for (int i = 0; i < posInList; ++i) {
              replicaGetIndices.add(i);
            }
          }
        }
        ++posInList;
      }
    }
    this.hasAnyReplicaGets = hasAnyReplicaGets;
    if (replicaGetIndices != null) {
      this.replicaGetIndices = new int[replicaGetIndices.size()];
      int i = 0;
      for (Integer el : replicaGetIndices) {
        this.replicaGetIndices[i++] = el;
      }
    } else {
      this.replicaGetIndices = null;
    }
    this.callsInProgress = !hasAnyReplicaGets ? null :
        Collections.newSetFromMap(
            new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
    this.asyncProcess = asyncProcess;
    this.errorsByServer = createServerErrorTracker();
    this.errors = new BatchErrors();
    this.operationTimeout = task.getOperationTimeout();
    this.rpcTimeout = task.getRpcTimeout();
    this.currentCallable = task.getCallable();
    if (task.getCallable() == null) {
      tracker = new RetryingTimeTracker().start();
    }
  }

  @VisibleForTesting
  protected Set<CancellableRegionServerCallable> getCallsInProgress() {
    return callsInProgress;
  }

  @VisibleForTesting
  SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt,
      ServerName server, Set<CancellableRegionServerCallable> callsInProgress) {
    return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
  }

  /**
   * Group a list of actions by region server, and send them.
   *
   * @param currentActions - the list of rows to submit
   * @param numAttempt - the current numAttempt (first attempt is 1)
   */
  void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();

    boolean isReplica = false;
    List<Action> unknownReplicaActions = null;
    for (Action action : currentActions) {
      RegionLocations locs = findAllLocationsOrFail(action, true);
      if (locs == null) continue;
      boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
      if (isReplica && !isReplicaAction) {
        // This is the property of the current implementation, not a requirement.
        throw new AssertionError("Replica and non-replica actions in the same retry");
      }
      isReplica = isReplicaAction;
      HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
      if (loc == null || loc.getServerName() == null) {
        if (isReplica) {
          if (unknownReplicaActions == null) {
            unknownReplicaActions = new ArrayList<>(1);
          }
          unknownReplicaActions.add(action);
        } else {
          // TODO: relies on primary location always being fetched
          manageLocationError(action, null);
        }
      } else {
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
            nonceGroup);
      }
    }
    boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
    boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();

    if (!actionsByServer.isEmpty()) {
      // On the first attempt, if there are replica gets and no replica actions yet, pass the
      // original actions along so sendMultiAction can start the replica-call thread.
      sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
          ? currentActions : null, numAttempt > 1 && !hasUnknown);
    }

    if (hasUnknown) {
      actionsByServer = new HashMap<>();
      for (Action action : unknownReplicaActions) {
        HRegionLocation loc = getReplicaLocationOrFail(action);
        if (loc == null) continue;
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
            nonceGroup);
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(
            actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
      }
    }
  }

  private HRegionLocation getReplicaLocationOrFail(Action action) {
    // We are going to try to get the location once again. For each action, we'll do it once
    // from cache, because the previous calls in the loop might populate it.
    int replicaId = action.getReplicaId();
    RegionLocations locs = findAllLocationsOrFail(action, true);
    if (locs == null) return null; // manageError already called
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null || loc.getServerName() == null) {
      locs = findAllLocationsOrFail(action, false);
      if (locs == null) return null; // manageError already called
      loc = locs.getRegionLocation(replicaId);
    }
    if (loc == null || loc.getServerName() == null) {
      manageLocationError(action, null);
      return null;
    }
    return loc;
  }

  private void manageLocationError(Action action, Exception ex) {
    String msg = "Cannot get replica " + action.getReplicaId()
        + " location for " + action.getAction();
    LOG.error(msg);
    if (ex == null) {
      ex = new IOException(msg);
    }
    manageError(action.getOriginalIndex(), action.getAction(),
        Retry.NO_LOCATION_PROBLEM, ex, null);
  }

  private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
    if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id +
        ", row cannot be null");
    RegionLocations loc = null;
    try {
      loc = asyncProcess.connection.locateRegion(
          tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
    } catch (IOException ex) {
      manageLocationError(action, ex);
    }
    return loc;
  }

  /**
   * Send a multi action structure to the servers, after a delay depending on the attempt
   * number. Asynchronous.
   *
   * @param actionsByServer the actions structured by regions
   * @param numAttempt the attempt number.
   * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
   * @param reuseThread whether the last runnable may be executed on the calling thread.
   */
  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer,
      int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) {
    // Run the last item on the same thread if we are already on a send thread.
    // We hope most of the time it will be the only item, so we can cut down on threads.
    int actionsRemaining = actionsByServer.size();
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
      ServerName server = e.getKey();
      MultiAction multiAction = e.getValue();
      Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
          numAttempt);
      // make sure we correctly count the number of runnables before we try to reuse the send
      // thread, in case we had to split the request into different runnables because of backoff
      if (runnables.size() > actionsRemaining) {
        actionsRemaining = runnables.size();
      }

      // run all the runnables
      // HBASE-17475: do not reuse the thread after the stack reaches a certain depth, to prevent
      // stack overflow; for now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to
      // control the depth
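      // (every DEFAULT_HBASE_CLIENT_RETRIES_NUMBER-th attempt is forced onto the pool below,
      // which resets the recursion depth accumulated by calling runnable.run() in place)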
      for (Runnable runnable : runnables) {
        if ((--actionsRemaining == 0) && reuseThread
            && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) {
          runnable.run();
        } else {
          try {
            pool.submit(runnable);
          } catch (Throwable t) {
            if (t instanceof RejectedExecutionException) {
              // This should never happen. But as the pool is provided by the end user,
              // let's secure this a little.
              LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." +
                  " Server=" + server.getServerName(), t);
            } else {
              // see #HBASE-14359 for more details
              LOG.warn("Caught unexpected exception/error: ", t);
            }
            asyncProcess.decTaskCounters(multiAction.getRegions(), server);
            // We're likely to fail again, but this will increment the attempt counter,
            // so it will finish.
            receiveGlobalFailure(multiAction, server, numAttempt, t);
          }
        }
      }
    }

    if (actionsForReplicaThread != null) {
      startWaitingForReplicaCalls(actionsForReplicaThread);
    }
  }

  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
      MultiAction multiAction, int numAttempt) {
    // no stats to manage, just do the standard action
    if (asyncProcess.connection.getStatisticsTracker() == null) {
      if (asyncProcess.connection.getConnectionMetrics() != null) {
        asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
      }
      asyncProcess.incTaskCounters(multiAction.getRegions(), server);
      SingleServerRequestRunnable runnable = createSingleServerRequest(
          multiAction, numAttempt, server, callsInProgress);
      Tracer tracer = Tracer.curThreadTracer();

      if (tracer == null) {
        return Collections.singletonList(runnable);
      } else {
        return Collections.singletonList(tracer.wrap(runnable, "AsyncProcess.sendMultiAction"));
      }
    }

    // group the actions by the amount of delay
    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());

    // split up the actions
    for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
      Long backoff = getBackoff(server, e.getKey());
      DelayingRunner runner = actions.get(backoff);
      if (runner == null) {
        actions.put(backoff, new DelayingRunner(backoff, e));
      } else {
        runner.add(e);
      }
    }

    List<Runnable> toReturn = new ArrayList<>(actions.size());
    for (DelayingRunner runner : actions.values()) {
      asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
      String traceText = "AsyncProcess.sendMultiAction";
      Runnable runnable = createSingleServerRequest(
          runner.getActions(), numAttempt, server, callsInProgress);
      // use a delay runner only if we need to sleep for some time
      if (runner.getSleepTime() > 0) {
        runner.setRunner(runnable);
        traceText = "AsyncProcess.clientBackoff.sendMultiAction";
        runnable = runner;
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrDelayRunners();
          asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
        }
      } else {
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
        }
      }
      runnable = TraceUtil.wrap(runnable, traceText);
      toReturn.add(runnable);
    }
    return toReturn;
  }

  /**
   * @param server server location where the target region is hosted
   * @param regionName name of the region we are going to write some data to
   * @return the amount of time the client should wait until it submits a request to the
   * specified server and region
   */
  private Long getBackoff(ServerName server, byte[] regionName) {
    ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker();
    ServerStatistics stats = tracker.getStats(server);
    return asyncProcess.connection.getBackoffPolicy()
        .getBackoffTime(server, regionName, stats);
  }

  /**
   * Starts waiting to issue replica calls on a different thread; or issues them immediately.
   */
  private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
    long startTime = EnvironmentEdgeManager.currentTime();
    ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
        actionsForReplicaThread, startTime);
    if (asyncProcess.primaryCallTimeoutMicroseconds == 0) {
      // Start replica calls immediately.
      replicaRunnable.run();
    } else {
      // Start the thread that may kick off replica gets.
      // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
      try {
        pool.submit(replicaRunnable);
      } catch (RejectedExecutionException ree) {
        LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree);
      }
    }
  }

  /**
   * Checks if we can retry and acts accordingly: logs and sets the error status.
   *
   * @param originalIndex the position in the list sent
   * @param row           the row
   * @param canRetry      if not Retry.YES, we won't retry regardless of other settings.
   * @param throwable     the throwable, if any (can be null)
   * @param server        the location, if any (can be null)
   * @return the retry decision: Retry.YES if the action can be retried, a NO_* value otherwise.
   */
  Retry manageError(int originalIndex, Row row, Retry canRetry,
      Throwable throwable, ServerName server) {
    if (canRetry == Retry.YES
        && throwable != null && throwable instanceof DoNotRetryIOException) {
      canRetry = Retry.NO_NOT_RETRIABLE;
    }

    if (canRetry != Retry.YES) {
      // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
      setError(originalIndex, row, throwable, server);
    } else if (isActionComplete(originalIndex, row)) {
      canRetry = Retry.NO_OTHER_SUCCEEDED;
    }
    return canRetry;
  }

  /**
   * Resubmit all the actions from this multiaction after a failure.
   *
   * @param rsActions the actions still to do from the initial list
   * @param server the destination
   * @param numAttempt the number of attempts so far
   * @param t the throwable (if any) that caused the resubmit
   */
  private void receiveGlobalFailure(
      MultiAction rsActions, ServerName server, int numAttempt, Throwable t) {
    errorsByServer.reportServerError(server);
    Retry canRetry = errorsByServer.canTryMore(numAttempt)
        ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;

    cleanServerCache(server, t);
    int failed = 0;
    int stopped = 0;
    List<Action> toReplay = new ArrayList<>();
    for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
      byte[] regionName = e.getKey();
      byte[] row = e.getValue().get(0).getAction().getRow();
      // Do not use the exception for updating cache because it might be coming from
      // any of the regions in the MultiAction.
      updateCachedLocations(server, regionName, row,
        ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
      for (Action action : e.getValue()) {
        Retry retry = manageError(
            action.getOriginalIndex(), action.getAction(), canRetry, t, server);
        if (retry == Retry.YES) {
          toReplay.add(action);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      }
    }

    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, rsActions.size(), t);
    }
  }

  /**
   * Log as much info as possible and, if there is something to replay,
   * submit it again after a back-off sleep.
   */
  private void resubmit(ServerName oldServer, List<Action> toReplay,
      int numAttempt, int failureCount, Throwable throwable) {
    // We have something to replay. We're going to sleep a little beforehand.

    // We have contradicting needs here:
    //  1) We want to get the new location after having slept, as it may change.
    //  2) We want to take into account the location when calculating the sleep time.
    //  3) If all this is just because the response needed to be chunked, try again FAST.
    // It should be possible to have some heuristics to take the right decision. Short term,
    //  we go for one.
    boolean retryImmediately = throwable instanceof RetryImmediatelyException;
    int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
    long backOffTime;
    if (retryImmediately) {
      backOffTime = 0;
    } else if (throwable instanceof CallQueueTooBigException) {
      // Give a special check on CQTBE, see #HBASE-17114
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE);
    } else {
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause);
    }
    if (numAttempt > asyncProcess.startLogErrorsCnt) {
      // We use this value to have some logs when we have multiple failures, but not too many
      //  logs, as errors are to be expected when a region moves, splits and so on
      LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
          oldServer, throwable, backOffTime, true, null, -1, -1));
    }

    try {
      if (backOffTime > 0) {
        Thread.sleep(backOffTime);
      }
    } catch (InterruptedException e) {
      LOG.warn("#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, "
          + oldServer, e);
      Thread.currentThread().interrupt();
      return;
    }

    groupAndSendMultiAction(toReplay, nextAttemptNumber);
  }

  private void logNoResubmit(ServerName oldServer, int numAttempt,
      int failureCount, Throwable throwable, int failed, int stopped) {
    if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) {
      String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
      String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
          throwable, -1, false, timeStr, failed, stopped);
      if (failed != 0) {
        // Only log final failures as warning
        LOG.warn(logMessage);
      } else {
        LOG.info(logMessage);
      }
    }
  }

  /**
   * Called when we receive the result of a server query.
   *
   * @param multiAction    - the multiAction we sent
   * @param server         - the location. It's used as a server name.
   * @param responses      - the response, if any
   * @param numAttempt     - the attempt
   */
  private void receiveMultiAction(MultiAction multiAction,
      ServerName server, MultiResponse responses, int numAttempt) {
    assert responses != null;

    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
    updateStats(server, results);

    // Success or partial success.
    // Analyze detailed results. We can still have individual failures to be redone.
    // Two specific throwables are managed:
    //  - DoNotRetryIOException: we continue to retry for other actions
    //  - RegionMovedException: we update the cache with the new region location

    List<Action> toReplay = new ArrayList<>();
    Throwable lastException = null;
    int failureCount = 0;
    int failed = 0;
    int stopped = 0;
    Retry retry = null;
    // Go by original action.
    for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
      byte[] regionName = regionEntry.getKey();

      Throwable regionException = responses.getExceptions().get(regionName);
      if (regionException != null) {
        cleanServerCache(server, regionException);
      }

      Map<Integer, Object> regionResults =
        results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
      boolean regionFailureRegistered = false;
      for (Action sentAction : regionEntry.getValue()) {
        Object result = regionResults.get(sentAction.getOriginalIndex());
        if (result == null) {
          if (regionException == null) {
            LOG.error("Server sent us neither results nor exceptions for "
              + Bytes.toStringBinary(regionName)
              + ", numAttempt:" + numAttempt);
            regionException = new RuntimeException("Invalid response");
          }
          // If the row operation encounters a region-level error, the exception of the action
          // may be null.
          result = regionException;
        }
        // Failure: retry if it makes sense, else update the error list.
        if (result instanceof Throwable) {
          Throwable actionException = (Throwable) result;
          Row row = sentAction.getAction();
          lastException = regionException != null ? regionException
            : ClientExceptionsUtil.findException(actionException);
          // Register corresponding failures once per server/once per region.
          if (!regionFailureRegistered) {
            regionFailureRegistered = true;
            updateCachedLocations(server, regionName, row.getRow(), actionException);
          }
          if (retry == null) {
            errorsByServer.reportServerError(server);
            // We determine canRetry only once for all calls, after reporting server failure.
            retry = errorsByServer.canTryMore(numAttempt) ?
              Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
          }
          ++failureCount;
          switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException,
            server)) {
            case YES:
              toReplay.add(sentAction);
              break;
            case NO_OTHER_SUCCEEDED:
              ++stopped;
              break;
            default:
              ++failed;
              break;
          }
        } else {
          invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
          setResult(sentAction, result);
        }
      }
    }
    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, failureCount, lastException);
    }
  }

  private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
      Throwable rowException) {
    if (tableName == null) {
      return;
    }
    try {
      asyncProcess.connection
        .updateCachedLocations(tableName, regionName, row, rowException, server);
    } catch (Throwable ex) {
      // That should never happen, but if it did, we want to make sure
      // we still process errors
      LOG.error("Couldn't update cached region locations: " + ex);
    }
  }

  private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
    if (callback != null) {
      try {
        //noinspection unchecked
        // TODO: would callback expect a replica region name if it gets one?
        this.callback.update(regionName, row, result);
      } catch (Throwable t) {
        LOG.error("User callback threw an exception for "
          + Bytes.toStringBinary(regionName) + ", ignoring", t);
      }
    }
  }

  private void cleanServerCache(ServerName server, Throwable regionException) {
    if (ClientExceptionsUtil.isMetaClearingException(regionException)) {
      // We want to make sure to clear the cache in case there were location-related exceptions.
      // We don't want to clear the cache for every possible exception that comes through, however.
      asyncProcess.connection.clearCaches(server);
    }
  }

  @VisibleForTesting
  protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
    boolean metrics = asyncProcess.connection.getConnectionMetrics() != null;
    boolean stats = asyncProcess.connection.getStatisticsTracker() != null;
    if (!stats && !metrics) {
      return;
    }
    for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) {
      byte[] regionName = regionStats.getKey();
      ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
      if (stat == null) {
        LOG.error("No ClientProtos.RegionLoadStats found for server=" + server
          + ", region=" + Bytes.toStringBinary(regionName));
        continue;
      }
      RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat);
      ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), server,
          regionName, regionLoadstats);
      ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(),
          server, regionName, regionLoadstats);
    }
  }

  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
      Throwable error, long backOffTime, boolean willRetry, String startTime,
      int failed, int stopped) {
    StringBuilder sb = new StringBuilder();
    sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName).
        append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries).
        append(", ");

    if (failureCount > 0 || error != null) {
      sb.append("failureCount=").append(failureCount).append("ops").append(", last exception=").
          append(error);
    } else {
      sb.append("succeeded");
    }

    sb.append(" on ").append(sn).append(", tracking started ").append(startTime);

    if (willRetry) {
      sb.append(", retrying after=").append(backOffTime).append("ms").
          append(", operationsToReplay=").append(replaySize);
    } else if (failureCount > 0) {
      if (stopped > 0) {
        sb.append("; NOT retrying, stopped=").append(stopped).
            append(" because successful operation on other replica");
      }
      if (failed > 0) {
        sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!");
      }
    }

    return sb.toString();
  }

  /**
   * Sets the non-error result from a particular action.
   * @param action Action (request) that the server responded to.
   * @param result The result.
   */
  private void setResult(Action action, Object result) {
    if (result == null) {
      throw new RuntimeException("Result cannot be null");
    }
    ReplicaResultState state = null;
    boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
    int index = action.getOriginalIndex();
    if (results == null) {
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    state = trySetResultSimple(index, action.getAction(), false, result, null, isStale);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    // At this point we know that state is set to the replica tracking class.
    // It could be that someone else is also looking at it; however, we know there can
    // only be one state object, and only one thread can set callCount to 0. Other threads
    // will either see the state with callCount 0 after locking it, or will not see the state
    // at all, because we will have replaced it with the result.
    synchronized (state) {
      if (state.callCount == 0) {
        return; // someone already set the result
      }
      state.callCount = 0;
    }
    synchronized (replicaResultLock) {
      if (results[index] != state) {
        throw new AssertionError("We set the callCount but someone else replaced the result");
      }
      updateResult(index, result);
    }

    decActionCounter(index);
  }

  /**
   * Sets the error from a particular action.
   * @param index Original action index.
   * @param row Original request.
   * @param throwable The resulting error.
   * @param server The source server.
   */
  private void setError(int index, Row row, Throwable throwable, ServerName server) {
    ReplicaResultState state = null;
    if (results == null) {
      // Note that we currently cannot have replica requests with null results. So it shouldn't
      // happen that multiple replica calls will call decActionCounter for the same action with
      // results == null. Only one call per action should be present in this case.
      errors.add(throwable, row, server);
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    state = trySetResultSimple(index, row, true, throwable, server, false);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
    boolean isActionDone = false;
    synchronized (state) {
      switch (state.callCount) {
        case 0: return; // someone already set the result
        case 1: { // All calls failed, we are the last error.
          target = errors;
          isActionDone = true;
          break;
        }
        default: {
          assert state.callCount > 1;
          if (state.replicaErrors == null) {
            state.replicaErrors = new BatchErrors();
          }
          target = state.replicaErrors;
          break;
        }
      }
      --state.callCount;
    }
    target.add(throwable, row, server);
    if (isActionDone) {
      if (state.replicaErrors != null) { // last call, no need to lock
        errors.merge(state.replicaErrors);
      }
      // See setResult for explanations.
      synchronized (replicaResultLock) {
        if (results[index] != state) {
          throw new AssertionError("We set the callCount but someone else replaced the result");
        }
        updateResult(index, throwable);
      }
      decActionCounter(index);
    }
  }

  /**
   * Checks if the action is complete; used on error to prevent needless retries.
   * Does not synchronize, assuming element index/field accesses are atomic.
   * This is an opportunistic optimization check, doesn't have to be strict.
   * @param index Original action index.
   * @param row Original request.
   */
  private boolean isActionComplete(int index, Row row) {
    if (!AsyncProcess.isReplicaGet(row)) return false;
    Object resObj = results[index];
    return (resObj != null) && (!(resObj instanceof ReplicaResultState)
        || ((ReplicaResultState) resObj).callCount == 0);
  }

  /**
   * Tries to set the result or error for a particular action as if there were no replica calls.
   * @return null if successful; replica state if there were in fact replica calls.
   */
  private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
      Object result, ServerName server, boolean isFromReplica) {
    Object resObj = null;
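    // For non-replica-get rows there is only ever one call per result index (see the comment
    // on the results field), so we can write without taking replicaResultLock.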
    if (!AsyncProcess.isReplicaGet(row)) {
      if (isFromReplica) {
        throw new AssertionError("Unexpected stale result for " + row);
      }
      updateResult(index, result);
    } else {
      synchronized (replicaResultLock) {
        resObj = results[index];
        if (resObj == null) {
          if (isFromReplica) {
            throw new AssertionError("Unexpected stale result for " + row);
          }
          updateResult(index, result);
        }
      }
    }

    ReplicaResultState rrs =
        (resObj instanceof ReplicaResultState) ? (ReplicaResultState) resObj : null;
    if (rrs == null && isError) {
      // The resObj is not replica state (null or already set).
      errors.add((Throwable) result, row, server);
    }

    if (resObj == null) {
      // resObj is null - no replica calls were made.
      decActionCounter(index);
      return null;
    }
    return rrs;
  }

  private void decActionCounter(int index) {
    long actionsRemaining = actionsInProgress.decrementAndGet();
    if (actionsRemaining < 0) {
      String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
      throw new AssertionError(error);
    } else if (actionsRemaining == 0) {
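      // Pairs with waitUntilDone(long): waiters block on actionsInProgress, so wake them up.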
      synchronized (actionsInProgress) {
        actionsInProgress.notifyAll();
      }
    }
  }

  private String buildDetailedErrorMsg(String string, int index) {
    StringBuilder error = new StringBuilder(128);
    error.append(string).append("; called for ").append(index).append(", actionsInProgress ")
        .append(actionsInProgress.get()).append("; replica gets: ");
    if (replicaGetIndices != null) {
      for (int i = 0; i < replicaGetIndices.length; ++i) {
        error.append(replicaGetIndices[i]).append(", ");
      }
    } else {
      error.append(hasAnyReplicaGets ? "all" : "none");
    }
    error.append("; results ");
    if (results != null) {
      for (int i = 0; i < results.length; ++i) {
        Object o = results[i];
        error.append(((o == null) ? "null" : o.toString())).append(", ");
      }
    }
    return error.toString();
  }

  @Override
  public void waitUntilDone() throws InterruptedIOException {
    try {
      waitUntilDone(Long.MAX_VALUE);
    } catch (InterruptedException iex) {
      throw new InterruptedIOException(iex.getMessage());
    } finally {
      if (callsInProgress != null) {
        for (CancellableRegionServerCallable clb : callsInProgress) {
          clb.cancel();
        }
      }
    }
  }

  private boolean waitUntilDone(long cutoff) throws InterruptedException {
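    // cutoff is in microseconds since the epoch (callers pass currentTime() * 1000 plus a
    // microsecond timeout), or Long.MAX_VALUE to wait without a deadline; hence the
    // now * 1000L conversions below.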
    boolean hasWait = cutoff != Long.MAX_VALUE;
    long lastLog = EnvironmentEdgeManager.currentTime();
    long currentInProgress;
    while (0 != (currentInProgress = actionsInProgress.get())) {
      long now = EnvironmentEdgeManager.currentTime();
      if (hasWait && (now * 1000L) > cutoff) {
        return false;
      }
      if (!hasWait) { // Only log if wait is infinite.
        if (now > lastLog + 10000) {
          lastLog = now;
          LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
              + " actions to finish on table: " + tableName);
        }
      }
      synchronized (actionsInProgress) {
        if (actionsInProgress.get() == 0) break;
        if (!hasWait) {
          actionsInProgress.wait(10);
        } else {
          long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
          TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
        }
      }
    }
    return true;
  }

  @Override
  public boolean hasError() {
    return errors.hasErrors();
  }

  @Override
  public List<? extends Row> getFailedOperations() {
    return errors.actions;
  }

  @Override
  public RetriesExhaustedWithDetailsException getErrors() {
    return errors.makeException(asyncProcess.logBatchErrorDetails);
  }

  @Override
  public Object[] getResults() throws InterruptedIOException {
    waitUntilDone();
    return results;
  }

  /**
   * Creates the server error tracker to use inside the process.
   * Currently, to preserve the main assumption about current retries, and to work well with
   * the retry-limit-based calculation, the calculation is local per Process object.
   * We may benefit from connection-wide tracking of server errors.
   * @return the ServerErrorTracker to use for this request
   */
  private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(
        asyncProcess.serverTrackerTimeout, asyncProcess.numTries);
  }

  /**
   * Create a callable. Isolated to be easily overridden in the tests.
   */
  private MultiServerCallable createCallable(final ServerName server, TableName tableName,
      final MultiAction multi) {
    return new MultiServerCallable(asyncProcess.connection, tableName, server,
        multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
  }

  private void updateResult(int index, Object result) {
    Object current = results[index];
    if (current != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("The result is assigned repeatedly! current:" + current
          + ", new:" + result);
      }
    }
    results[index] = result;
  }

  @VisibleForTesting
  long getNumberOfActionsInProgress() {
    return actionsInProgress.get();
  }
}