/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.core.Tracer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The context, and return value, for a single submit/submitAll call.
 * Note on how this class (one AP submit) works. Initially, all requests are split into groups
 * by server; requests are sent to each server in parallel; the RPC calls are not async so a
 * thread per server is used. Every time some actions fail, regions/locations might have
 * changed, so we re-group them by server and region and send these groups in parallel
 * too. The result, in case of retries, is a "tree" of threads, with the parent exiting after
 * scheduling its children. This is why most of the code doesn't require any synchronization.
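 * <p>
 * As a rough sketch of how a caller typically consumes this future (illustrative only; the
 * instance is normally created by {@link AsyncProcess#submit} rather than constructed directly,
 * and the task setup is omitted here):
 * <pre>{@code
 * AsyncRequestFuture future = asyncProcess.submit(task); // task describes pool, table, rows
 * future.waitUntilDone();                                // block until every action is resolved
 * if (future.hasError()) {
 *   throw future.getErrors();   // RetriesExhaustedWithDetailsException with per-row causes
 * }
 * Object[] results = future.getResults();                // one slot per submitted action
 * }</pre>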
 */
@InterfaceAudience.Private
class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class);

  private RetryingTimeTracker tracker;

  /**
   * Runnable (that can be submitted to a thread pool) that waits until it's time
   * to issue replica calls, finds region replicas, groups the requests by replica and
   * issues the calls (on separate threads, via sendMultiAction).
   * This is done on a separate thread because we don't want to block the user thread for
   * our asynchronous call, and usually we have to wait before making replica calls.
   */
  private final class ReplicaCallIssuingRunnable implements Runnable {
    private final long startTime;
    private final List<Action> initialActions;

    public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) {
      this.initialActions = initialActions;
      this.startTime = startTime;
    }

    @Override
    public void run() {
      boolean done = false;
      if (asyncProcess.primaryCallTimeoutMicroseconds > 0) {
        try {
          done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds);
        } catch (InterruptedException ex) {
          LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage());
          return;
        }
      }
      if (done) return; // Done within primary timeout
      Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
      List<Action> unknownLocActions = new ArrayList<>();
      if (replicaGetIndices == null) {
        for (int i = 0; i < results.length; ++i) {
          addReplicaActions(i, actionsByServer, unknownLocActions);
        }
      } else {
        for (int replicaGetIndice : replicaGetIndices) {
          addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
        }
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
      }
      if (!unknownLocActions.isEmpty()) {
        actionsByServer = new HashMap<>();
        for (Action action : unknownLocActions) {
          addReplicaActionsAgain(action, actionsByServer);
        }
        // Some actions may have completely failed, they are handled inside addAgain.
        if (!actionsByServer.isEmpty()) {
          sendMultiAction(actionsByServer, 1, null, true);
        }
      }
    }

    /**
     * Add replica actions to the per-server action map.
     * @param index Index of the original action.
     * @param actionsByServer The map by server to add it to.
     * @param unknownReplicaActions The list to which replica actions with unknown locations are added.
     */
    private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer,
                                   List<Action> unknownReplicaActions) {
      if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
      Action action = initialActions.get(index);
      RegionLocations loc = findAllLocationsOrFail(action, true);
      if (loc == null) return;
      HRegionLocation[] locs = loc.getRegionLocations();
      if (locs.length == 1) {
        LOG.warn("No replicas found for {}", action.getAction());
        return;
      }
      synchronized (replicaResultLock) {
        // Don't run replica calls if the original has finished. We could do it e.g. if
        // original has already failed before first replica call (unlikely given retries),
        // but that would require additional synchronization w.r.t. returning to caller.
        if (results[index] != null) return;
        // We set the number of calls here. After that any path must call setResult/setError.
        // True even for replicas that are not found - if we refuse to send we MUST set error.
        updateResult(index, new ReplicaResultState(locs.length));
      }
      for (int i = 1; i < locs.length; ++i) {
        Action replicaAction = new Action(action, i);
        if (locs[i] != null) {
          asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
              replicaAction, actionsByServer, nonceGroup);
        } else {
          unknownReplicaActions.add(replicaAction);
        }
      }
    }

    private void addReplicaActionsAgain(
        Action action, Map<ServerName, MultiAction> actionsByServer) {
      if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
        throw new AssertionError("Cannot have default replica here");
      }
      HRegionLocation loc = getReplicaLocationOrFail(action);
      if (loc == null) return;
      asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
          action, actionsByServer, nonceGroup);
    }
  }

  /**
   * Runnable (that can be submitted to thread pool) that submits MultiAction to a
   * single server. The server call is synchronous, therefore we do it on a thread pool.
   */
  final class SingleServerRequestRunnable implements Runnable {
    private final MultiAction multiAction;
    private final int numAttempt;
    private final ServerName server;
    private final Set<CancellableRegionServerCallable> callsInProgress;
    SingleServerRequestRunnable(
        MultiAction multiAction, int numAttempt, ServerName server,
        Set<CancellableRegionServerCallable> callsInProgress) {
      this.multiAction = multiAction;
      this.numAttempt = numAttempt;
      this.server = server;
      this.callsInProgress = callsInProgress;
    }

    @Override
    public void run() {
      AbstractResponse res = null;
      CancellableRegionServerCallable callable = currentCallable;
      try {
        // setup the callable based on the actions, if we don't have one already from the request
        if (callable == null) {
          callable = createCallable(server, tableName, multiAction);
        }
        RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable, rpcTimeout);
        try {
          if (callsInProgress != null) {
            callsInProgress.add(callable);
          }
          res = caller.callWithoutRetries(callable, operationTimeout);
          if (res == null) {
            // Cancelled
            return;
          }
        } catch (IOException e) {
          // The service itself failed. It may be an error coming from the communication
          //   layer, or a functional error raised by the server.
          receiveGlobalFailure(multiAction, server, numAttempt, e);
          return;
        } catch (Throwable t) {
          // This should not happen. Let's log & retry anyway.
          LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected." +
              " Retrying. Server=" + server + ", tableName=" + tableName, t);
          receiveGlobalFailure(multiAction, server, numAttempt, t);
          return;
        }
        if (res.type() == AbstractResponse.ResponseType.MULTI) {
          // Normal case: we received an answer from the server, and it's not an exception.
          receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
        } else {
          if (results != null) {
            SingleResponse singleResponse = (SingleResponse) res;
            updateResult(0, singleResponse.getEntry());
          }
          decActionCounter(1);
        }
      } catch (Throwable t) {
        // Something really bad happened. We are on the send thread that will now die.
        LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t);
        throw new RuntimeException(t);
      } finally {
        asyncProcess.decTaskCounters(multiAction.getRegions(), server);
        if (callsInProgress != null && callable != null && res != null) {
          callsInProgress.remove(callable);
        }
      }
    }
  }

  private final Batch.Callback<CResult> callback;
  private final BatchErrors errors;
  private final ConnectionImplementation.ServerErrorTracker errorsByServer;
  private final ExecutorService pool;
  private final Set<CancellableRegionServerCallable> callsInProgress;


  private final TableName tableName;
  private final AtomicLong actionsInProgress = new AtomicLong(-1);
  /**
   * The lock controls access to results. It is only held when populating results where
   * there might be several callers (eventual consistency gets). For other requests,
   * there's one unique call going on per result index.
   */
  private final Object replicaResultLock = new Object();
  /**
   * Result array.  Null if results are not needed. Otherwise, each index corresponds to
   * the action index in initial actions submitted. For most request types, has null-s for
   * requests that are not done, and result/exception for those that are done.
   * For eventual-consistency gets, initially the same applies; at some point, replica calls
   * might be started, and ReplicaResultState is put at the corresponding indices. The
   * returning calls check the type to detect when this is the case. After all calls are done,
   * ReplicaResultState-s are replaced with results for the user.
   */
  private final Object[] results;
  /**
   * Indices of replica gets in results. If null, all or no actions are replica-gets.
   */
  private final int[] replicaGetIndices;
  private final boolean hasAnyReplicaGets;
  private final long nonceGroup;
  private final CancellableRegionServerCallable currentCallable;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final AsyncProcess asyncProcess;

  /**
   * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
   * used to make logging more clear, we don't actually care why we don't retry.
   */
  public enum Retry {
    YES,
    NO_LOCATION_PROBLEM,
    NO_NOT_RETRIABLE,
    NO_RETRIES_EXHAUSTED,
    NO_OTHER_SUCCEEDED
  }

  /** Sync point for calls to multiple replicas for the same user request (Get).
   * Created and put in the results array (we assume replica calls require results) when
   * the replica calls are launched. See results for details of this process.
   * POJO, all fields are public. To modify them, the object itself is locked. */
  private static class ReplicaResultState {
    public ReplicaResultState(int callCount) {
      this.callCount = callCount;
    }

    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
    int callCount;
    /** Errors for which it is not decided whether we will report them to user. If one of the
     * calls succeeds, we will discard the errors that may have happened in the other calls. */
    BatchErrors replicaErrors = null;

    @Override
    public String toString() {
      return "[call count " + callCount + "; errors " + replicaErrors + "]";
    }
  }

  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
      long nonceGroup, AsyncProcess asyncProcess) {
    this.pool = task.getPool();
    this.callback = task.getCallback();
    this.nonceGroup = nonceGroup;
    this.tableName = task.getTableName();
    this.actionsInProgress.set(actions.size());
    if (task.getResults() == null) {
      results = task.getNeedResults() ? new Object[actions.size()] : null;
    } else {
      if (task.getResults().length != actions.size()) {
        throw new AssertionError("results.length");
      }
      this.results = task.getResults();
      for (int i = 0; i != this.results.length; ++i) {
        results[i] = null;
      }
    }
    List<Integer> replicaGetIndices = null;
    boolean hasAnyReplicaGets = false;
    if (results != null) {
      // Check to see if any requests might require replica calls.
      // We expect that many requests will consist of all or no multi-replica gets; in such
      // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
      // store the list of action indexes for which replica gets are possible, and set
      // hasAnyReplicaGets to true.
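      // For illustration (not from the original comment): for actions [replica-get, put,
      // replica-get] we end up with hasAnyReplicaGets == true and replicaGetIndices == [0, 2];
      // for a batch that is all replica-gets (or has none) replicaGetIndices stays null and
      // hasAnyReplicaGets alone says whether replica calls may be needed.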
      boolean hasAnyNonReplicaReqs = false;
      int posInList = 0;
      for (Action action : actions) {
        boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
        if (isReplicaGet) {
          hasAnyReplicaGets = true;
          if (hasAnyNonReplicaReqs) { // Mixed case
            if (replicaGetIndices == null) {
              replicaGetIndices = new ArrayList<>(actions.size() - 1);
            }
            replicaGetIndices.add(posInList);
          }
        } else if (!hasAnyNonReplicaReqs) {
          // The first non-multi-replica request in the action list.
          hasAnyNonReplicaReqs = true;
          if (posInList > 0) {
            // Add all the previous requests to the index lists. We know they are all
            // replica-gets because this is the first non-multi-replica request in the list.
            replicaGetIndices = new ArrayList<>(actions.size() - 1);
            for (int i = 0; i < posInList; ++i) {
              replicaGetIndices.add(i);
            }
          }
        }
        ++posInList;
      }
    }
    this.hasAnyReplicaGets = hasAnyReplicaGets;
    if (replicaGetIndices != null) {
      this.replicaGetIndices = new int[replicaGetIndices.size()];
      int i = 0;
      for (Integer el : replicaGetIndices) {
        this.replicaGetIndices[i++] = el;
      }
    } else {
      this.replicaGetIndices = null;
    }
    this.callsInProgress = !hasAnyReplicaGets ? null :
        Collections.newSetFromMap(
            new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
    this.asyncProcess = asyncProcess;
    this.errorsByServer = createServerErrorTracker();
    this.errors = new BatchErrors();
    this.operationTimeout = task.getOperationTimeout();
    this.rpcTimeout = task.getRpcTimeout();
    this.currentCallable = task.getCallable();
    if (task.getCallable() == null) {
      tracker = new RetryingTimeTracker().start();
    }
  }

  protected Set<CancellableRegionServerCallable> getCallsInProgress() {
    return callsInProgress;
  }

  SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server,
        Set<CancellableRegionServerCallable> callsInProgress) {
    return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
  }

  /**
   * Group a list of actions by region server, and send them.
   *
   * @param currentActions - the list of rows to submit
   * @param numAttempt - the current numAttempt (first attempt is 1)
   */
  void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();

    boolean isReplica = false;
    List<Action> unknownReplicaActions = null;
    for (Action action : currentActions) {
      RegionLocations locs = findAllLocationsOrFail(action, true);
      if (locs == null) continue;
      boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
      if (isReplica && !isReplicaAction) {
        // This is the property of the current implementation, not a requirement.
        throw new AssertionError("Replica and non-replica actions in the same retry");
      }
      isReplica = isReplicaAction;
      HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
      if (loc == null || loc.getServerName() == null) {
        if (isReplica) {
          if (unknownReplicaActions == null) {
            unknownReplicaActions = new ArrayList<>(1);
          }
          unknownReplicaActions.add(action);
        } else {
          // TODO: relies on primary location always being fetched
          manageLocationError(action, null);
        }
      } else {
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
      }
    }
    boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
    boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();

    if (!actionsByServer.isEmpty()) {
      // If this is the first attempt to group and send, with no replica calls yet, we need the replica thread.
      sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
          ? currentActions : null, numAttempt > 1 && !hasUnknown);
    }

    if (hasUnknown) {
      actionsByServer = new HashMap<>();
      for (Action action : unknownReplicaActions) {
        HRegionLocation loc = getReplicaLocationOrFail(action);
        if (loc == null) continue;
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(
            actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
      }
    }
  }

  private HRegionLocation getReplicaLocationOrFail(Action action) {
    // We are going to try to get the location once again. For each action, we'll do it once
    // from the cache, because the previous calls in the loop might have populated it.
    int replicaId = action.getReplicaId();
    RegionLocations locs = findAllLocationsOrFail(action, true);
    if (locs == null) return null; // manageError already called
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null || loc.getServerName() == null) {
      locs = findAllLocationsOrFail(action, false);
      if (locs == null) return null; // manageError already called
      loc = locs.getRegionLocation(replicaId);
    }
    if (loc == null || loc.getServerName() == null) {
      manageLocationError(action, null);
      return null;
    }
    return loc;
  }

  private void manageLocationError(Action action, Exception ex) {
    String msg = "Cannot get replica " + action.getReplicaId()
        + " location for " + action.getAction();
    LOG.error(msg);
    if (ex == null) {
      ex = new IOException(msg);
    }
    manageError(action.getOriginalIndex(), action.getAction(),
        Retry.NO_LOCATION_PROBLEM, ex, null);
  }

  private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
    if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id +
        ", row cannot be null");
    RegionLocations loc = null;
    try {
      loc = asyncProcess.connection.locateRegion(
          tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
    } catch (IOException ex) {
      manageLocationError(action, ex);
    }
    return loc;
  }

  /**
   * Send a multi action structure to the servers, after a delay depending on the attempt
   * number. Asynchronous.
   *
   * @param actionsByServer the actions, grouped by server
   * @param numAttempt the attempt number.
   * @param actionsForReplicaThread original actions for the replica thread; null on non-first call.
   * @param reuseThread whether the last runnable may be executed on the current thread instead of the pool.
   */
  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer,
                               int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) {
    // Run the last item on the same thread if we are already on a send thread.
    // We hope most of the time it will be the only item, so we can cut down on threads.
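    // Illustrative note (not in the original source comments): in the common single-server case
    // with reuseThread == true, the one runnable below is executed inline on this thread, so a
    // retry does not consume an extra pool thread; only the HBASE-17475 modulo check below
    // occasionally forces a pool submission to bound the recursion depth.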
    int actionsRemaining = actionsByServer.size();
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
      ServerName server = e.getKey();
      MultiAction multiAction = e.getValue();
      Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
          numAttempt);
      // make sure we correctly count the number of runnables before we try to reuse the send
      // thread, in case we had to split the request into different runnables because of backoff
      if (runnables.size() > actionsRemaining) {
        actionsRemaining = runnables.size();
      }

      // run all the runnables
      // HBASE-17475: Do not reuse the thread after the stack reaches a certain depth, to prevent stack overflow.
      // For now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to control the depth.
      for (Runnable runnable : runnables) {
        if ((--actionsRemaining == 0) && reuseThread
            && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) {
          runnable.run();
        } else {
          try {
            pool.submit(runnable);
          } catch (Throwable t) {
            if (t instanceof RejectedExecutionException) {
              // This should never happen. But as the pool is provided by the end user,
              // let's secure this a little.
              LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." +
                  " Server=" + server.getServerName(), t);
            } else {
              // see #HBASE-14359 for more details
              LOG.warn("Caught unexpected exception/error: ", t);
            }
            asyncProcess.decTaskCounters(multiAction.getRegions(), server);
            // We're likely to fail again, but this will increment the attempt counter,
            // so it will finish.
            receiveGlobalFailure(multiAction, server, numAttempt, t);
          }
        }
      }
    }

    if (actionsForReplicaThread != null) {
      startWaitingForReplicaCalls(actionsForReplicaThread);
    }
  }

  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
                                                                   MultiAction multiAction,
                                                                   int numAttempt) {
    // no stats to manage, just do the standard action
    if (asyncProcess.connection.getStatisticsTracker() == null) {
      if (asyncProcess.connection.getConnectionMetrics() != null) {
        asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
      }
      asyncProcess.incTaskCounters(multiAction.getRegions(), server);
      SingleServerRequestRunnable runnable = createSingleServerRequest(
              multiAction, numAttempt, server, callsInProgress);
      Tracer tracer = Tracer.curThreadTracer();

      if (tracer == null) {
        return Collections.singletonList(runnable);
      } else {
        return Collections.singletonList(tracer.wrap(runnable, "AsyncProcess.sendMultiAction"));
      }
    }

    // group the actions by the amount of delay
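    // Illustrative note (not in the original source comments): each region gets its own backoff
    // from the pluggable policy; regions that share the same backoff value share one runner, so
    // an unloaded region is typically sent immediately while a heavily loaded one is wrapped in
    // a DelayingRunner that sleeps before sending.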
    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());

    // split up the actions
    for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
      Long backoff = getBackoff(server, e.getKey());
      DelayingRunner runner = actions.get(backoff);
      if (runner == null) {
        actions.put(backoff, new DelayingRunner(backoff, e));
      } else {
        runner.add(e);
      }
    }

    List<Runnable> toReturn = new ArrayList<>(actions.size());
    for (DelayingRunner runner : actions.values()) {
      asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
      String traceText = "AsyncProcess.sendMultiAction";
      Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
      // use a delay runner only if we need to sleep for some time
      if (runner.getSleepTime() > 0) {
        runner.setRunner(runnable);
        traceText = "AsyncProcess.clientBackoff.sendMultiAction";
        runnable = runner;
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics()
            .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
        }
      } else {
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
        }
      }
      runnable = TraceUtil.wrap(runnable, traceText);
      toReturn.add(runnable);

    }
    return toReturn;
  }

  /**
   * @param server server location where the target region is hosted
   * @param regionName name of the region to which we are going to write some data
   * @return the amount of time the client should wait before submitting a request to the
   * specified server and region
   */
  private Long getBackoff(ServerName server, byte[] regionName) {
    ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker();
    ServerStatistics stats = tracker.getStats(server);
    return asyncProcess.connection.getBackoffPolicy()
        .getBackoffTime(server, regionName, stats);
  }

  /**
   * Starts waiting to issue replica calls on a different thread; or issues them immediately.
   */
  private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
    long startTime = EnvironmentEdgeManager.currentTime();
    ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
        actionsForReplicaThread, startTime);
    if (asyncProcess.primaryCallTimeoutMicroseconds == 0) {
      // Start replica calls immediately.
      replicaRunnable.run();
    } else {
      // Start the thread that may kick off replica gets.
      // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
      try {
        pool.submit(replicaRunnable);
      } catch (RejectedExecutionException ree) {
        LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree);
      }
    }
  }

  /**
   * Checks whether we can retry and acts accordingly: logs and sets the error status.
   *
   * @param originalIndex the position in the list sent
   * @param row           the row
   * @param canRetry      if not YES, we won't retry whatever the settings.
   * @param throwable     the throwable, if any (can be null)
   * @param server        the location, if any (can be null)
   * @return the Retry decision: YES if the action can be retried, otherwise the reason not to.
   */
  Retry manageError(int originalIndex, Row row, Retry canRetry,
                                        Throwable throwable, ServerName server) {
    if (canRetry == Retry.YES
        && throwable != null && throwable instanceof DoNotRetryIOException) {
      canRetry = Retry.NO_NOT_RETRIABLE;
    }

    if (canRetry != Retry.YES) {
      // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
      setError(originalIndex, row, throwable, server);
    } else if (isActionComplete(originalIndex, row)) {
      canRetry = Retry.NO_OTHER_SUCCEEDED;
    }
    return canRetry;
  }

  /**
   * Resubmit all the actions from this multiaction after a failure.
   *
   * @param rsActions  the actions still to do from the initial list
   * @param server   the destination
   * @param numAttempt the number of attempts so far
   * @param t the throwable (if any) that caused the resubmit
   */
  private void receiveGlobalFailure(
      MultiAction rsActions, ServerName server, int numAttempt, Throwable t) {
    errorsByServer.reportServerError(server);
    Retry canRetry = errorsByServer.canTryMore(numAttempt)
        ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;

    cleanServerCache(server, t);
    int failed = 0;
    int stopped = 0;
    List<Action> toReplay = new ArrayList<>();
    for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
      byte[] regionName = e.getKey();
      byte[] row = e.getValue().get(0).getAction().getRow();
      // Do not use the exception for updating cache because it might be coming from
      // any of the regions in the MultiAction.
      updateCachedLocations(server, regionName, row,
        ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
      for (Action action : e.getValue()) {
        Retry retry = manageError(
            action.getOriginalIndex(), action.getAction(), canRetry, t, server);
        if (retry == Retry.YES) {
          toReplay.add(action);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      }
    }

    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, rsActions.size(), t);
    }
  }

  /**
   * Log as much info as possible, and, if there is something to replay,
   * submit it again after a back off sleep.
   */
  private void resubmit(ServerName oldServer, List<Action> toReplay,
                        int numAttempt, int failureCount, Throwable throwable) {
    // We have something to replay. We're going to sleep a little before doing so.

    // We have contradicting needs here:
    //  1) We want to get the new location after having slept, as it may change.
    //  2) We want to take into account the location when calculating the sleep time.
    //  3) If all this is just because the response needed to be chunked, try again FAST.
    // It should be possible to have some heuristics to take the right decision. Short term,
    //  we go for one.
    boolean retryImmediately = throwable instanceof RetryImmediatelyException;
    int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
    long backOffTime;
    if (retryImmediately) {
      backOffTime = 0;
    } else if (throwable instanceof CallQueueTooBigException) {
      // Give a special check on CQTBE, see #HBASE-17114
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE);
    } else {
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause);
    }
    if (numAttempt > asyncProcess.startLogErrorsCnt) {
      // We use this value to have some logs when we have multiple failures, but not too many
      //  logs, as errors are to be expected when a region moves, splits and so on
      LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
          oldServer, throwable, backOffTime, true, null, -1, -1));
    }

    try {
      if (backOffTime > 0) {
        Thread.sleep(backOffTime);
      }
    } catch (InterruptedException e) {
      LOG.warn("#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
      Thread.currentThread().interrupt();
      return;
    }

    groupAndSendMultiAction(toReplay, nextAttemptNumber);
  }

  private void logNoResubmit(ServerName oldServer, int numAttempt,
                             int failureCount, Throwable throwable, int failed, int stopped) {
    if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) {
      String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
      String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
          throwable, -1, false, timeStr, failed, stopped);
      if (failed != 0) {
        // Only log final failures as warning
        LOG.warn(logMessage);
      } else {
        LOG.info(logMessage);
      }
    }
  }

  /**
   * Called when we receive the result of a server query.
   *
   * @param multiAction    - the multiAction we sent
   * @param server       - the location. It's used as a server name.
   * @param responses      - the response, if any
   * @param numAttempt     - the attempt
   */
  private void receiveMultiAction(MultiAction multiAction, ServerName server,
      MultiResponse responses, int numAttempt) {
    assert responses != null;
    updateStats(server, responses);
    // Success or partial success
    // Analyze detailed results. We can still have individual failures to be redone.
    // Two specific throwables are managed:
    //  - DoNotRetryIOException: we continue to retry for other actions
    //  - RegionMovedException: we update the cache with the new region location
    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
    List<Action> toReplay = new ArrayList<>();
    Throwable lastException = null;
    int failureCount = 0;
    int failed = 0;
    int stopped = 0;
    Retry retry = null;
    // Go by original action.
    for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
      byte[] regionName = regionEntry.getKey();

      Throwable regionException = responses.getExceptions().get(regionName);
      if (regionException != null) {
        cleanServerCache(server, regionException);
      }

      Map<Integer, Object> regionResults =
        results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
      boolean regionFailureRegistered = false;
      for (Action sentAction : regionEntry.getValue()) {
        Object result = regionResults.get(sentAction.getOriginalIndex());
        if (result == null) {
          if (regionException == null) {
            LOG.error("Server sent us neither results nor exceptions for "
              + Bytes.toStringBinary(regionName)
              + ", numAttempt:" + numAttempt);
            regionException = new RuntimeException("Invalid response");
          }
          // If the row operation encountered a region-level error, the per-action exception may
          // be null.
          result = regionException;
        }
        // Failure: retry if it makes sense, else update the error lists.
        if (result instanceof Throwable) {
          Throwable actionException = (Throwable) result;
          Row row = sentAction.getAction();
          lastException = regionException != null ? regionException
            : ClientExceptionsUtil.findException(actionException);
          // Register corresponding failures once per server/once per region.
          if (!regionFailureRegistered) {
            regionFailureRegistered = true;
            updateCachedLocations(server, regionName, row.getRow(), actionException);
          }
          if (retry == null) {
            errorsByServer.reportServerError(server);
            // We determine canRetry only once for all calls, after reporting server failure.
            retry = errorsByServer.canTryMore(numAttempt) ?
              Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
          }
          ++failureCount;
          switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException,
            server)) {
            case YES:
              toReplay.add(sentAction);
              break;
            case NO_OTHER_SUCCEEDED:
              ++stopped;
              break;
            default:
              ++failed;
              break;
          }
        } else {
          invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
          setResult(sentAction, result);
        }
      }
    }
    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, failureCount, lastException);
    }
  }

  private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
    Throwable rowException) {
    if (tableName == null) {
      return;
    }
    try {
      asyncProcess.connection
        .updateCachedLocations(tableName, regionName, row, rowException, server);
    } catch (Throwable ex) {
      // That should never happen, but if it did, we want to make sure
      // we still process errors
      LOG.error("Couldn't update cached region locations: " + ex);
    }
  }

  private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
    if (callback != null) {
      try {
        //noinspection unchecked
        // TODO: would callback expect a replica region name if it gets one?
        this.callback.update(regionName, row, result);
      } catch (Throwable t) {
        LOG.error("User callback threw an exception for "
          + Bytes.toStringBinary(regionName) + ", ignoring", t);
      }
    }
  }

  private void cleanServerCache(ServerName server, Throwable regionException) {
    if (ClientExceptionsUtil.isMetaClearingException(regionException)) {
      // We want to make sure to clear the cache in case there were location-related exceptions.
      // We don't want to clear the cache for every possible exception that comes through, however.
      asyncProcess.connection.clearCaches(server);
    }
  }

  protected void updateStats(ServerName server, MultiResponse resp) {
    ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()),
      Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp);
  }

  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
                           Throwable error, long backOffTime, boolean willRetry, String startTime,
                           int failed, int stopped) {
    StringBuilder sb = new StringBuilder();
    sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName).
        append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries).
        append(", ");

    if (failureCount > 0 || error != null){
      sb.append("failureCount=").append(failureCount).append("ops").append(", last exception=").
          append(error);
    } else {
      sb.append("succeeded");
    }

    sb.append(" on ").append(sn).append(", tracking started ").append(startTime);

    if (willRetry) {
      sb.append(", retrying after=").append(backOffTime).append("ms").
          append(", operationsToReplay=").append(replaySize);
    } else if (failureCount > 0) {
      if (stopped > 0) {
        sb.append("; NOT retrying, stopped=").append(stopped).
            append(" because successful operation on other replica");
      }
      if (failed > 0) {
        sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!");
      }
    }

    return sb.toString();
  }

  /**
   * Sets the non-error result from a particular action.
   * @param action Action (request) that the server responded to.
   * @param result The result.
   */
  private void setResult(Action action, Object result) {
    if (result == null) {
      throw new RuntimeException("Result cannot be null");
    }
    ReplicaResultState state = null;
    boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
    int index = action.getOriginalIndex();
    if (results == null) {
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    state = trySetResultSimple(index, action.getAction(), false, result, null, isStale);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    // At this point we know that state is set to the replica tracking class.
    // It could be that someone else is also looking at it; however, we know there can
    // only be one state object, and only one thread can set callCount to 0. Other threads
    // will either see the state with callCount 0 after locking it, or will not see the state
    // at all because we will have replaced it with the result.
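    // Illustrative note (not in the original source comments): with a primary and two replicas,
    // callCount starts at 3 (see addReplicaActions); the first call to return a result wins the
    // race below, sets callCount to 0 and publishes its result, and the later calls bail out as
    // soon as they observe callCount == 0.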
    synchronized (state) {
      if (state.callCount == 0) {
        return; // someone already set the result
      }
      state.callCount = 0;
    }
    synchronized (replicaResultLock) {
      if (results[index] != state) {
        throw new AssertionError("We set the callCount but someone else replaced the result");
      }
      updateResult(index, result);
    }

    decActionCounter(index);
  }

  /**
   * Sets the error from a particular action.
   * @param index Original action index.
   * @param row Original request.
   * @param throwable The resulting error.
   * @param server The source server.
   */
  private void setError(int index, Row row, Throwable throwable, ServerName server) {
    ReplicaResultState state = null;
    if (results == null) {
      // Note that we currently cannot have replica requests with null results. So it shouldn't
      // happen that multiple replica calls will call decActionCounter for the same action with
      // results == null. Only one call per action should be present in this case.
      errors.add(throwable, row, server);
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    state = trySetResultSimple(index, row, true, throwable, server, false);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
    boolean isActionDone = false;
    synchronized (state) {
      switch (state.callCount) {
        case 0: return; // someone already set the result
        case 1: { // All calls failed, we are the last error.
          target = errors;
          isActionDone = true;
          break;
        }
        default: {
          assert state.callCount > 1;
          if (state.replicaErrors == null) {
            state.replicaErrors = new BatchErrors();
          }
          target = state.replicaErrors;
          break;
        }
      }
      --state.callCount;
    }
    target.add(throwable, row, server);
    if (isActionDone) {
      if (state.replicaErrors != null) { // last call, no need to lock
        errors.merge(state.replicaErrors);
      }
      // See setResult for explanations.
      synchronized (replicaResultLock) {
        if (results[index] != state) {
          throw new AssertionError("We set the callCount but someone else replaced the result");
        }
        updateResult(index, throwable);
      }
      decActionCounter(index);
    }
  }

  /**
   * Checks if the action is complete; used on error to prevent needless retries.
   * Does not synchronize, assuming element index/field accesses are atomic.
   * This is an opportunistic optimization check, doesn't have to be strict.
   * @param index Original action index.
   * @param row Original request.
   */
  private boolean isActionComplete(int index, Row row) {
    if (!AsyncProcess.isReplicaGet(row)) return false;
    Object resObj = results[index];
    return (resObj != null) && (!(resObj instanceof ReplicaResultState)
        || ((ReplicaResultState)resObj).callCount == 0);
  }

  /**
   * Tries to set the result or error for a particular action as if there were no replica calls.
   * @return null if successful; replica state if there were in fact replica calls.
   */
  private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
                                                Object result, ServerName server, boolean isFromReplica) {
    Object resObj = null;
    if (!AsyncProcess.isReplicaGet(row)) {
      if (isFromReplica) {
        throw new AssertionError("Unexpected stale result for " + row);
      }
      updateResult(index, result);
    } else {
      synchronized (replicaResultLock) {
        resObj = results[index];
        if (resObj == null) {
          if (isFromReplica) {
            throw new AssertionError("Unexpected stale result for " + row);
          }
          updateResult(index, result);
        }
      }
    }

    ReplicaResultState rrs =
        (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
    if (rrs == null && isError) {
      // The resObj is not replica state (null or already set).
      errors.add((Throwable)result, row, server);
    }

    if (resObj == null) {
      // resObj is null - no replica calls were made.
      decActionCounter(index);
      return null;
    }
    return rrs;
  }

  private void decActionCounter(int index) {
    long actionsRemaining = actionsInProgress.decrementAndGet();
    if (actionsRemaining < 0) {
      String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
      throw new AssertionError(error);
    } else if (actionsRemaining == 0) {
      synchronized (actionsInProgress) {
        actionsInProgress.notifyAll();
      }
    }
  }

  private String buildDetailedErrorMsg(String string, int index) {
    StringBuilder error = new StringBuilder(128);
    error.append(string).append("; called for ").append(index).append(", actionsInProgress ")
        .append(actionsInProgress.get()).append("; replica gets: ");
    if (replicaGetIndices != null) {
      for (int i = 0; i < replicaGetIndices.length; ++i) {
        error.append(replicaGetIndices[i]).append(", ");
      }
    } else {
      error.append(hasAnyReplicaGets ? "all" : "none");
    }
    error.append("; results ");
    if (results != null) {
      for (int i = 0; i < results.length; ++i) {
        Object o = results[i];
        error.append(((o == null) ? "null" : o.toString())).append(", ");
      }
    }
    return error.toString();
  }

  @Override
  public void waitUntilDone() throws InterruptedIOException {
    try {
      waitUntilDone(Long.MAX_VALUE);
    } catch (InterruptedException iex) {
      throw new InterruptedIOException(iex.getMessage());
    } finally {
      if (callsInProgress != null) {
        for (CancellableRegionServerCallable clb : callsInProgress) {
          clb.cancel();
        }
      }
    }
  }

  private boolean waitUntilDone(long cutoff) throws InterruptedException {
    boolean hasWait = cutoff != Long.MAX_VALUE;
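    // Illustrative note (not in the original source comments): when a finite cutoff is passed it
    // is expressed in microseconds since the epoch (ReplicaCallIssuingRunnable passes
    // startTime * 1000 + primaryCallTimeoutMicroseconds), which is why "now" is multiplied by
    // 1000 before being compared with it below.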
    long lastLog = EnvironmentEdgeManager.currentTime();
    long currentInProgress;
    while (0 != (currentInProgress = actionsInProgress.get())) {
      long now = EnvironmentEdgeManager.currentTime();
      if (hasWait && (now * 1000L) > cutoff) {
        return false;
      }
      if (!hasWait) { // Only log if wait is infinite.
        if (now > lastLog + 10000) {
          lastLog = now;
          LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
              + "  actions to finish on table: " + tableName);
        }
      }
      synchronized (actionsInProgress) {
        if (actionsInProgress.get() == 0) break;
        if (!hasWait) {
          actionsInProgress.wait(10);
        } else {
          long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
          TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
        }
      }
    }
    return true;
  }

  @Override
  public boolean hasError() {
    return errors.hasErrors();
  }

  @Override
  public List<? extends Row> getFailedOperations() {
    return errors.actions;
  }

  @Override
  public RetriesExhaustedWithDetailsException getErrors() {
    return errors.makeException(asyncProcess.logBatchErrorDetails);
  }

  @Override
  public Object[] getResults() throws InterruptedIOException {
    waitUntilDone();
    return results;
  }

  /**
   * Creates the server error tracker to use inside the process.
   * Currently, to preserve the main assumption about current retries, and to work well with
   * the retry-limit-based calculation, the calculation is local per process object.
   * We may benefit from connection-wide tracking of server errors.
   * @return the ServerErrorTracker to use for this request
   */
  private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(
        asyncProcess.serverTrackerTimeout, asyncProcess.numTries);
  }

  /**
   * Create a callable. Isolated to be easily overridden in the tests.
   */
  private MultiServerCallable createCallable(final ServerName server, TableName tableName,
      final MultiAction multi) {
    return new MultiServerCallable(asyncProcess.connection, tableName, server,
        multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
  }

  private void updateResult(int index, Object result) {
    Object current = results[index];
    if (current != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("The result is assigned repeatedly! current:" + current
          + ", new:" + result);
      }
    }
    results[index] = result;
  }

  long getNumberOfActionsInProgress() {
    return actionsInProgress.get();
  }
}