1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Date;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ConcurrentSkipListMap;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.hbase.RetryImmediatelyException;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.DoNotRetryIOException;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.RegionLocations;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch;
56  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
57  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60  import org.apache.htrace.Trace;
61  
62  import com.google.common.annotations.VisibleForTesting;
63  
64  /**
65   * This class allows a continuous flow of requests. It's written to be compatible with a
66   * synchronous caller such as HTable.
67   * <p>
68   * The caller sends a buffer of operations by calling submit. This class extracts from this list
69   * the operations it can send, i.e. the operations that are on regions that are not considered
70   * busy. The process is asynchronous, i.e. it returns immediately once it has finished
71   * iterating over the list. If, and only if, the maximum number of concurrent tasks is reached, the call
72   * to submit will block. Alternatively, the caller can call submitAll, in which case all the
73   * operations will be sent. Each call to submit returns a future-like object that can be used
74   * to track operation progress.
75   * </p>
76   * <p>
77   * The class manages internally the retries.
78   * </p>
79   * <p>
80   * The class can be constructed in regular mode, or "global error" mode. In global error mode,
81   * AP tracks errors across all calls (each "future" also has a global view of all errors). That
82   * mode is necessary for backward compatibility with HTable behavior, where multiple submissions
83   * are made and errors from previous calls can propagate through any later put/flush call.
84   * In "regular" mode, the errors are tracked inside the Future object that is returned.
85   * The results are always tracked inside the Future object and can be retrieved when the call
86   * has finished. Partial results can also be retrieved if some part of multi-request failed.
87   * </p>
88   * <p>
89   * This class is thread safe in regular mode; in global error mode, submitting operations and
90   * retrieving errors from different threads may not be thread safe.
91   * Internally, the class is thread safe enough to manage new submissions and results arising
92   * from older operations simultaneously.
93   * </p>
94   * <p>
95   * Internally, this class works with {@link Row}, which means it could theoretically be used
96   * for gets as well.
97   * </p>
98   */
99  @InterfaceAudience.Private
100 class AsyncProcess {
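
  // Illustrative construction/submission sketch only (not part of the class); 'connection',
  // 'conf', 'pool', 'rpcCallerFactory' and 'rpcControllerFactory' below are hypothetical
  // placeholders standing in for the caller's objects.
  //
  //   AsyncProcess ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory,
  //       true /* global error mode, HTable-style */, rpcControllerFactory);
  //   AsyncRequestFuture future =
  //       ap.submitAll(tableName, rows, null, new Object[rows.size()]);
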
101   private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
102   protected static final AtomicLong COUNTER = new AtomicLong();
103 
104   public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
105 
106   /**
107    * Configure the number of failures after which the client will start logging. A few failures
108    * are fine: the region moved, then is not yet opened, then is overloaded. We try to have an acceptable
109    * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at
110    * this stage.
111    */
112   public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
113       "hbase.client.start.log.errors.counter";
114   public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
115 
116   /**
117    * Configuration to decide whether to log details for batch errors.
118    */
119   public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
120 
121   private final int thresholdToLogUndoneTaskDetails;
122   private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
123       "hbase.client.threshold.log.details";
124   private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
125   private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
126 
127   /**
128    * The context used to wait for results from one submit call.
129    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
130    *    then errors and failed operations in this object will reflect global errors.
131    * 2) If the submit call is made with needResults false, results will not be saved.
132    */
133   public static interface AsyncRequestFuture {
134     public boolean hasError();
135     public RetriesExhaustedWithDetailsException getErrors();
136     public List<? extends Row> getFailedOperations();
137     public Object[] getResults() throws InterruptedIOException;
138     /** Wait until all tasks are executed, successfully or not. */
139     public void waitUntilDone() throws InterruptedIOException;
140   }
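
  // A sketch of how a caller might consume an AsyncRequestFuture (hypothetical caller code;
  // 'future' and 'rows' are placeholders). When results are requested, each slot holds either
  // the per-action result or the Throwable for an action that ultimately failed.
  //
  //   future.waitUntilDone();
  //   if (future.hasError()) {
  //     throw future.getErrors();   // RetriesExhaustedWithDetailsException
  //   }
  //   Object[] results = future.getResults();
  //   for (int i = 0; i < results.length; ++i) {
  //     if (results[i] instanceof Throwable) {
  //       LOG.warn("Action for " + rows.get(i) + " failed", (Throwable) results[i]);
  //     }
  //   }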
141 
142   /**
143    * Return value from a submit that didn't contain any requests.
144    */
145   private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
146 
147     final Object[] result = new Object[0];
148 
149     @Override
150     public boolean hasError() {
151       return false;
152     }
153 
154     @Override
155     public RetriesExhaustedWithDetailsException getErrors() {
156       return null;
157     }
158 
159     @Override
160     public List<? extends Row> getFailedOperations() {
161       return null;
162     }
163 
164     @Override
165     public Object[] getResults() {
166       return result;
167     }
168 
169     @Override
170     public void waitUntilDone() throws InterruptedIOException {
171     }
172   };
173 
174   /** Sync point for calls to multiple replicas for the same user request (Get).
175    * Created and put in the results array (we assume replica calls require results) when
176    * the replica calls are launched. See results for details of this process.
177    * POJO, all fields are public. To modify them, the object itself is locked. */
178   private static class ReplicaResultState {
179     public ReplicaResultState(int callCount) {
180       this.callCount = callCount;
181     }
182 
183     /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
184     int callCount;
185     /** Errors for which it is not decided whether we will report them to user. If one of the
186      * calls succeeds, we will discard the errors that may have happened in the other calls. */
187     BatchErrors replicaErrors = null;
188 
189     @Override
190     public String toString() {
191       return "[call count " + callCount + "; errors " + replicaErrors + "]";
192     }
193   }
194 
195 
196   // TODO: many of the fields should be made private
197   protected final long id;
198 
199   protected final ClusterConnection connection;
200   protected final RpcRetryingCallerFactory rpcCallerFactory;
201   protected final RpcControllerFactory rpcFactory;
202   protected final BatchErrors globalErrors;
203   protected final ExecutorService pool;
204 
205   protected final AtomicLong tasksInProgress = new AtomicLong(0);
206   protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
207       new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
208   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
209       new ConcurrentHashMap<ServerName, AtomicInteger>();
210 
211   // Start configuration settings.
212   private final int startLogErrorsCnt;
213 
214   /**
215    * The number of tasks simultaneously executed on the cluster.
216    */
217   protected final int maxTotalConcurrentTasks;
218 
219   /**
220    * The number of tasks we run in parallel on a single region.
221    * With 1 (the default), we ensure that the ordering of the queries is respected: we don't start
222    * a set of operations on a region before the previous one is done. This also limits
223    * the pressure we put on the region server.
224    */
225   protected final int maxConcurrentTasksPerRegion;
226 
227   /**
228    * The number of tasks simultaneously executed on a single region server.
229    */
230   protected final int maxConcurrentTasksPerServer;
231   protected final long pause;
232   protected int numTries;
233   protected int serverTrackerTimeout;
234   protected int timeout;
235   protected long primaryCallTimeoutMicroseconds;
236   // End configuration settings.
237 
238   protected static class BatchErrors {
239     private final List<Throwable> throwables = new ArrayList<Throwable>();
240     private final List<Row> actions = new ArrayList<Row>();
241     private final List<String> addresses = new ArrayList<String>();
242 
243     public synchronized void add(Throwable ex, Row row, ServerName serverName) {
244       if (row == null){
245         throw new IllegalArgumentException("row cannot be null. location=" + serverName);
246       }
247 
248       throwables.add(ex);
249       actions.add(row);
250       addresses.add(serverName != null ? serverName.toString() : "null");
251     }
252 
253     public boolean hasErrors() {
254       return !throwables.isEmpty();
255     }
256 
257     private synchronized RetriesExhaustedWithDetailsException makeException() {
258       return new RetriesExhaustedWithDetailsException(
259           new ArrayList<Throwable>(throwables),
260           new ArrayList<Row>(actions), new ArrayList<String>(addresses));
261     }
262 
263     public synchronized void clear() {
264       throwables.clear();
265       actions.clear();
266       addresses.clear();
267     }
268 
269     public synchronized void merge(BatchErrors other) {
270       throwables.addAll(other.throwables);
271       actions.addAll(other.actions);
272       addresses.addAll(other.addresses);
273     }
274   }
275 
276   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
277       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
278     if (hc == null) {
279       throw new IllegalArgumentException("HConnection cannot be null.");
280     }
281 
282     this.connection = hc;
283     this.pool = pool;
284     this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
285 
286     this.id = COUNTER.incrementAndGet();
287 
288     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
289         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
290     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
291         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
292     this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
293         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
294     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
295 
296     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
297       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
298     this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
299           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
300     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
301           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
302 
303     this.startLogErrorsCnt =
304         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
305 
306     if (this.maxTotalConcurrentTasks <= 0) {
307       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
308     }
309     if (this.maxConcurrentTasksPerServer <= 0) {
310       throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
311           maxConcurrentTasksPerServer);
312     }
313     if (this.maxConcurrentTasksPerRegion <= 0) {
314       throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
315           maxConcurrentTasksPerRegion);
316     }
317 
318     // Server tracker allows us to do faster, and yet useful (hopefully), retries.
319     // However, if we retry too quickly, we might fail very fast due to the retry count limit.
320     // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
321     // retry time if normal retries were used. Then we will retry until this time runs out.
322     // If we keep hitting one server, the net effect will be the incremental backoff, and
323     // essentially the same number of retries as planned. If we have to do faster retries,
324     // we will do more retries in aggregate, but the user will be none the wiser.
325     this.serverTrackerTimeout = 0;
326     for (int i = 0; i < this.numTries; ++i) {
327       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
328     }
329 
330     this.rpcCallerFactory = rpcCaller;
331     this.rpcFactory = rpcFactory;
332 
333     this.thresholdToLogUndoneTaskDetails =
334         conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
335           DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
336   }
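
  // Example of tuning the limits read by the constructor above (an illustrative sketch only;
  // the values are arbitrary, not recommendations):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 200);
  //   conf.setInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, 5);
  //   conf.setInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, 2);
  //   conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, 5);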
337 
338   /**
339    * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
340    *         RuntimeException
341    */
342   private ExecutorService getPool(ExecutorService pool) {
343     if (pool != null) {
344       return pool;
345     }
346     if (this.pool != null) {
347       return this.pool;
348     }
349     throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
350   }
351 
352   /**
353    * See {@link #submit(ExecutorService, TableName, List, boolean, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, boolean)}.
354    * Uses default ExecutorService for this AP (must have been created with one).
355    */
356   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
357       boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
358       throws InterruptedIOException {
359     return submit(null, tableName, rows, atLeastOne, callback, needResults);
360   }
361 
362   /**
363    * Extract from the rows list what we can submit. The rows we cannot submit are kept in the
364    * list. Does not send requests to replicas (not currently used for anything other
365    * than streaming puts anyway).
366    *
367    * @param pool ExecutorService to use.
368    * @param tableName The table for which this request is needed.
369    * @param callback Batch callback. Only called on success (94 behavior).
370    * @param needResults Whether results are needed, or can be discarded.
371    * @param rows the submitted rows. Modified by the method: we remove the rows we took.
372    * @param atLeastOne true if we should submit at least a subset.
373    */
374   public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
375       List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
376       boolean needResults) throws InterruptedIOException {
377     if (rows.isEmpty()) {
378       return NO_REQS_RESULT;
379     }
380 
381     Map<ServerName, MultiAction<Row>> actionsByServer =
382         new HashMap<ServerName, MultiAction<Row>>();
383     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
384 
385     NonceGenerator ng = this.connection.getNonceGenerator();
386     long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
387 
388     // Location errors that happen before we decide what requests to take.
389     List<Exception> locationErrors = null;
390     List<Integer> locationErrorRows = null;
391     do {
392       // Wait until there is at least one slot for a new task.
393       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
394 
395       // Remember the previous decisions about regions or region servers we put in the
396       //  final multi.
397       Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
398       Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
399 
400       int posInList = -1;
401       Iterator<? extends Row> it = rows.iterator();
402       while (it.hasNext()) {
403         Row r = it.next();
404         HRegionLocation loc;
405         try {
406           if (r == null) {
407             throw new IllegalArgumentException("#" + id + ", row cannot be null");
408           }
409           // Make sure we get the default (replica id 0) location.
410           RegionLocations locs = connection.locateRegion(
411               tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
412           if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
413             throw new IOException("#" + id + ", no location found, aborting submit for"
414                 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
415           }
416           loc = locs.getDefaultRegionLocation();
417         } catch (IOException ex) {
418           locationErrors = new ArrayList<Exception>();
419           locationErrorRows = new ArrayList<Integer>();
420           LOG.error("Failed to get region location ", ex);
421           // This action failed before creating ars. Retain it, but do not add to submit list.
422           // We will then add it to ars in an already-failed state.
423           retainedActions.add(new Action<Row>(r, ++posInList));
424           locationErrors.add(ex);
425           locationErrorRows.add(posInList);
426           it.remove();
427           break; // Backward compat: we stop considering actions on location error.
428         }
429 
430         if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
431           Action<Row> action = new Action<Row>(r, ++posInList);
432           setNonce(ng, r, action);
433           retainedActions.add(action);
434           // TODO: replica-get is not supported on this path
435           byte[] regionName = loc.getRegionInfo().getRegionName();
436           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
437           it.remove();
438         }
439       }
440     } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
441 
442     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
443 
444     return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
445       locationErrors, locationErrorRows, actionsByServer, pool);
446   }
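
  // Sketch of the buffered-write pattern this method supports (hypothetical caller code;
  // 'ap', 'tableName' and 'pendingPuts' are placeholders):
  //
  //   List<Row> buffer = new ArrayList<Row>(pendingPuts);
  //   while (!buffer.isEmpty()) {
  //     // Takes whatever can go to non-busy regions; untaken rows stay in 'buffer'.
  //     ap.submit(tableName, buffer, true /* at least a subset */, null, false);
  //   }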
447 
448   <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
449       List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
450       Object[] results, boolean needResults, List<Exception> locationErrors,
451       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
452       ExecutorService pool) {
453     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
454       tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
455     // Add location errors if any
456     if (locationErrors != null) {
457       for (int i = 0; i < locationErrors.size(); ++i) {
458         int originalIndex = locationErrorRows.get(i);
459         Row row = retainedActions.get(originalIndex).getAction();
460         ars.manageError(originalIndex, row,
461           Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
462       }
463     }
464     ars.sendMultiAction(actionsByServer, 1, null, false);
465     return ars;
466   }
467 
468   /**
469    * Helper that is used when grouping the actions per region server.
470    *
471    * @param server - the destination. Must not be null.
472    * @param action - the action to add to the multiaction
473    * @param actionsByServer the multiaction per server
474    * @param nonceGroup Nonce group.
475    */
476   private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
477       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
478     MultiAction<Row> multiAction = actionsByServer.get(server);
479     if (multiAction == null) {
480       multiAction = new MultiAction<Row>();
481       actionsByServer.put(server, multiAction);
482     }
483     if (action.hasNonce() && !multiAction.hasNonceGroup()) {
484       multiAction.setNonceGroup(nonceGroup);
485     }
486 
487     multiAction.add(regionName, action);
488   }
489 
490   /**
491    * Check if we should send new operations to this region or region server.
492    * We're taking into account the past decision; if we have already accepted
493    * an operation on a given region, we accept all operations for this region.
494    *
495    * @param loc the region and the server name we want to use.
496    * @return true if we can take new operations on this region; false if it is considered busy.
497    */
498   protected boolean canTakeOperation(HRegionLocation loc,
499                                      Map<HRegionInfo, Boolean> regionsIncluded,
500                                      Map<ServerName, Boolean> serversIncluded) {
501     HRegionInfo regionInfo = loc.getRegionInfo();
502     Boolean regionPrevious = regionsIncluded.get(regionInfo);
503 
504     if (regionPrevious != null) {
505       // We already know what to do with this region.
506       return regionPrevious;
507     }
508 
509     Boolean serverPrevious = serversIncluded.get(loc.getServerName());
510     if (Boolean.FALSE.equals(serverPrevious)) {
511       // It's a new region, on a region server that we have already excluded.
512       regionsIncluded.put(regionInfo, Boolean.FALSE);
513       return false;
514     }
515 
516     AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
517     if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
518       // Too many tasks on this region already.
519       regionsIncluded.put(regionInfo, Boolean.FALSE);
520       return false;
521     }
522 
523     if (serverPrevious == null) {
524       // The region is ok, but we need to decide for this region server.
525       int newServers = 0; // number of servers we're going to contact so far
526       for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
527         if (kv.getValue()) {
528           newServers++;
529         }
530       }
531 
532       // Do we have too many total tasks already?
533       boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
534 
535       if (ok) {
536         // If the total is fine, is it ok for this individual server?
537         AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
538         ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
539       }
540 
541       if (!ok) {
542         regionsIncluded.put(regionInfo, Boolean.FALSE);
543         serversIncluded.put(loc.getServerName(), Boolean.FALSE);
544         return false;
545       }
546 
547       serversIncluded.put(loc.getServerName(), Boolean.TRUE);
548     } else {
549       assert serverPrevious.equals(Boolean.TRUE);
550     }
551 
552     regionsIncluded.put(regionInfo, Boolean.TRUE);
553 
554     return true;
555   }
556 
557   /**
558    * See {@link #submitAll(ExecutorService, TableName, List, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, Object[])}.
559    * Uses default ExecutorService for this AP (must have been created with one).
560    */
561   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
562       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
563     return submitAll(null, tableName, rows, callback, results);
564   }
565 
566   /**
567    * Submit the list of rows immediately, whatever the server status. Kept for backward
568    * compatibility: it allows use with the batch interface that returns an array of objects.
569    *
570    * @param pool ExecutorService to use.
571    * @param tableName name of the table for which the submission is made.
572    * @param rows the list of rows.
573    * @param callback the callback.
574    * @param results Optional array to return the results thru; backward compat.
575    */
576   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
577       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
578     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
579 
580     // The position will be used by the processBatch to match the object array returned.
581     int posInList = -1;
582     NonceGenerator ng = this.connection.getNonceGenerator();
583     for (Row r : rows) {
584       posInList++;
585       if (r instanceof Put) {
586         Put put = (Put) r;
587         if (put.isEmpty()) {
588           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
589         }
590       }
591       Action<Row> action = new Action<Row>(r, posInList);
592       setNonce(ng, r, action);
593       actions.add(action);
594     }
595     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
596         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
597     ars.groupAndSendMultiAction(actions, 1);
598     return ars;
599   }
600 
601   private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
602     if (!(r instanceof Append) && !(r instanceof Increment)) return;
603     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
604   }
605 
606   /**
607    * The context, and return value, for a single submit/submitAll call.
608    * Note on how this class (one AP submit) works. Initially, all requests are split into groups
609    * by server; a request is sent to each server in parallel; the RPC calls are not async so a
610    * thread per server is used. Every time some actions fail, regions/locations might have
611    * changed, so we re-group them by server and region again and send these groups in parallel
612    * too. The result, in case of retries, is a "tree" of threads, with parent exiting after
613    * scheduling children. This is why lots of code doesn't require any synchronization.
614    */
615   protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
616 
617     /**
618      * Runnable (that can be submitted to a thread pool) that waits until it's time
619      * to issue replica calls, finds region replicas, groups the requests by replica and
620      * issues the calls (on separate threads, via sendMultiAction).
621      * This is done on a separate thread because we don't want to make the user thread wait for
622      * our asynchronous call, and usually we have to wait before making replica calls.
623      */
624     private final class ReplicaCallIssuingRunnable implements Runnable {
625       private final long startTime;
626       private final List<Action<Row>> initialActions;
627 
628       public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
629         this.initialActions = initialActions;
630         this.startTime = startTime;
631       }
632 
633       @Override
634       public void run() {
635         boolean done = false;
636         if (primaryCallTimeoutMicroseconds > 0) {
637           try {
638             done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
639           } catch (InterruptedException ex) {
640             LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
641             return;
642           }
643         }
644         if (done) return; // Done within primary timeout
645         Map<ServerName, MultiAction<Row>> actionsByServer =
646             new HashMap<ServerName, MultiAction<Row>>();
647         List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
648         if (replicaGetIndices == null) {
649           for (int i = 0; i < results.length; ++i) {
650             addReplicaActions(i, actionsByServer, unknownLocActions);
651           }
652         } else {
653           for (int replicaGetIndice : replicaGetIndices) {
654             addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
655           }
656         }
657         if (!actionsByServer.isEmpty()) {
658           sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
659         }
660         if (!unknownLocActions.isEmpty()) {
661           actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
662           for (Action<Row> action : unknownLocActions) {
663             addReplicaActionsAgain(action, actionsByServer);
664           }
665           // Some actions may have completely failed, they are handled inside addAgain.
666           if (!actionsByServer.isEmpty()) {
667             sendMultiAction(actionsByServer, 1, null, true);
668           }
669         }
670       }
671 
672       /**
673        * Add replica actions to action map by server.
674        * @param index Index of the original action.
675        * @param actionsByServer The map by server to add it to.
676        */
677       private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
678           List<Action<Row>> unknownReplicaActions) {
679         if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
680         Action<Row> action = initialActions.get(index);
681         RegionLocations loc = findAllLocationsOrFail(action, true);
682         if (loc == null) return;
683         HRegionLocation[] locs = loc.getRegionLocations();
684         if (locs.length == 1) {
685           LOG.warn("No replicas found for " + action.getAction());
686           return;
687         }
688         synchronized (replicaResultLock) {
689           // Don't run replica calls if the original has finished. We could do it e.g. if
690           // original has already failed before first replica call (unlikely given retries),
691           // but that would require additional synchronization w.r.t. returning to caller.
692           if (results[index] != null) return;
693           // We set the number of calls here. After that any path must call setResult/setError.
694           // True even for replicas that are not found - if we refuse to send we MUST set error.
695           results[index] = new ReplicaResultState(locs.length);
696         }
697         for (int i = 1; i < locs.length; ++i) {
698           Action<Row> replicaAction = new Action<Row>(action, i);
699           if (locs[i] != null) {
700             addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
701                 replicaAction, actionsByServer, nonceGroup);
702           } else {
703             unknownReplicaActions.add(replicaAction);
704           }
705         }
706       }
707 
708       private void addReplicaActionsAgain(
709           Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
710         if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
711           throw new AssertionError("Cannot have default replica here");
712         }
713         HRegionLocation loc = getReplicaLocationOrFail(action);
714         if (loc == null) return;
715         addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
716             action, actionsByServer, nonceGroup);
717       }
718     }
719 
720     /**
721      * Runnable (that can be submitted to a thread pool) that submits a MultiAction to a
722      * single server. The server call is synchronous, therefore we do it on a thread pool.
723      */
724     private final class SingleServerRequestRunnable implements Runnable {
725       private final MultiAction<Row> multiAction;
726       private final int numAttempt;
727       private final ServerName server;
728       private final Set<MultiServerCallable<Row>> callsInProgress;
729 
730       private SingleServerRequestRunnable(
731           MultiAction<Row> multiAction, int numAttempt, ServerName server,
732           Set<MultiServerCallable<Row>> callsInProgress) {
733         this.multiAction = multiAction;
734         this.numAttempt = numAttempt;
735         this.server = server;
736         this.callsInProgress = callsInProgress;
737       }
738 
739       @Override
740       public void run() {
741         MultiResponse res;
742         MultiServerCallable<Row> callable = null;
743         try {
744           callable = createCallable(server, tableName, multiAction);
745           try {
746             RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
747             if (callsInProgress != null) callsInProgress.add(callable);
748             res = caller.callWithoutRetries(callable, timeout);
749 
750             if (res == null) {
751               // Cancelled
752               return;
753             }
754 
755           } catch (IOException e) {
756             // The service itself failed. It may be an error coming from the communication
757             //   layer, but, as well, a functional error raised by the server.
758             receiveGlobalFailure(multiAction, server, numAttempt, e);
759             return;
760           } catch (Throwable t) {
761             // This should not happen. Let's log & retry anyway.
762             LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
763                 " Retrying. Server is " + server + ", tableName=" + tableName, t);
764             receiveGlobalFailure(multiAction, server, numAttempt, t);
765             return;
766           }
767 
768           // Normal case: we received an answer from the server, and it's not an exception.
769           receiveMultiAction(multiAction, server, res, numAttempt);
770         } catch (Throwable t) {
771           // Something really bad happened. We are on the send thread that will now die.
772           LOG.error("Internal AsyncProcess #" + id + " error for "
773               + tableName + " processing for " + server, t);
774           throw new RuntimeException(t);
775         } finally {
776           decTaskCounters(multiAction.getRegions(), server);
777           if (callsInProgress != null && callable != null) {
778             callsInProgress.remove(callable);
779           }
780         }
781       }
782     }
783 
784     private final Batch.Callback<CResult> callback;
785     private final BatchErrors errors;
786     private final ConnectionManager.ServerErrorTracker errorsByServer;
787     private final ExecutorService pool;
788     private final Set<MultiServerCallable<Row>> callsInProgress;
789 
790 
791     private final TableName tableName;
792     private final AtomicLong actionsInProgress = new AtomicLong(-1);
793     /**
794      * The lock controls access to results. It is only held when populating results where
795      * there might be several callers (eventual consistency gets). For other requests,
796      * there's one unique call going on per result index.
797      */
798     private final Object replicaResultLock = new Object();
799     /**
800      * Result array.  Null if results are not needed. Otherwise, each index corresponds to
801      * the action index in initial actions submitted. For most request types, has null-s for
802      * requests that are not done, and result/exception for those that are done.
803      * For eventual-consistency gets, initially the same applies; at some point, replica calls
804      * might be started, and ReplicaResultState is put at the corresponding indices. The
805      * returning calls check the type to detect when this is the case. After all calls are done,
806      * ReplicaResultState-s are replaced with results for the user.
807      */
808     private final Object[] results;
809     /**
810      * Indices of replica gets in results. If null, all or no actions are replica-gets.
811      */
812     private final int[] replicaGetIndices;
813     private final boolean hasAnyReplicaGets;
814     private final long nonceGroup;
815 
816     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
817         ExecutorService pool, boolean needResults, Object[] results,
818         Batch.Callback<CResult> callback) {
819       this.pool = pool;
820       this.callback = callback;
821       this.nonceGroup = nonceGroup;
822       this.tableName = tableName;
823       this.actionsInProgress.set(actions.size());
824       if (results != null) {
825         assert needResults;
826         if (results.length != actions.size()) {
827           throw new AssertionError("results.length");
828         }
829         this.results = results;
830         for (int i = 0; i != this.results.length; ++i) {
831           results[i] = null;
832         }
833       } else {
834         this.results = needResults ? new Object[actions.size()] : null;
835       }
836       List<Integer> replicaGetIndices = null;
837       boolean hasAnyReplicaGets = false;
838       if (needResults) {
839         // Check to see if any requests might require replica calls.
840         // We expect that many requests will consist of all or no multi-replica gets; in such
841         // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
842         // store the list of action indexes for which replica gets are possible, and set
843         // hasAnyReplicaGets to true.
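        // For example (an illustration based on the loop below): for actions [get, get, put, get]
        // where every get may go to replicas, replicaGetIndices ends up as [0, 1, 3] and
        // hasAnyReplicaGets is true; if all actions are such gets, the list stays null and only
        // the boolean is set.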
844         boolean hasAnyNonReplicaReqs = false;
845         int posInList = 0;
846         for (Action<Row> action : actions) {
847           boolean isReplicaGet = isReplicaGet(action.getAction());
848           if (isReplicaGet) {
849             hasAnyReplicaGets = true;
850             if (hasAnyNonReplicaReqs) { // Mixed case
851               if (replicaGetIndices == null) {
852                 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
853               }
854               replicaGetIndices.add(posInList);
855             }
856           } else if (!hasAnyNonReplicaReqs) {
857             // The first non-multi-replica request in the action list.
858             hasAnyNonReplicaReqs = true;
859             if (posInList > 0) {
860               // Add all the previous requests to the index lists. We know they are all
861               // replica-gets because this is the first non-multi-replica request in the list.
862               replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
863               for (int i = 0; i < posInList; ++i) {
864                 replicaGetIndices.add(i);
865               }
866             }
867           }
868           ++posInList;
869         }
870       }
871       this.hasAnyReplicaGets = hasAnyReplicaGets;
872       if (replicaGetIndices != null) {
873         this.replicaGetIndices = new int[replicaGetIndices.size()];
874         int i = 0;
875         for (Integer el : replicaGetIndices) {
876           this.replicaGetIndices[i++] = el;
877         }
878       } else {
879         this.replicaGetIndices = null;
880       }
881       this.callsInProgress = !hasAnyReplicaGets ? null :
882           Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
883 
884       this.errorsByServer = createServerErrorTracker();
885       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
886     }
887 
888     @VisibleForTesting
889     long getActionsInProgress() {
890       return actionsInProgress.get();
891     }
892 
893     public Set<MultiServerCallable<Row>> getCallsInProgress() {
894       return callsInProgress;
895     }
896 
897     /**
898      * Group a list of actions per region server, and send them.
899      *
900      * @param currentActions the list of rows to submit
901      * @param numAttempt the current attempt number (the first attempt is 1)
902      */
903     private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
904       Map<ServerName, MultiAction<Row>> actionsByServer =
905           new HashMap<ServerName, MultiAction<Row>>();
906 
907       boolean isReplica = false;
908       List<Action<Row>> unknownReplicaActions = null;
909       for (Action<Row> action : currentActions) {
910         RegionLocations locs = findAllLocationsOrFail(action, true);
911         if (locs == null) continue;
912         boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
913         if (isReplica && !isReplicaAction) {
914           // This is the property of the current implementation, not a requirement.
915           throw new AssertionError("Replica and non-replica actions in the same retry");
916         }
917         isReplica = isReplicaAction;
918         HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
919         if (loc == null || loc.getServerName() == null) {
920           if (isReplica) {
921             if (unknownReplicaActions == null) {
922               unknownReplicaActions = new ArrayList<Action<Row>>();
923             }
924             unknownReplicaActions.add(action);
925           } else {
926             // TODO: relies on primary location always being fetched
927             manageLocationError(action, null);
928           }
929         } else {
930           byte[] regionName = loc.getRegionInfo().getRegionName();
931           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
932         }
933       }
934       boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
935       boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
936 
937       if (!actionsByServer.isEmpty()) {
938         // If this is the first attempt to group and send, and these are not replica actions, we need the replica thread.
939         sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
940             ? currentActions : null, numAttempt > 1 && !hasUnknown);
941       }
942 
943       if (hasUnknown) {
944         actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
945         for (Action<Row> action : unknownReplicaActions) {
946           HRegionLocation loc = getReplicaLocationOrFail(action);
947           if (loc == null) continue;
948           byte[] regionName = loc.getRegionInfo().getRegionName();
949           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
950         }
951         if (!actionsByServer.isEmpty()) {
952           sendMultiAction(
953               actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
954         }
955       }
956     }
957 
958     private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
959       // We are going to try to get the location once again. For each action, we'll try once
960       // from the cache, because the previous calls in the loop might have populated it.
961       int replicaId = action.getReplicaId();
962       RegionLocations locs = findAllLocationsOrFail(action, true);
963       if (locs == null) return null; // manageError already called
964       HRegionLocation loc = locs.getRegionLocation(replicaId);
965       if (loc == null || loc.getServerName() == null) {
966         locs = findAllLocationsOrFail(action, false);
967         if (locs == null) return null; // manageError already called
968         loc = locs.getRegionLocation(replicaId);
969       }
970       if (loc == null || loc.getServerName() == null) {
971         manageLocationError(action, null);
972         return null;
973       }
974       return loc;
975     }
976 
977     private void manageLocationError(Action<Row> action, Exception ex) {
978       String msg = "Cannot get replica " + action.getReplicaId()
979           + " location for " + action.getAction();
980       LOG.error(msg);
981       if (ex == null) {
982         ex = new IOException(msg);
983       }
984       manageError(action.getOriginalIndex(), action.getAction(),
985           Retry.NO_LOCATION_PROBLEM, ex, null);
986     }
987 
988     private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
989       if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
990           ", row cannot be null");
991       RegionLocations loc = null;
992       try {
993         loc = connection.locateRegion(
994             tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
995       } catch (IOException ex) {
996         manageLocationError(action, ex);
997       }
998       return loc;
999     }
1000 
1001     /**
1002      * Send a multi action structure to the servers, after a delay depending on the attempt
1003      * number. Asynchronous.
1004      *
1005      * @param actionsByServer the actions, grouped by server (each MultiAction is structured by region)
1006      * @param numAttempt the attempt number.
1007      * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
1008      */
1009     private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
1010         int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
1011       // Run the last item on the same thread if we are already on a send thread.
1012       // We hope most of the time it will be the only item, so we can cut down on threads.
1013       int actionsRemaining = actionsByServer.size();
1014       // This iteration is by server (the HRegionLocation comparator is by server portion only).
1015       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
1016         ServerName server = e.getKey();
1017         MultiAction<Row> multiAction = e.getValue();
1018         incTaskCounters(multiAction.getRegions(), server);
1019         Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
1020             numAttempt);
1021         // make sure we correctly count the number of runnables before we try to reuse the send
1022         // thread, in case we had to split the request into different runnables because of backoff
1023         if (runnables.size() > actionsRemaining) {
1024           actionsRemaining = runnables.size();
1025         }
1026 
1027         // run all the runnables
1028         for (Runnable runnable : runnables) {
1029           if ((--actionsRemaining == 0) && reuseThread) {
1030             runnable.run();
1031           } else {
1032             try {
1033               pool.submit(runnable);
1034             } catch (Throwable t) {
1035               if (t instanceof RejectedExecutionException) {
1036                 // This should never happen. But as the pool is provided by the end user,
1037                // let's secure this a little.
1038                LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
1039                   " Server is " + server.getServerName(), t);
1040               } else {
1041                 // see #HBASE-14359 for more details
1042                 LOG.warn("Caught unexpected exception/error: ", t);
1043               }
1044               decTaskCounters(multiAction.getRegions(), server);
1045               // We're likely to fail again, but this will increment the attempt counter,
1046              // so it will finish.
1047               receiveGlobalFailure(multiAction, server, numAttempt, t);
1048             }
1049           }
1050         }
1051       }
1052 
1053       if (actionsForReplicaThread != null) {
1054         startWaitingForReplicaCalls(actionsForReplicaThread);
1055       }
1056     }
1057 
1058     private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1059         MultiAction<Row> multiAction,
1060         int numAttempt) {
1061       // no stats to manage, just do the standard action
1062       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1063         if (connection.getConnectionMetrics() != null) {
1064           connection.getConnectionMetrics().incrNormalRunners();
1065         }
1066         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1067             new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1068       }
1069 
1070       // group the actions by the amount of delay
1071       Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1072           .size());
1073 
1074       // split up the actions
1075       for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1076         Long backoff = getBackoff(server, e.getKey());
1077         DelayingRunner runner = actions.get(backoff);
1078         if (runner == null) {
1079           actions.put(backoff, new DelayingRunner(backoff, e));
1080         } else {
1081           runner.add(e);
1082         }
1083       }
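      // Illustration (hypothetical numbers): if regions R1 and R2 both resolve to a 0 ms backoff
      // and R3 resolves to 100 ms, this loop produces two groups; below, the R1+R2 group is sent
      // on a plain runnable immediately, while the R3 group keeps its DelayingRunner and sleeps
      // ~100 ms before sending.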
1084 
1085       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1086       for (DelayingRunner runner : actions.values()) {
1087         String traceText = "AsyncProcess.sendMultiAction";
1088         Runnable runnable =
1089             new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1090                 callsInProgress);
1091         // use a delay runner only if we need to sleep for some time
1092         if (runner.getSleepTime() > 0) {
1093           runner.setRunner(runnable);
1094           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1095           runnable = runner;
1096           if (connection.getConnectionMetrics() != null) {
1097             connection.getConnectionMetrics().incrDelayRunners();
1098             connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
1099           }
1100         } else {
1101           if (connection.getConnectionMetrics() != null) {
1102             connection.getConnectionMetrics().incrNormalRunners();
1103           }
1104         }
1105         runnable = Trace.wrap(traceText, runnable);
1106         toReturn.add(runnable);
1107 
1108       }
1109       return toReturn;
1110     }
1111 
1112     /**
1113      * @param server server location where the target region is hosted
1114      * @param regionName name of the region to which we are going to write some data
1115      * @return the amount of time the client should wait before it submits a request to the
1116      * specified server and region
1117      */
1118     private Long getBackoff(ServerName server, byte[] regionName) {
1119       ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1120       ServerStatistics stats = tracker.getStats(server);
1121       return AsyncProcess.this.connection.getBackoffPolicy()
1122           .getBackoffTime(server, regionName, stats);
1123     }
1124 
1125     /**
1126      * Starts waiting to issue replica calls on a different thread; or issues them immediately.
1127      */
1128     private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1129       long startTime = EnvironmentEdgeManager.currentTime();
1130       ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1131           actionsForReplicaThread, startTime);
1132       if (primaryCallTimeoutMicroseconds == 0) {
1133         // Start replica calls immediately.
1134         replicaRunnable.run();
1135       } else {
1136         // Start the thread that may kick off replica gets.
1137         // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
1138         try {
1139           pool.submit(replicaRunnable);
1140         } catch (RejectedExecutionException ree) {
1141           LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1142         }
1143       }
1144     }
1145 
1146     /**
1147      * Check whether we can retry, and act accordingly: log and set the error status.
1148      *
1149      * @param originalIndex the position in the list sent
1150      * @param row           the row
1151      * @param canRetry      if not Retry.YES, we won't retry whatever the settings.
1152      * @param throwable     the throwable, if any (can be null)
1153      * @param server        the location, if any (can be null)
1154      * @return the retry decision: Retry.YES if the action can be retried, a NO_* value otherwise.
1155      */
1156     public Retry manageError(int originalIndex, Row row, Retry canRetry,
1157                                 Throwable throwable, ServerName server) {
1158       if (canRetry == Retry.YES
1159           && throwable != null && (throwable instanceof DoNotRetryIOException ||
1160           throwable instanceof NeedUnmanagedConnectionException)) {
1161         canRetry = Retry.NO_NOT_RETRIABLE;
1162       }
1163 
1164       if (canRetry != Retry.YES) {
1165         // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
1166         setError(originalIndex, row, throwable, server);
1167       } else if (isActionComplete(originalIndex, row)) {
1168         canRetry = Retry.NO_OTHER_SUCCEEDED;
1169       }
1170       return canRetry;
1171     }
1172 
1173     /**
1174      * Resubmit all the actions from this multiaction after a failure.
1175      *
1176      * @param rsActions  the actions still to do from the initial list
1177      * @param server   the destination
1178      * @param numAttempt the number of attempts so far
1179      * @param t the throwable (if any) that caused the resubmit
1180      */
1181     private void receiveGlobalFailure(
1182         MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1183       errorsByServer.reportServerError(server);
1184       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1185           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1186 
1187       if (tableName == null) {
1188         // tableName is null when we made a cross-table RPC call.
1189         connection.clearCaches(server);
1190       }
1191       int failed = 0, stopped = 0;
1192       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1193       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1194         byte[] regionName = e.getKey();
1195         byte[] row = e.getValue().iterator().next().getAction().getRow();
1196         // Do not use the exception for updating cache because it might be coming from
1197         // any of the regions in the MultiAction.
1198         if (tableName != null) {
1199           connection.updateCachedLocations(tableName, regionName, row,
1200             ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
1201         }
1202         for (Action<Row> action : e.getValue()) {
1203           Retry retry = manageError(
1204               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1205           if (retry == Retry.YES) {
1206             toReplay.add(action);
1207           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1208             ++stopped;
1209           } else {
1210             ++failed;
1211           }
1212         }
1213       }
1214 
1215       if (toReplay.isEmpty()) {
1216         logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1217       } else {
1218         resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1219       }
1220     }
1221 
1222     /**
1223      * Log as much info as possible, and, if there is something to replay,
1224      * submit it again after a backoff sleep.
1225      */
1226     private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1227         int numAttempt, int failureCount, Throwable throwable) {
1228       // We have something to replay. We're going to sleep a little before.
1229 
1230       // We have three competing needs here:
1231       //  1) We want to get the new location after having slept, as it may change.
1232       //  2) We want to take into account the location when calculating the sleep time.
1233       //  3) If all this is just because the response needed to be chunked try again FAST.
1234       // It should be possible to use some heuristics to make the right decision. Short term,
1235       //  we go with option 1.
1236       boolean retryImmediately = throwable instanceof RetryImmediatelyException;
1237       int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
1238       long backOffTime = retryImmediately ? 0 :
1239           errorsByServer.calculateBackoffTime(oldServer, pause);
1240       if (numAttempt > startLogErrorsCnt) {
1241         // We use this value to have some logs when we have multiple failures, but not too many
1242         //  logs, as errors are to be expected when a region moves, splits and so on
1243         LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1244             oldServer, throwable, backOffTime, true, null, -1, -1));
1245       }
1246 
1247       try {
1248         if (backOffTime > 0) {
1249           Thread.sleep(backOffTime);
1250         }
1251       } catch (InterruptedException e) {
1252         LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1253         Thread.currentThread().interrupt();
1254         return;
1255       }
1256 
1257       groupAndSendMultiAction(toReplay, nextAttemptNumber);
1258     }
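    /*
     * A minimal sketch of the kind of backoff computed above (illustrative only; the real
     * calculation lives in ConnectionManager.ServerErrorTracker and also weighs how many
     * errors the target server has produced so far):
     *
     *   // multipliers mirror HConstants.RETRY_BACKOFF, e.g. {1, 2, 3, 5, 10, 20, ...}
     *   long backoff(long pause, int tries, int[] multipliers) {
     *     int m = multipliers[Math.min(tries, multipliers.length - 1)];
     *     long base = pause * m;
     *     // ~1% jitter so that many clients do not retry in lockstep
     *     long jitter = (long) (base * 0.01f * ThreadLocalRandom.current().nextFloat());
     *     return base + jitter;   // e.g. pause=100ms, tries=3 -> ~500ms
     *   }
     */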
1259 
1260     private void logNoResubmit(ServerName oldServer, int numAttempt,
1261         int failureCount, Throwable throwable, int failed, int stopped) {
1262       if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1263         String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1264         String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1265             throwable, -1, false, timeStr, failed, stopped);
1266         if (failed != 0) {
1267           // Only log final failures as warning
1268           LOG.warn(logMessage);
1269         } else {
1270           LOG.info(logMessage);
1271         }
1272       }
1273     }
1274 
1275     /**
1276      * Called when we receive the result of a server query.
1277      *
1278      * @param multiAction    - the multiAction we sent
1279      * @param server       - the location. It's used as a server name.
1280      * @param responses      - the response, if any
1281      * @param numAttempt     - the attempt
1282      */
1283     private void receiveMultiAction(MultiAction<Row> multiAction,
1284         ServerName server, MultiResponse responses, int numAttempt) {
1285       assert responses != null;
1286 
1287       // Success or partial success
1288       // Analyze detailed results. We can still have individual failures to be redone.
1289       // Two specific throwables are handled specially:
1290       //  - DoNotRetryIOException: we continue to retry for other actions
1291       //  - RegionMovedException: we update the cache with the new region location
1292 
1293       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1294       Throwable throwable = null;
1295       int failureCount = 0;
1296       Retry canRetry = null;
1297       Map<byte[], Map<Integer, Object>> results = responses.getResults();
1298       // Go by original action.
1299       int failed = 0;
1300       int stopped = 0;
1301       for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
1302         byte[] regionName = regionEntry.getKey();
1303 
1304         Throwable regionException = responses.getExceptions().get(regionName);
1305         if (tableName == null && ClientExceptionsUtil.isMetaClearingException(regionException)) {
1306           // For multi-actions, we don't have a table name, but we want to make sure to clear the
1307       // cache in case there were location-related exceptions. We don't want to clear the cache
1308           // for every possible exception that comes through, however.
1309           connection.clearCaches(server);
1310         }
1311 
1312         Map<Integer, Object> regionResults;
1313         if (results.containsKey(regionName)) {
1314           regionResults = results.get(regionName);
1315         } else {
1316           regionResults = Collections.emptyMap();
1317         }
1318 
1319         boolean regionFailureRegistered = false;
1320         for (Action<Row> sentAction : regionEntry.getValue()) {
1321           Object result = regionResults.get(sentAction.getOriginalIndex());
1322           if (result == null) {
1323             if (regionException == null) {
1324               LOG.error("Server sent us neither results nor exceptions for "
1325                 + Bytes.toStringBinary(regionName)
1326                 + ", numAttempt:" + numAttempt);
1327               regionException = new RuntimeException("Invalid response");
1328             }
1329             // If the row operation hit a region-level error, the per-action exception may be
1330             // null.
1331             result = regionException;
1332           }
1333           // Failure: retry if it makes sense, otherwise update the error lists.
1334           if (result instanceof Throwable) {
1335             Row row = sentAction.getAction();
1336             throwable = regionException != null ? regionException
1337               : ClientExceptionsUtil.findException(result);
1338             // Register corresponding failures once per server/once per region.
1339             if (!regionFailureRegistered) {
1340               regionFailureRegistered = true;
1341               connection.updateCachedLocations(
1342                   tableName, regionName, row.getRow(), result, server);
1343             }
1344             if (canRetry == null) {
1345               errorsByServer.reportServerError(server);
1346               // We determine canRetry only once for all calls, after reporting server failure.
1347               canRetry = errorsByServer.canRetryMore(numAttempt) ?
1348                 Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1349             }
1350             ++failureCount;
1351             switch (manageError(sentAction.getOriginalIndex(), row, canRetry, (Throwable) result,
1352               server)) {
1353             case YES:
1354               toReplay.add(sentAction);
1355               break;
1356             case NO_OTHER_SUCCEEDED:
1357               ++stopped;
1358               break;
1359             default:
1360               ++failed;
1361               break;
1362             }
1363           } else {
1364
1365             if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
1366               AsyncProcess.this.connection.getConnectionMetrics().
1367                       updateServerStats(server, regionName, result);
1368             }
1369 
1370             // Update the stats about the region, if it's a user table. We don't want to slow down
1371             // updates to meta tables, especially from internal updates (master, etc).
1372             if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
1373               result = ResultStatsUtil.updateStats(result,
1374                   AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
1375             }
1376 
1377             if (callback != null) {
1378               try {
1379                 //noinspection unchecked
1380                 // TODO: would callback expect a replica region name if it gets one?
1381                 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
1382               } catch (Throwable t) {
1383                 LOG.error("User callback threw an exception for "
1384                     + Bytes.toStringBinary(regionName) + ", ignoring", t);
1385               }
1386             }
1387             setResult(sentAction, result);
1388           }
1389         }
1390       }
1391       if (toReplay.isEmpty()) {
1392         logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
1393       } else {
1394         resubmit(server, toReplay, numAttempt, failureCount, throwable);
1395       }
1396     }
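    /*
     * Shape of the response walked above, as used by this method (a summary, not new API):
     *
     *   responses.getResults()    : Map<regionName, Map<originalActionIndex, Object>>
     *                               where each Object is either a Result or a Throwable
     *   responses.getExceptions() : Map<regionName, Throwable> for region-wide failures
     *
     * A single region entry can therefore mix successful Results with per-action Throwables,
     * which is why the retry decision is taken action by action rather than per region.
     */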
1397 
1398     private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1399         Throwable error, long backOffTime, boolean willRetry, String startTime,
1400         int failed, int stopped) {
1401       StringBuilder sb = new StringBuilder();
1402       sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1403         .append("attempt=").append(numAttempt)
1404         .append("/").append(numTries).append(" ");
1405 
1406       if (failureCount > 0 || error != null){
1407         sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1408             append(error == null ? "null" : error);
1409       } else {
1410         sb.append("succeeded");
1411       }
1412 
1413       sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1414 
1415       if (willRetry) {
1416         sb.append(", retrying after=").append(backOffTime).append("ms").
1417             append(", replay=").append(replaySize).append("ops");
1418       } else if (failureCount > 0) {
1419         if (stopped > 0) {
1420           sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1421         }
1422         if (failed > 0) {
1423           sb.append("; not retrying ").append(failed).append(" - final failure");
1424         }
1425 
1426       }
1427 
1428       return sb.toString();
1429     }
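    /*
     * Example of a message assembled above for a final failure (all names and values below
     * are made up for illustration):
     *
     *   #3, table=usertable, attempt=35/35 failed=2ops, last exception:
     *   org.apache.hadoop.hbase.NotServingRegionException: ... on rs1.example.com,16020,1449820000000,
     *   tracking started Mon Nov 30 10:15:00 UTC 2015; not retrying 2 - final failure
     */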
1430 
1431     /**
1432      * Sets the non-error result from a particular action.
1433      * @param action Action (request) that the server responded to.
1434      * @param result The result.
1435      */
1436     private void setResult(Action<Row> action, Object result) {
1437       if (result == null) {
1438         throw new RuntimeException("Result cannot be null");
1439       }
1440       ReplicaResultState state = null;
1441       boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
1442       int index = action.getOriginalIndex();
1443       if (results == null) {
1444          decActionCounter(index);
1445          return; // Simple case, no replica requests.
1446       } else if ((state = trySetResultSimple(
1447           index, action.getAction(), false, result, null, isStale)) == null) {
1448         return; // Simple case, no replica requests.
1449       }
1450       assert state != null;
1451       // At this point we know that state is set to replica tracking class.
1452       // It could be that someone else is also looking at it; however, we know there can
1453       // only be one state object, and only one thread can set callCount to 0. Other threads
1454       // will either see the state with callCount 0 after locking it, or will not see the state
1455       // at all, because we will have replaced it with the result.
1456       synchronized (state) {
1457         if (state.callCount == 0) {
1458           return; // someone already set the result
1459         }
1460         state.callCount = 0;
1461       }
1462       synchronized (replicaResultLock) {
1463         if (results[index] != state) {
1464           throw new AssertionError("We set the callCount but someone else replaced the result");
1465         }
1466         results[index] = result;
1467       }
1468 
1469       decActionCounter(index);
1470     }
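    /*
     * Lifecycle of results[index] for a get with replica calls, as maintained by setResult
     * above and setError below (a summary of the code, not new behavior):
     *
     *   1. results[index] holds a ReplicaResultState whose callCount tracks the outstanding
     *      primary/replica calls for that action.
     *   2. The first successful response zeroes callCount and replaces the state object with
     *      the Result; decActionCounter(index) runs exactly once.
     *   3. Errors decrement callCount and are parked in state.replicaErrors; only if the last
     *      outstanding call also fails is the Throwable installed and the action marked done.
     *   4. Any later response finds callCount == 0 (or no state at all) and simply returns.
     */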
1471 
1472     /**
1473      * Sets the error from a particular action.
1474      * @param index Original action index.
1475      * @param row Original request.
1476      * @param throwable The resulting error.
1477      * @param server The source server.
1478      */
1479     private void setError(int index, Row row, Throwable throwable, ServerName server) {
1480       ReplicaResultState state = null;
1481       if (results == null) {
1482         // Note that we currently cannot have replica requests with null results. So it shouldn't
1483         // happen that multiple replica calls invoke decActionCounter for the same action when results == null.
1484         // Only one call per action should be present in this case.
1485         errors.add(throwable, row, server);
1486         decActionCounter(index);
1487         return; // Simple case, no replica requests.
1488       } else if ((state = trySetResultSimple(
1489           index, row, true, throwable, server, false)) == null) {
1490         return; // Simple case, no replica requests.
1491       }
1492       assert state != null;
1493       BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
1494       boolean isActionDone = false;
1495       synchronized (state) {
1496         switch (state.callCount) {
1497           case 0: return; // someone already set the result
1498           case 1: { // All calls failed, we are the last error.
1499             target = errors;
1500             isActionDone = true;
1501             break;
1502           }
1503           default: {
1504             assert state.callCount > 1;
1505             if (state.replicaErrors == null) {
1506               state.replicaErrors = new BatchErrors();
1507             }
1508             target = state.replicaErrors;
1509             break;
1510           }
1511         }
1512         --state.callCount;
1513       }
1514       target.add(throwable, row, server);
1515       if (isActionDone) {
1516         if (state.replicaErrors != null) { // last call, no need to lock
1517           errors.merge(state.replicaErrors);
1518         }
1519         // See setResult for explanations.
1520         synchronized (replicaResultLock) {
1521           if (results[index] != state) {
1522             throw new AssertionError("We set the callCount but someone else replaced the result");
1523           }
1524           results[index] = throwable;
1525         }
1526         decActionCounter(index);
1527       }
1528     }
1529 
1530     /**
1531      * Checks if the action is complete; used on error to prevent needless retries.
1532      * Does not synchronize, assuming element index/field accesses are atomic.
1533      * This is an opportunistic optimization check, doesn't have to be strict.
1534      * @param index Original action index.
1535      * @param row Original request.
1536      */
1537     private boolean isActionComplete(int index, Row row) {
1538       if (!isReplicaGet(row)) return false;
1539       Object resObj = results[index];
1540       return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1541           || ((ReplicaResultState)resObj).callCount == 0);
1542     }
1543 
1544     /**
1545      * Tries to set the result or error for a particular action as if there were no replica calls.
1546      * @return null if successful; replica state if there were in fact replica calls.
1547      */
1548     private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
1549         Object result, ServerName server, boolean isFromReplica) {
1550       Object resObj = null;
1551       if (!isReplicaGet(row)) {
1552         if (isFromReplica) {
1553           throw new AssertionError("Unexpected stale result for " + row);
1554         }
1555         results[index] = result;
1556       } else {
1557         synchronized (replicaResultLock) {
1558           if ((resObj = results[index]) == null) {
1559             if (isFromReplica) {
1560               throw new AssertionError("Unexpected stale result for " + row);
1561             }
1562             results[index] = result;
1563           }
1564         }
1565       }
1566 
1567       ReplicaResultState rrs =
1568           (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
1569       if (rrs == null && isError) {
1570         // The resObj is not replica state (null or already set).
1571         errors.add((Throwable)result, row, server);
1572       }
1573 
1574       if (resObj == null) {
1575         // resObj is null - no replica calls were made.
1576         decActionCounter(index);
1577         return null;
1578       }
1579       return rrs;
1580     }
1581 
1582     private void decActionCounter(int index) {
1583       long actionsRemaining = actionsInProgress.decrementAndGet();
1584       if (actionsRemaining < 0) {
1585         String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1586         throw new AssertionError(error);
1587       } else if (actionsRemaining == 0) {
1588         synchronized (actionsInProgress) {
1589           actionsInProgress.notifyAll();
1590         }
1591       }
1592     }
1593 
1594     private String buildDetailedErrorMsg(String string, int index) {
1595       StringBuilder error = new StringBuilder(string);
1596       error.append("; called for ").
1597         append(index).
1598         append(", actionsInProgress ").
1599         append(actionsInProgress.get()).
1600         append("; replica gets: ");
1601       if (replicaGetIndices != null) {
1602         for (int i = 0; i < replicaGetIndices.length; ++i) {
1603           error.append(replicaGetIndices[i]).append(", ");
1604         }
1605       } else {
1606         error.append(hasAnyReplicaGets ? "all" : "none");
1607       }
1608       error.append("; results ");
1609       if (results != null) {
1610         for (int i = 0; i < results.length; ++i) {
1611           Object o = results[i];
1612           error.append(((o == null) ? "null" : o.toString())).append(", ");
1613         }
1614       }
1615       return error.toString();
1616     }
1617 
1618     @Override
1619     public void waitUntilDone() throws InterruptedIOException {
1620       try {
1621         waitUntilDone(Long.MAX_VALUE);
1622       } catch (InterruptedException iex) {
1623         throw new InterruptedIOException(iex.getMessage());
1624       } finally {
1625         if (callsInProgress != null) {
1626           for (MultiServerCallable<Row> clb : callsInProgress) {
1627             clb.cancel();
1628           }
1629         }
1630       }
1631     }
1632 
1633     private boolean waitUntilDone(long cutoff) throws InterruptedException {
1634       boolean hasWait = cutoff != Long.MAX_VALUE;
1635       long lastLog = EnvironmentEdgeManager.currentTime();
1636       long currentInProgress;
1637       while (0 != (currentInProgress = actionsInProgress.get())) {
1638         long now = EnvironmentEdgeManager.currentTime();
1639         if (hasWait && (now * 1000L) > cutoff) {
1640           return false;
1641         }
1642         if (!hasWait) { // Only log if wait is infinite.
1643           if (now > lastLog + 10000) {
1644             lastLog = now;
1645             LOG.info("#" + id + ", waiting for " + currentInProgress
1646                 + "  actions to finish on table: " + tableName);
1647             if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1648               logDetailsOfUndoneTasks(currentInProgress);
1649             }
1650           }
1651         }
1652         synchronized (actionsInProgress) {
1653           if (actionsInProgress.get() == 0) break;
1654           if (!hasWait) {
1655             actionsInProgress.wait(10);
1656           } else {
1657             long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1658             TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1659           }
1660         }
1661       }
1662       return true;
1663     }
1664 
1665     @Override
1666     public boolean hasError() {
1667       return errors.hasErrors();
1668     }
1669 
1670     @Override
1671     public List<? extends Row> getFailedOperations() {
1672       return errors.actions;
1673     }
1674 
1675     @Override
1676     public RetriesExhaustedWithDetailsException getErrors() {
1677       return errors.makeException();
1678     }
1679 
1680     @Override
1681     public Object[] getResults() throws InterruptedIOException {
1682       waitUntilDone();
1683       return results;
1684     }
1685   }
1686 
1687   /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
1688   @VisibleForTesting
1689   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1690       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1691       Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1692     return new AsyncRequestFutureImpl<CResult>(
1693         tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1694   }
1695 
1696   /**
1697    * Create a callable. Isolated to be easily overridden in the tests.
1698    */
1699   @VisibleForTesting
1700   protected MultiServerCallable<Row> createCallable(final ServerName server,
1701       TableName tableName, final MultiAction<Row> multi) {
1702     return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1703   }
1704 
1705   /**
1706    * Create a caller. Isolated to be easily overridden in the tests.
1707    */
1708   @VisibleForTesting
1709   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1710     return rpcCallerFactory.<MultiResponse> newCaller();
1711   }
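  /*
   * Illustrative sketch (hypothetical test code, not part of this class): because the
   * factory methods above are isolated, a test can subclass AsyncProcess and substitute
   * its own caller, for example to inject failures or latency:
   *
   *   class FaultInjectingAsyncProcess extends AsyncProcess {
   *     // constructor simply delegates to the AsyncProcess constructor
   *     @Override
   *     protected RpcRetryingCaller<MultiResponse> createCaller(
   *         MultiServerCallable<Row> callable) {
   *       // wrap or replace the caller returned by super.createCaller(callable)
   *       // to fail the first attempt, count invocations, add delays, etc.
   *       return super.createCaller(callable);
   *     }
   *   }
   */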
1712 
1713   /** Waits until all outstanding tasks are done. Used in tests. */
1714   @VisibleForTesting
1715   void waitUntilDone() throws InterruptedIOException {
1716     waitForMaximumCurrentTasks(0, null);
1717   }
1718 
1719   /** Waits until this AsyncProcess has no more than max tasks in progress. */
1720   private void waitForMaximumCurrentTasks(int max, String tableName)
1721       throws InterruptedIOException {
1722     waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
1723   }
1724 
1725   // Broken out into its own method so it is easily testable.
1726   @VisibleForTesting
1727   void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
1728       String tableName) throws InterruptedIOException {
1729     long lastLog = EnvironmentEdgeManager.currentTime();
1730     long currentInProgress, oldInProgress = Long.MAX_VALUE;
1731     while ((currentInProgress = tasksInProgress.get()) > max) {
1732       if (oldInProgress != currentInProgress) { // Wait for in progress to change.
1733         long now = EnvironmentEdgeManager.currentTime();
1734         if (now > lastLog + 10000) {
1735           lastLog = now;
1736           LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1737               + max + ", tasksInProgress=" + currentInProgress +
1738               " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
1739           if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1740             logDetailsOfUndoneTasks(currentInProgress);
1741           }
1742         }
1743       }
1744       oldInProgress = currentInProgress;
1745       try {
1746         synchronized (tasksInProgress) {
1747           if (tasksInProgress.get() == oldInProgress) {
1748             tasksInProgress.wait(10);
1749           }
1750         }
1751       } catch (InterruptedException e) {
1752         throw new InterruptedIOException("#" + id + ", interrupted." +
1753             " currentNumberOfTask=" + currentInProgress);
1754       }
1755     }
1756   }
1757 
1758   private void logDetailsOfUndoneTasks(long taskInProgress) {
1759     ArrayList<ServerName> servers = new ArrayList<ServerName>();
1760     for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
1761       if (entry.getValue().get() > 0) {
1762         servers.add(entry.getKey());
1763       }
1764     }
1765     LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
1766     if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
1767       ArrayList<String> regions = new ArrayList<String>();
1768       for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
1769         if (entry.getValue().get() > 0) {
1770           regions.add(Bytes.toString(entry.getKey()));
1771         }
1772       }
1773       LOG.info("Regions against which left over task(s) are processed: " + regions);
1774     }
1775   }
1776 
1777   /**
1778    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1779    * @return Whether there were any errors in any request since the last time
1780    *          {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created.
1781    */
1782   public boolean hasError() {
1783     return globalErrors.hasErrors();
1784   }
1785 
1786   /**
1787    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1788    * Waits for all previous operations to finish, and returns errors and (optionally)
1789    * failed operations themselves.
1790    * @param failedRows an optional list into which the rows that failed since the last time
1791    *        {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created, are saved.
1792    * @param tableName name of the table
1793    * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List, String)}
1794    *          was called, or AP was created.
1795    */
1796   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1797       List<Row> failedRows, String tableName) throws InterruptedIOException {
1798     waitForMaximumCurrentTasks(0, tableName);
1799     if (!globalErrors.hasErrors()) {
1800       return null;
1801     }
1802     if (failedRows != null) {
1803       failedRows.addAll(globalErrors.actions);
1804     }
1805     RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1806     globalErrors.clear();
1807     return result;
1808   }
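  /*
   * Illustrative sketch (hypothetical caller code, not part of this class) of the
   * HTable-style flush pattern this method supports:
   *
   *   List<Row> failed = new ArrayList<Row>();
   *   RetriesExhaustedWithDetailsException errors =
   *       asyncProcess.waitForAllPreviousOpsAndReset(failed, "usertable");
   *   if (errors != null) {
   *     // 'failed' now holds the rows that could not be written; rethrow or re-buffer them
   *     throw errors;
   *   }
   */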
1809 
1810   /**
1811    * Increments the task counters for a given set of regions. MT safe.
1812    */
1813   protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1814     tasksInProgress.incrementAndGet();
1815 
1816     AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1817     if (serverCnt == null) {
1818       taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1819       serverCnt = taskCounterPerServer.get(sn);
1820     }
1821     serverCnt.incrementAndGet();
1822 
1823     for (byte[] regBytes : regions) {
1824       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1825       if (regionCnt == null) {
1826         regionCnt = new AtomicInteger();
1827         AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1828         if (oldCnt != null) {
1829           regionCnt = oldCnt;
1830         }
1831       }
1832       regionCnt.incrementAndGet();
1833     }
1834   }
1835 
1836   /**
1837    * Decrements the counters for the given regions and the region server. MT safe.
1838    */
1839   protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1840     for (byte[] regBytes : regions) {
1841       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1842       regionCnt.decrementAndGet();
1843     }
1844 
1845     taskCounterPerServer.get(sn).decrementAndGet();
1846     tasksInProgress.decrementAndGet();
1847     synchronized (tasksInProgress) {
1848       tasksInProgress.notifyAll();
1849     }
1850   }
1851 
1852   /**
1853    * Creates the server error tracker to use inside this process.
1854    * Currently, to preserve the main assumption about retries and to work well with
1855    * the retry-limit-based calculation, the tracking is local to each AsyncProcess object.
1856    * We may benefit from connection-wide tracking of server errors.
1857    * @return the ServerErrorTracker to use, or null if there is no ServerErrorTracker on this connection
1858    */
1859   protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1860     return new ConnectionManager.ServerErrorTracker(
1861         this.serverTrackerTimeout, this.numTries);
1862   }
1863 
1864   private static boolean isReplicaGet(Row row) {
1865     return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1866   }
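  /*
   * Illustrative sketch (hypothetical client code): only gets issued with timeline
   * consistency take the replica path guarded by this check:
   *
   *   Get get = new Get(Bytes.toBytes("row-42"));  // "row-42" is a made-up key
   *   get.setConsistency(Consistency.TIMELINE);    // isReplicaGet(get) is now true
   *   // a Consistency.STRONG (default) get is only ever sent to the primary replica
   */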
1867 
1868   /**
1869    * For manageError. Only used to make logging clearer; we don't actually care why we don't retry.
1870    */
1871   private enum Retry {
1872     YES,
1873     NO_LOCATION_PROBLEM,
1874     NO_NOT_RETRIABLE,
1875     NO_RETRIES_EXHAUSTED,
1876     NO_OTHER_SUCCEEDED
1877   }
1878 }