1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Date;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ConcurrentSkipListMap;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.hbase.RetryImmediatelyException;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.DoNotRetryIOException;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.RegionLocations;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch;
56  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
57  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60  import org.apache.htrace.Trace;
61  
62  import com.google.common.annotations.VisibleForTesting;
63  
64  /**
65   * This class allows a continuous flow of requests. It's written to be compatible with a
66   * synchronous caller such as HTable.
67   * <p>
68   * The caller sends a buffer of operations by calling submit. This class extracts from this
69   * list the operations it can send, i.e. the operations that are on regions that are not
70   * considered busy. The process is asynchronous, i.e. it returns immediately once it has
71   * finished iterating over the list. If, and only if, the maximum number of concurrent tasks
72   * is reached, the call to submit will block. Alternatively, the caller can call submitAll, in
73   * which case all the operations will be sent. Each call to submit returns a future-like
74   * object that can be used to track operation progress.
75   * </p>
76   * <p>
77   * The class manages retries internally.
78   * </p>
79   * <p>
80   * The class can be constructed in regular mode, or "global error" mode. In global error mode,
81   * AP tracks errors across all calls (each "future" also has a global view of all errors).
82   * That mode is necessary for backward compatibility with HTable behavior, where multiple
83   * submissions are made and errors from previous calls can propagate through any later
84   * put/flush call. In "regular" mode, the errors are tracked inside the returned Future object.
85   * The results are always tracked inside the Future object and can be retrieved when the call
86   * has finished. Partial results can also be retrieved if part of a multi-request failed.
87   * </p>
88   * <p>
89   * This class is thread safe in regular mode; in global error mode, submitting operations and
90   * retrieving errors from different threads may not be thread safe.
91   * Internally, the class is thread safe enough to handle new submissions and results arising
92   * from older operations simultaneously.
93   * </p>
94   * <p>
95   * Internally, this class works with {@link Row}, which means it could theoretically be used for
96   * gets as well.
97   * </p>
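     * <p>
     * A rough usage sketch (illustrative only, not part of the original documentation: the
     * connection, configuration, pool, factories and row list are assumed to be supplied by the
     * caller, since this class is internal and is normally driven by HTable):
     * <pre>{@code
     * AsyncProcess ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
     * AsyncRequestFuture ars = ap.submitAll(pool, tableName, rows, null, new Object[rows.size()]);
     * ars.waitUntilDone();
     * if (ars.hasError()) {
     *   throw ars.getErrors();
     * }
     * }</pre>
     * </p>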
98   */
99  @InterfaceAudience.Private
100 class AsyncProcess {
101   private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
102   protected static final AtomicLong COUNTER = new AtomicLong();
103 
104   public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
105 
106   /**
107    * Configure the number of failures after which the client will start logging. A few failures
108    * are fine: the region moved, then is not yet open, then is overloaded. We try to have an
109    * acceptable heuristic for the number of errors we don't log. 9 was chosen because we wait
110    * for 1s at this stage.
111    */
112   public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
113       "hbase.client.start.log.errors.counter";
114   public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
115 
116   /**
117    * Configuration to decide whether to log details for batch errors.
118    */
119   public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
120 
121   private final int thresholdToLogUndoneTaskDetails;
122   private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
123       "hbase.client.threshold.log.details";
124   private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
125   private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
126 
127   /**
128    * The context used to wait for results from one submit call.
129    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
130    *    then errors and failed operations in this object will reflect global errors.
131    * 2) If the submit call is made with needResults false, results will not be saved.
132    */
133   public static interface AsyncRequestFuture {
134     public boolean hasError();
135     public RetriesExhaustedWithDetailsException getErrors();
136     public List<? extends Row> getFailedOperations();
137     public Object[] getResults() throws InterruptedIOException;
138     /** Wait until all tasks are executed, successfully or not. */
139     public void waitUntilDone() throws InterruptedIOException;
140   }
141 
142   /**
143    * Return value from a submit that didn't contain any requests.
144    */
145   private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
146 
147     final Object[] result = new Object[0];
148 
149     @Override
150     public boolean hasError() {
151       return false;
152     }
153 
154     @Override
155     public RetriesExhaustedWithDetailsException getErrors() {
156       return null;
157     }
158 
159     @Override
160     public List<? extends Row> getFailedOperations() {
161       return null;
162     }
163 
164     @Override
165     public Object[] getResults() {
166       return result;
167     }
168 
169     @Override
170     public void waitUntilDone() throws InterruptedIOException {
171     }
172   };
173 
174   /** Sync point for calls to multiple replicas for the same user request (Get).
175    * Created and put in the results array (we assume replica calls require results) when
176    * the replica calls are launched. See results for details of this process.
177    * POJO, all fields are public. To modify them, the object itself is locked. */
178   private static class ReplicaResultState {
179     public ReplicaResultState(int callCount) {
180       this.callCount = callCount;
181     }
182 
183     /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
184     int callCount;
185     /** Errors for which it is not decided whether we will report them to the user. If one of the
186      * calls succeeds, we will discard the errors that may have happened in the other calls. */
187     BatchErrors replicaErrors = null;
188 
189     @Override
190     public String toString() {
191       return "[call count " + callCount + "; errors " + replicaErrors + "]";
192     }
193   }
194 
195 
196   // TODO: many of the fields should be made private
197   protected final long id;
198 
199   protected final ClusterConnection connection;
200   protected final RpcRetryingCallerFactory rpcCallerFactory;
201   protected final RpcControllerFactory rpcFactory;
202   protected final BatchErrors globalErrors;
203   protected final ExecutorService pool;
204 
205   protected final AtomicLong tasksInProgress = new AtomicLong(0);
206   protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
207       new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
208   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
209       new ConcurrentHashMap<ServerName, AtomicInteger>();
210 
211   // Start configuration settings.
212   private final int startLogErrorsCnt;
213 
214   /**
215    * The number of tasks simultaneously executed on the cluster.
216    */
217   protected final int maxTotalConcurrentTasks;
218 
219   /**
220    * The number of tasks we run in parallel on a single region.
221    * With 1 (the default), we ensure that the ordering of the queries is respected: we don't
222    * start a set of operations on a region before the previous one is done. This also limits
223    * the pressure we put on the region server.
224    */
225   protected final int maxConcurrentTasksPerRegion;
226 
227   /**
228    * The number of tasks simultaneously executed on a single region server.
229    */
230   protected final int maxConcurrentTasksPerServer;
231   protected final long pause;
232   protected int numTries;
233   protected int serverTrackerTimeout;
234   protected int timeout;
235   protected long primaryCallTimeoutMicroseconds;
236   // End configuration settings.
237 
238   protected static class BatchErrors {
239     private final List<Throwable> throwables = new ArrayList<Throwable>();
240     private final List<Row> actions = new ArrayList<Row>();
241     private final List<String> addresses = new ArrayList<String>();
242 
243     public synchronized void add(Throwable ex, Row row, ServerName serverName) {
244       if (row == null){
245         throw new IllegalArgumentException("row cannot be null. location=" + serverName);
246       }
247 
248       throwables.add(ex);
249       actions.add(row);
250       addresses.add(serverName != null ? serverName.toString() : "null");
251     }
252 
253     public boolean hasErrors() {
254       return !throwables.isEmpty();
255     }
256 
257     private synchronized RetriesExhaustedWithDetailsException makeException() {
258       return new RetriesExhaustedWithDetailsException(
259           new ArrayList<Throwable>(throwables),
260           new ArrayList<Row>(actions), new ArrayList<String>(addresses));
261     }
262 
263     public synchronized void clear() {
264       throwables.clear();
265       actions.clear();
266       addresses.clear();
267     }
268 
269     public synchronized void merge(BatchErrors other) {
270       throwables.addAll(other.throwables);
271       actions.addAll(other.actions);
272       addresses.addAll(other.addresses);
273     }
274   }
275 
276   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
277       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
278     if (hc == null) {
279       throw new IllegalArgumentException("HConnection cannot be null.");
280     }
281 
282     this.connection = hc;
283     this.pool = pool;
284     this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
285 
286     this.id = COUNTER.incrementAndGet();
287 
288     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
289         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
290     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
291         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
292     this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
293         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
294     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
295 
296     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
297       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
298     this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
299           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
300     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
301           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
302 
303     this.startLogErrorsCnt =
304         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
305 
306     if (this.maxTotalConcurrentTasks <= 0) {
307       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
308     }
309     if (this.maxConcurrentTasksPerServer <= 0) {
310       throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
311           maxConcurrentTasksPerServer);
312     }
313     if (this.maxConcurrentTasksPerRegion <= 0) {
314       throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
315           maxConcurrentTasksPerRegion);
316     }
317 
318     // Server tracker allows us to do faster, and yet useful (hopefully), retries.
319     // However, if we retry too fast, we might fail very quickly due to the retry count limit.
320     // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
321     // retry time if normal retries were used. Then we will retry until this time runs out.
322     // If we keep hitting one server, the net effect will be the incremental backoff, and
323     // essentially the same number of retries as planned. If we have to do faster retries,
324     // we will do more retries in aggregate, but the user will be none the wiser.
325     this.serverTrackerTimeout = 0;
326     for (int i = 0; i < this.numTries; ++i) {
327       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
328     }
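        // Rough illustration (an assumption, not taken from this file): with the default client
        // pause of 100 ms and the usual HConstants.RETRY_BACKOFF multipliers (1, 2, 3, 5, 10, ...),
        // the successive waits are about 100, 200, 300, 500, 1000 ms, ..., so serverTrackerTimeout
        // ends up roughly equal to pause times the sum of the backoff multipliers over numTries.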
329 
330     this.rpcCallerFactory = rpcCaller;
331     this.rpcFactory = rpcFactory;
332 
333     this.thresholdToLogUndoneTaskDetails =
334         conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
335           DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
336   }
337 
338   /**
339    * @return the given pool if non-null, otherwise this.pool if non-null, otherwise throws a
340    *         RuntimeException
341    */
342   private ExecutorService getPool(ExecutorService pool) {
343     if (pool != null) {
344       return pool;
345     }
346     if (this.pool != null) {
347       return this.pool;
348     }
349     throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
350   }
351 
352   /**
353    * See {@link #submit(ExecutorService, TableName, List, boolean, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, boolean)}.
354    * Uses default ExecutorService for this AP (must have been created with one).
355    */
356   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
357       boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
358       throws InterruptedIOException {
359     return submit(null, tableName, rows, atLeastOne, callback, needResults);
360   }
361 
362   /**
363    * Extract from the rows list what we can submit. The rows we cannot submit are kept in the
364    * list. Does not send requests to replicas (not currently used for anything other
365    * than streaming puts anyway).
366    *
367    * @param pool ExecutorService to use.
368    * @param tableName The table for which this request is needed.
369    * @param callback Batch callback. Only called on success (94 behavior).
370    * @param needResults Whether results are needed, or can be discarded.
371    * @param rows - the submitted rows. Modified by the method: we remove the rows we took.
372    * @param atLeastOne true if we should submit at least a subset.
373    */
374   public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
375       List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
376       boolean needResults) throws InterruptedIOException {
377     if (rows.isEmpty()) {
378       return NO_REQS_RESULT;
379     }
380 
381     Map<ServerName, MultiAction<Row>> actionsByServer =
382         new HashMap<ServerName, MultiAction<Row>>();
383     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
384 
385     NonceGenerator ng = this.connection.getNonceGenerator();
386     long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
387 
388     // Location errors that happen before we decide what requests to take.
389     List<Exception> locationErrors = null;
390     List<Integer> locationErrorRows = null;
391     do {
392       // Wait until there is at least one slot for a new task.
393       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
394 
395       // Remember the previous decisions about regions or region servers we put in the
396       //  final multi.
397       Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
398       Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
399 
400       int posInList = -1;
401       Iterator<? extends Row> it = rows.iterator();
402       while (it.hasNext()) {
403         Row r = it.next();
404         HRegionLocation loc;
405         try {
406           if (r == null) {
407             throw new IllegalArgumentException("#" + id + ", row cannot be null");
408           }
409           // Make sure we get the default (id 0) replica.
410           RegionLocations locs = connection.locateRegion(
411               tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
412           if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
413             throw new IOException("#" + id + ", no location found, aborting submit for"
414                 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
415           }
416           loc = locs.getDefaultRegionLocation();
417         } catch (IOException ex) {
418           locationErrors = new ArrayList<Exception>();
419           locationErrorRows = new ArrayList<Integer>();
420           LOG.error("Failed to get region location ", ex);
421           // This action failed before creating ars. Retain it, but do not add to submit list.
422           // We will then add it to ars in an already-failed state.
423           retainedActions.add(new Action<Row>(r, ++posInList));
424           locationErrors.add(ex);
425           locationErrorRows.add(posInList);
426           it.remove();
427           break; // Backward compat: we stop considering actions on location error.
428         }
429 
430         if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
431           Action<Row> action = new Action<Row>(r, ++posInList);
432           setNonce(ng, r, action);
433           retainedActions.add(action);
434           // TODO: replica-get is not supported on this path
435           byte[] regionName = loc.getRegionInfo().getRegionName();
436           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
437           it.remove();
438         }
439       }
440     } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
441 
442     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
443 
444     return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
445       locationErrors, locationErrorRows, actionsByServer, pool);
446   }
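      // Caller-side sketch (illustrative; an assumed pattern rather than code from this file,
      // and "ap"/"buffer" are hypothetical names): a buffering caller such as HTable can rely on
      // submit() trimming the rows it managed to send from the list, and simply call it again
      // until the buffer is drained:
      //   while (!buffer.isEmpty()) {
      //     ap.submit(tableName, buffer, true, null, false);
      //   }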
447 
448   <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
449       List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
450       Object[] results, boolean needResults, List<Exception> locationErrors,
451       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
452       ExecutorService pool) {
453     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
454       tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
455     // Add location errors if any
456     if (locationErrors != null) {
457       for (int i = 0; i < locationErrors.size(); ++i) {
458         int originalIndex = locationErrorRows.get(i);
459         Row row = retainedActions.get(originalIndex).getAction();
460         ars.manageError(originalIndex, row,
461           Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
462       }
463     }
464     ars.sendMultiAction(actionsByServer, 1, null, false);
465     return ars;
466   }
467 
468   /**
469    * Helper that is used when grouping the actions per region server.
470    *
471    * @param server - the destination server. Must not be null.
472    * @param action - the action to add to the multiaction
473    * @param actionsByServer the multiaction per server
474    * @param nonceGroup Nonce group.
475    */
476   private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
477       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
478     MultiAction<Row> multiAction = actionsByServer.get(server);
479     if (multiAction == null) {
480       multiAction = new MultiAction<Row>();
481       actionsByServer.put(server, multiAction);
482     }
483     if (action.hasNonce() && !multiAction.hasNonceGroup()) {
484       multiAction.setNonceGroup(nonceGroup);
485     }
486 
487     multiAction.add(regionName, action);
488   }
489 
490   /**
491    * Check if we should send new operations to this region or region server.
492    * We take the past decisions into account: if we have already accepted an
493    * operation on a given region, we accept all operations for this region.
494    *
495    * @param loc the region and the server name we want to use.
496    * @return true if we can take operations for this region/server, false if it is considered busy.
497    */
498   protected boolean canTakeOperation(HRegionLocation loc,
499                                      Map<HRegionInfo, Boolean> regionsIncluded,
500                                      Map<ServerName, Boolean> serversIncluded) {
501     HRegionInfo regionInfo = loc.getRegionInfo();
502     Boolean regionPrevious = regionsIncluded.get(regionInfo);
503 
504     if (regionPrevious != null) {
505       // We already know what to do with this region.
506       return regionPrevious;
507     }
508 
509     Boolean serverPrevious = serversIncluded.get(loc.getServerName());
510     if (Boolean.FALSE.equals(serverPrevious)) {
511       // It's a new region, on a region server that we have already excluded.
512       regionsIncluded.put(regionInfo, Boolean.FALSE);
513       return false;
514     }
515 
516     AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
517     if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
518       // Too many tasks on this region already.
519       regionsIncluded.put(regionInfo, Boolean.FALSE);
520       return false;
521     }
522 
523     if (serverPrevious == null) {
524       // The region is ok, but we need to decide for this region server.
525       int newServers = 0; // number of servers we're going to contact so far
526       for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
527         if (kv.getValue()) {
528           newServers++;
529         }
530       }
531 
532       // Do we have too many total tasks already?
533       boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
534 
535       if (ok) {
536         // If the total is fine, is it ok for this individual server?
537         AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
538         ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
539       }
540 
541       if (!ok) {
542         regionsIncluded.put(regionInfo, Boolean.FALSE);
543         serversIncluded.put(loc.getServerName(), Boolean.FALSE);
544         return false;
545       }
546 
547       serversIncluded.put(loc.getServerName(), Boolean.TRUE);
548     } else {
549       assert serverPrevious.equals(Boolean.TRUE);
550     }
551 
552     regionsIncluded.put(regionInfo, Boolean.TRUE);
553 
554     return true;
555   }
556 
557   /**
558    * See {@link #submitAll(ExecutorService, TableName, List, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, Object[])}.
559    * Uses default ExecutorService for this AP (must have been created with one).
560    */
561   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
562       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
563     return submitAll(null, tableName, rows, callback, results);
564   }
565 
566   /**
567    * Submit immediately the list of rows, whatever the server status. Kept for backward
568    * compatibility: it allows use with the batch interface that returns an array of objects.
569    *
570    * @param pool ExecutorService to use.
571    * @param tableName name of the table for which the submission is made.
572    * @param rows the list of rows.
573    * @param callback the callback.
574    * @param results Optional array to return the results through; backward compat.
575    */
576   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
577       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
578     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
579 
580     // The position will be used by processBatch to match the returned object array.
581     int posInList = -1;
582     NonceGenerator ng = this.connection.getNonceGenerator();
583     for (Row r : rows) {
584       posInList++;
585       if (r instanceof Put) {
586         Put put = (Put) r;
587         if (put.isEmpty()) {
588           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
589         }
590       }
591       Action<Row> action = new Action<Row>(r, posInList);
592       setNonce(ng, r, action);
593       actions.add(action);
594     }
595     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
596         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
597     ars.groupAndSendMultiAction(actions, 1);
598     return ars;
599   }
600 
601   private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
602     if (!(r instanceof Append) && !(r instanceof Increment)) return;
603     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
604   }
605 
606   /**
607    * The context, and return value, for a single submit/submitAll call.
608    * Note on how this class (one AP submit) works. Initially, all requests are split into groups
609    * by server; a request is sent to each server in parallel; the RPC calls are not async, so a
610    * thread per server is used. Every time some actions fail, regions/locations might have
611    * changed, so we re-group them by server and region again and send these groups in parallel
612    * too. The result, in case of retries, is a "tree" of threads, with the parent exiting after
613    * scheduling children. This is why lots of code doesn't require any synchronization.
614    */
615   protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
616 
617     /**
618      * Runnable (that can be submitted to the thread pool) that waits until it's time
619      * to issue replica calls, finds region replicas, groups the requests by replica and
620      * issues the calls (on separate threads, via sendMultiAction).
621      * This is done on a separate thread because we don't want to wait on the user thread for
622      * our asynchronous call, and usually we have to wait before making replica calls.
623      */
624     private final class ReplicaCallIssuingRunnable implements Runnable {
625       private final long startTime;
626       private final List<Action<Row>> initialActions;
627 
628       public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
629         this.initialActions = initialActions;
630         this.startTime = startTime;
631       }
632 
633       @Override
634       public void run() {
635         boolean done = false;
636         if (primaryCallTimeoutMicroseconds > 0) {
637           try {
638             done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
639           } catch (InterruptedException ex) {
640             LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
641             return;
642           }
643         }
644         if (done) return; // Done within primary timeout
645         Map<ServerName, MultiAction<Row>> actionsByServer =
646             new HashMap<ServerName, MultiAction<Row>>();
647         List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
648         if (replicaGetIndices == null) {
649           for (int i = 0; i < results.length; ++i) {
650             addReplicaActions(i, actionsByServer, unknownLocActions);
651           }
652         } else {
653           for (int replicaGetIndice : replicaGetIndices) {
654             addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
655           }
656         }
657         if (!actionsByServer.isEmpty()) {
658           sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
659         }
660         if (!unknownLocActions.isEmpty()) {
661           actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
662           for (Action<Row> action : unknownLocActions) {
663             addReplicaActionsAgain(action, actionsByServer);
664           }
665           // Some actions may have completely failed, they are handled inside addAgain.
666           if (!actionsByServer.isEmpty()) {
667             sendMultiAction(actionsByServer, 1, null, true);
668           }
669         }
670       }
671 
672       /**
673        * Add replica actions to action map by server.
674        * @param index Index of the original action.
675        * @param actionsByServer The map by server to add it to.
676        */
677       private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
678           List<Action<Row>> unknownReplicaActions) {
679         if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
680         Action<Row> action = initialActions.get(index);
681         RegionLocations loc = findAllLocationsOrFail(action, true);
682         if (loc == null) return;
683         HRegionLocation[] locs = loc.getRegionLocations();
684         if (locs.length == 1) {
685           LOG.warn("No replicas found for " + action.getAction());
686           return;
687         }
688         synchronized (replicaResultLock) {
689           // Don't run replica calls if the original has finished. We could do it e.g. if
690           // original has already failed before first replica call (unlikely given retries),
691           // but that would require additional synchronization w.r.t. returning to caller.
692           if (results[index] != null) return;
693           // We set the number of calls here. After that any path must call setResult/setError.
694           // True even for replicas that are not found - if we refuse to send we MUST set error.
695           results[index] = new ReplicaResultState(locs.length);
696         }
697         for (int i = 1; i < locs.length; ++i) {
698           Action<Row> replicaAction = new Action<Row>(action, i);
699           if (locs[i] != null) {
700             addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
701                 replicaAction, actionsByServer, nonceGroup);
702           } else {
703             unknownReplicaActions.add(replicaAction);
704           }
705         }
706       }
707 
708       private void addReplicaActionsAgain(
709           Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
710         if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
711           throw new AssertionError("Cannot have default replica here");
712         }
713         HRegionLocation loc = getReplicaLocationOrFail(action);
714         if (loc == null) return;
715         addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
716             action, actionsByServer, nonceGroup);
717       }
718     }
719 
720     /**
721      * Runnable (that can be submitted to the thread pool) that submits a MultiAction to a
722      * single server. The server call is synchronous, therefore we do it on a thread pool.
723      */
724     private final class SingleServerRequestRunnable implements Runnable {
725       private final MultiAction<Row> multiAction;
726       private final int numAttempt;
727       private final ServerName server;
728       private final Set<MultiServerCallable<Row>> callsInProgress;
729 
730       private SingleServerRequestRunnable(
731           MultiAction<Row> multiAction, int numAttempt, ServerName server,
732           Set<MultiServerCallable<Row>> callsInProgress) {
733         this.multiAction = multiAction;
734         this.numAttempt = numAttempt;
735         this.server = server;
736         this.callsInProgress = callsInProgress;
737       }
738 
739       @Override
740       public void run() {
741         MultiResponse res;
742         MultiServerCallable<Row> callable = null;
743         try {
744           callable = createCallable(server, tableName, multiAction);
745           try {
746             RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
747             if (callsInProgress != null) callsInProgress.add(callable);
748             res = caller.callWithoutRetries(callable, timeout);
749 
750             if (res == null) {
751               // Cancelled
752               return;
753             }
754 
755           } catch (IOException e) {
756             // The service itself failed. It may be an error coming from the communication
757             //   layer, but also a functional error raised by the server.
758             receiveGlobalFailure(multiAction, server, numAttempt, e);
759             return;
760           } catch (Throwable t) {
761             // This should not happen. Let's log & retry anyway.
762             LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
763                 " Retrying. Server is " + server + ", tableName=" + tableName, t);
764             receiveGlobalFailure(multiAction, server, numAttempt, t);
765             return;
766           }
767 
768           // Normal case: we received an answer from the server, and it's not an exception.
769           receiveMultiAction(multiAction, server, res, numAttempt);
770         } catch (Throwable t) {
771           // Something really bad happened. We are on the send thread that will now die.
772           LOG.error("Internal AsyncProcess #" + id + " error for "
773               + tableName + " processing for " + server, t);
774           throw new RuntimeException(t);
775         } finally {
776           decTaskCounters(multiAction.getRegions(), server);
777           if (callsInProgress != null && callable != null) {
778             callsInProgress.remove(callable);
779           }
780         }
781       }
782     }
783 
784     private final Batch.Callback<CResult> callback;
785     private final BatchErrors errors;
786     private final ConnectionManager.ServerErrorTracker errorsByServer;
787     private final ExecutorService pool;
788     private final Set<MultiServerCallable<Row>> callsInProgress;
789 
790 
791     private final TableName tableName;
792     private final AtomicLong actionsInProgress = new AtomicLong(-1);
793     /**
794      * The lock controls access to results. It is only held when populating results where
795      * there might be several callers (eventual consistency gets). For other requests,
796      * there's one unique call going on per result index.
797      */
798     private final Object replicaResultLock = new Object();
799     /**
800      * Result array. Null if results are not needed. Otherwise, each index corresponds to
801      * the action index in the initial actions submitted. For most request types, it has nulls for
802      * requests that are not done, and result/exception for those that are done.
803      * For eventual-consistency gets, initially the same applies; at some point, replica calls
804      * might be started, and ReplicaResultState is put at the corresponding indices. The
805      * returning calls check the type to detect when this is the case. After all calls are done,
806      * ReplicaResultState-s are replaced with results for the user.
807      */
808     private final Object[] results;
809     /**
810      * Indices of replica gets in results. If null, all or no actions are replica-gets.
811      */
812     private final int[] replicaGetIndices;
813     private final boolean hasAnyReplicaGets;
814     private final long nonceGroup;
815 
816     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
817         ExecutorService pool, boolean needResults, Object[] results,
818         Batch.Callback<CResult> callback) {
819       this.pool = pool;
820       this.callback = callback;
821       this.nonceGroup = nonceGroup;
822       this.tableName = tableName;
823       this.actionsInProgress.set(actions.size());
824       if (results != null) {
825         assert needResults;
826         if (results.length != actions.size()) {
827           throw new AssertionError("results.length");
828         }
829         this.results = results;
830         for (int i = 0; i != this.results.length; ++i) {
831           results[i] = null;
832         }
833       } else {
834         this.results = needResults ? new Object[actions.size()] : null;
835       }
836       List<Integer> replicaGetIndices = null;
837       boolean hasAnyReplicaGets = false;
838       if (needResults) {
839         // Check to see if any requests might require replica calls.
840         // We expect that many requests will consist of all or no multi-replica gets; in such
841         // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
842         // store the list of action indexes for which replica gets are possible, and set
843         // hasAnyReplicaGets to true.
844         boolean hasAnyNonReplicaReqs = false;
845         int posInList = 0;
846         for (Action<Row> action : actions) {
847           boolean isReplicaGet = isReplicaGet(action.getAction());
848           if (isReplicaGet) {
849             hasAnyReplicaGets = true;
850             if (hasAnyNonReplicaReqs) { // Mixed case
851               if (replicaGetIndices == null) {
852                 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
853               }
854               replicaGetIndices.add(posInList);
855             }
856           } else if (!hasAnyNonReplicaReqs) {
857             // The first non-multi-replica request in the action list.
858             hasAnyNonReplicaReqs = true;
859             if (posInList > 0) {
860               // Add all the previous requests to the index lists. We know they are all
861               // replica-gets because this is the first non-multi-replica request in the list.
862               replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
863               for (int i = 0; i < posInList; ++i) {
864                 replicaGetIndices.add(i);
865               }
866             }
867           }
868           ++posInList;
869         }
870       }
871       this.hasAnyReplicaGets = hasAnyReplicaGets;
872       if (replicaGetIndices != null) {
873         this.replicaGetIndices = new int[replicaGetIndices.size()];
874         int i = 0;
875         for (Integer el : replicaGetIndices) {
876           this.replicaGetIndices[i++] = el;
877         }
878       } else {
879         this.replicaGetIndices = null;
880       }
881       this.callsInProgress = !hasAnyReplicaGets ? null :
882           Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
883 
884       this.errorsByServer = createServerErrorTracker();
885       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
886     }
887 
888     public Set<MultiServerCallable<Row>> getCallsInProgress() {
889       return callsInProgress;
890     }
891 
892     /**
893      * Group a list of actions per region server, and send them.
894      *
895      * @param currentActions - the list of rows to submit
896      * @param numAttempt - the current numAttempt (first attempt is 1)
897      */
898     private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
899       Map<ServerName, MultiAction<Row>> actionsByServer =
900           new HashMap<ServerName, MultiAction<Row>>();
901 
902       boolean isReplica = false;
903       List<Action<Row>> unknownReplicaActions = null;
904       for (Action<Row> action : currentActions) {
905         RegionLocations locs = findAllLocationsOrFail(action, true);
906         if (locs == null) continue;
907         boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
908         if (isReplica && !isReplicaAction) {
909           // This is the property of the current implementation, not a requirement.
910           throw new AssertionError("Replica and non-replica actions in the same retry");
911         }
912         isReplica = isReplicaAction;
913         HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
914         if (loc == null || loc.getServerName() == null) {
915           if (isReplica) {
916             if (unknownReplicaActions == null) {
917               unknownReplicaActions = new ArrayList<Action<Row>>();
918             }
919             unknownReplicaActions.add(action);
920           } else {
921             // TODO: relies on primary location always being fetched
922             manageLocationError(action, null);
923           }
924         } else {
925           byte[] regionName = loc.getRegionInfo().getRegionName();
926           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
927         }
928       }
929       boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
930       boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
931 
932       if (!actionsByServer.isEmpty()) {
933         // If this is a first attempt to group and send, no replicas, we need replica thread.
934         sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
935             ? currentActions : null, numAttempt > 1 && !hasUnknown);
936       }
937 
938       if (hasUnknown) {
939         actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
940         for (Action<Row> action : unknownReplicaActions) {
941           HRegionLocation loc = getReplicaLocationOrFail(action);
942           if (loc == null) continue;
943           byte[] regionName = loc.getRegionInfo().getRegionName();
944           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
945         }
946         if (!actionsByServer.isEmpty()) {
947           sendMultiAction(
948               actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
949         }
950       }
951     }
952 
953     private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
954       // We are going to try to get the location once again. For each action, we'll do it once
955       // from cache, because the previous calls in the loop might populate it.
956       int replicaId = action.getReplicaId();
957       RegionLocations locs = findAllLocationsOrFail(action, true);
958       if (locs == null) return null; // manageError already called
959       HRegionLocation loc = locs.getRegionLocation(replicaId);
960       if (loc == null || loc.getServerName() == null) {
961         locs = findAllLocationsOrFail(action, false);
962         if (locs == null) return null; // manageError already called
963         loc = locs.getRegionLocation(replicaId);
964       }
965       if (loc == null || loc.getServerName() == null) {
966         manageLocationError(action, null);
967         return null;
968       }
969       return loc;
970     }
971 
972     private void manageLocationError(Action<Row> action, Exception ex) {
973       String msg = "Cannot get replica " + action.getReplicaId()
974           + " location for " + action.getAction();
975       LOG.error(msg);
976       if (ex == null) {
977         ex = new IOException(msg);
978       }
979       manageError(action.getOriginalIndex(), action.getAction(),
980           Retry.NO_LOCATION_PROBLEM, ex, null);
981     }
982 
983     private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
984       if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
985           ", row cannot be null");
986       RegionLocations loc = null;
987       try {
988         loc = connection.locateRegion(
989             tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
990       } catch (IOException ex) {
991         manageLocationError(action, ex);
992       }
993       return loc;
994     }
995 
996     /**
997      * Send a multi action structure to the servers, after a delay depending on the attempt
998      * number. Asynchronous.
999      *
1000      * @param actionsByServer the actions, grouped by server (each MultiAction groups by region)
1001      * @param numAttempt the attempt number.
1002      * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
1003      */
1004     private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
1005         int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
1006       // Run the last item on the same thread if we are already on a send thread.
1007       // We hope most of the time it will be the only item, so we can cut down on threads.
1008       int actionsRemaining = actionsByServer.size();
1009       // This iteration is by server (the HRegionLocation comparator is by server portion only).
1010       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
1011         ServerName server = e.getKey();
1012         MultiAction<Row> multiAction = e.getValue();
1013         incTaskCounters(multiAction.getRegions(), server);
1014         Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
1015             numAttempt);
1016         // make sure we correctly count the number of runnables before we try to reuse the send
1017         // thread, in case we had to split the request into different runnables because of backoff
1018         if (runnables.size() > actionsRemaining) {
1019           actionsRemaining = runnables.size();
1020         }
1021 
1022         // run all the runnables
1023         for (Runnable runnable : runnables) {
1024           if ((--actionsRemaining == 0) && reuseThread) {
1025             runnable.run();
1026           } else {
1027             try {
1028               pool.submit(runnable);
1029             } catch (Throwable t) {
1030               if (t instanceof RejectedExecutionException) {
1031                 // This should never happen. But as the pool is provided by the end user,
1032                 // let's secure this a little.
1033                 LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
1034                     " Server is " + server.getServerName(), t);
1035               } else {
1036                 // see #HBASE-14359 for more details
1037                 LOG.warn("Caught unexpected exception/error: ", t);
1038               }
1039               decTaskCounters(multiAction.getRegions(), server);
1040               // We're likely to fail again, but this will increment the attempt counter,
1041               // so it will finish.
1042               receiveGlobalFailure(multiAction, server, numAttempt, t);
1043             }
1044           }
1045         }
1046       }
1047 
1048       if (actionsForReplicaThread != null) {
1049         startWaitingForReplicaCalls(actionsForReplicaThread);
1050       }
1051     }
1052 
1053     private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1054         MultiAction<Row> multiAction,
1055         int numAttempt) {
1056       // no stats to manage, just do the standard action
1057       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1058         if (connection.getConnectionMetrics() != null) {
1059           connection.getConnectionMetrics().incrNormalRunners();
1060         }
1061         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1062             new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1063       }
1064 
1065       // group the actions by the amount of delay
1066       Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1067           .size());
1068 
1069       // split up the actions
1070       for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1071         Long backoff = getBackoff(server, e.getKey());
1072         DelayingRunner runner = actions.get(backoff);
1073         if (runner == null) {
1074           actions.put(backoff, new DelayingRunner(backoff, e));
1075         } else {
1076           runner.add(e);
1077         }
1078       }
1079 
1080       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1081       for (DelayingRunner runner : actions.values()) {
1082         String traceText = "AsyncProcess.sendMultiAction";
1083         Runnable runnable =
1084             new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1085                 callsInProgress);
1086         // use a delay runner only if we need to sleep for some time
1087         if (runner.getSleepTime() > 0) {
1088           runner.setRunner(runnable);
1089           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1090           runnable = runner;
1091           if (connection.getConnectionMetrics() != null) {
1092             connection.getConnectionMetrics().incrDelayRunners();
1093             connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
1094           }
1095         } else {
1096           if (connection.getConnectionMetrics() != null) {
1097             connection.getConnectionMetrics().incrNormalRunners();
1098           }
1099         }
1100         runnable = Trace.wrap(traceText, runnable);
1101         toReturn.add(runnable);
1102 
1103       }
1104       return toReturn;
1105     }
1106 
1107     /**
1108      * @param server server location where the target region is hosted
1109      * @param regionName name of the region to which we are going to write some data
1110      * @return the amount of time the client should wait until it submits a request to the
1111      * specified server and region
1112      */
1113     private Long getBackoff(ServerName server, byte[] regionName) {
1114       ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1115       ServerStatistics stats = tracker.getStats(server);
1116       return AsyncProcess.this.connection.getBackoffPolicy()
1117           .getBackoffTime(server, regionName, stats);
1118     }
1119 
1120     /**
1121      * Starts waiting to issue replica calls on a different thread; or issues them immediately.
1122      */
1123     private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1124       long startTime = EnvironmentEdgeManager.currentTime();
1125       ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1126           actionsForReplicaThread, startTime);
1127       if (primaryCallTimeoutMicroseconds == 0) {
1128         // Start replica calls immediately.
1129         replicaRunnable.run();
1130       } else {
1131         // Start the thread that may kick off replica gets.
1132         // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
1133         try {
1134           pool.submit(replicaRunnable);
1135         } catch (RejectedExecutionException ree) {
1136           LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1137         }
1138       }
1139     }
1140 
1141     /**
1142      * Check whether we can retry and act accordingly: log and set the error status.
1143      *
1144      * @param originalIndex the position in the list sent
1145      * @param row           the row
1146      * @param canRetry      if false, we won't retry whatever the settings.
1147      * @param throwable     the throwable, if any (can be null)
1148      * @param server        the location, if any (can be null)
1149      * @return the retry decision: Retry.YES if the action can be retried, another Retry value otherwise.
1150      */
1151     public Retry manageError(int originalIndex, Row row, Retry canRetry,
1152                                 Throwable throwable, ServerName server) {
1153       if (canRetry == Retry.YES
1154           && throwable != null && (throwable instanceof DoNotRetryIOException ||
1155           throwable instanceof NeedUnmanagedConnectionException)) {
1156         canRetry = Retry.NO_NOT_RETRIABLE;
1157       }
1158 
1159       if (canRetry != Retry.YES) {
1160         // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
1161         setError(originalIndex, row, throwable, server);
1162       } else if (isActionComplete(originalIndex, row)) {
1163         canRetry = Retry.NO_OTHER_SUCCEEDED;
1164       }
1165       return canRetry;
1166     }
1167 
1168     /**
1169      * Resubmit all the actions from this multiaction after a failure.
1170      *
1171      * @param rsActions  the actions still to do from the initial list
1172      * @param server   the destination
1173      * @param numAttempt the number of attempts so far
1174      * @param t the throwable (if any) that caused the resubmit
1175      */
1176     private void receiveGlobalFailure(
1177         MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1178       errorsByServer.reportServerError(server);
1179       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1180           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1181 
1182       if (tableName == null) {
1183         // tableName is null when we made a cross-table RPC call.
1184         connection.clearCaches(server);
1185       }
1186       int failed = 0, stopped = 0;
1187       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1188       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1189         byte[] regionName = e.getKey();
1190         byte[] row = e.getValue().iterator().next().getAction().getRow();
1191         // Do not use the exception for updating cache because it might be coming from
1192         // any of the regions in the MultiAction.
1193         if (tableName != null) {
1194           connection.updateCachedLocations(tableName, regionName, row,
1195             ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
1196         }
1197         for (Action<Row> action : e.getValue()) {
1198           Retry retry = manageError(
1199               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1200           if (retry == Retry.YES) {
1201             toReplay.add(action);
1202           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1203             ++stopped;
1204           } else {
1205             ++failed;
1206           }
1207         }
1208       }
1209 
1210       if (toReplay.isEmpty()) {
1211         logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1212       } else {
1213         resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1214       }
1215     }
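    // Note (informational): a whole-server failure is reported to errorsByServer exactly once,
    // region location caches are refreshed per region (or cleared wholesale when the call
    // spanned several tables), and every action is then classified as replay / stopped / failed
    // before we either log and give up or hand the survivors to resubmit().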
1216 
1217     /**
1218      * Log as much info as possible, and, if there is something to replay,
1219      * submit it again after a back off sleep.
1220      */
1221     private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1222         int numAttempt, int failureCount, Throwable throwable) {
1223       // We have something to replay. We're going to sleep a little before.
1224 
1225       // We have three contradicting needs here:
1226       //  1) We want to get the new location after having slept, as it may change.
1227       //  2) We want to take into account the location when calculating the sleep time.
1228       //  3) If all this is just because the response needed to be chunked, try again FAST.
1229       // It should be possible to have some heuristics to take the right decision. Short term,
1230       //  we go for one.
1231       boolean retryImmediately = throwable instanceof RetryImmediatelyException;
1232       int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
1233       long backOffTime = retryImmediately ? 0 :
1234           errorsByServer.calculateBackoffTime(oldServer, pause);
1235       if (numAttempt > startLogErrorsCnt) {
1236         // We use this value to have some logs when we have multiple failures, but not too many
1237         //  logs, as errors are to be expected when a region moves, splits and so on
1238         LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1239             oldServer, throwable, backOffTime, true, null, -1, -1));
1240       }
1241 
1242       try {
1243         if (backOffTime > 0) {
1244           Thread.sleep(backOffTime);
1245         }
1246       } catch (InterruptedException e) {
1247         LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1248         Thread.currentThread().interrupt();
1249         return;
1250       }
1251 
1252       groupAndSendMultiAction(toReplay, nextAttemptNumber);
1253     }
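    // Back-off note (informational): calculateBackoffTime() grows with the number of failures
    // already recorded against oldServer, so repeated failures on the same server sleep
    // progressively longer, whereas a RetryImmediatelyException (e.g. a response that merely
    // had to be chunked) skips the sleep and does not consume an extra attempt.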
1254 
1255     private void logNoResubmit(ServerName oldServer, int numAttempt,
1256         int failureCount, Throwable throwable, int failed, int stopped) {
1257       if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1258         String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1259         String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1260             throwable, -1, false, timeStr, failed, stopped);
1261         if (failed != 0) {
1262           // Only log final failures as warning
1263           LOG.warn(logMessage);
1264         } else {
1265           LOG.info(logMessage);
1266         }
1267       }
1268     }
1269 
1270     /**
1271      * Called when we receive the result of a server query.
1272      *
1273      * @param multiAction    - the multiAction we sent
1274      * @param server       - the location. It's used as a server name.
1275      * @param responses      - the response, if any
1276      * @param numAttempt     - the attempt
1277      */
1278     private void receiveMultiAction(MultiAction<Row> multiAction,
1279         ServerName server, MultiResponse responses, int numAttempt) {
1280       assert responses != null;
1281 
1282       // Success or partial success
1283       // Analyze detailed results. We can still have individual failures that need to be redone.
1284       // Two specific throwables are managed:
1285       //  - DoNotRetryIOException: we continue to retry for other actions
1286       //  - RegionMovedException: we update the cache with the new region location
1287 
1288       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1289       Throwable throwable = null;
1290       int failureCount = 0;
1291       boolean canRetry = true;
1292 
1293       // Go by original action.
1294       int failed = 0, stopped = 0;
1295       for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
1296         byte[] regionName = regionEntry.getKey();
1297         Map<Integer, Object> regionResults = responses.getResults().get(regionName);
1298         if (regionResults == null) {
1299           if (!responses.getExceptions().containsKey(regionName)) {
1300             LOG.error("Server sent us neither results nor exceptions for "
1301                 + Bytes.toStringBinary(regionName));
1302             responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
1303           }
1304           continue;
1305         }
1306         boolean regionFailureRegistered = false;
1307         for (Action<Row> sentAction : regionEntry.getValue()) {
1308           Object result = regionResults.get(sentAction.getOriginalIndex());
1309           // Failure: retry if it makes sense, else update the error lists.
1310           if (result == null || result instanceof Throwable) {
1311             Row row = sentAction.getAction();
1312             throwable = ClientExceptionsUtil.findException(result);
1313             // Register corresponding failures once per server/once per region.
1314             if (!regionFailureRegistered) {
1315               regionFailureRegistered = true;
1316               connection.updateCachedLocations(
1317                   tableName, regionName, row.getRow(), result, server);
1318             }
1319             if (failureCount == 0) {
1320               errorsByServer.reportServerError(server);
1321               // We determine canRetry only once for all calls, after reporting server failure.
1322               canRetry = errorsByServer.canRetryMore(numAttempt);
1323             }
1324             ++failureCount;
1325             Retry retry = manageError(sentAction.getOriginalIndex(), row,
1326                 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
1327             if (retry == Retry.YES) {
1328               toReplay.add(sentAction);
1329             } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1330               ++stopped;
1331             } else {
1332               ++failed;
1333             }
1334           } else {
1335             
1336             if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
1337               AsyncProcess.this.connection.getConnectionMetrics().
1338                       updateServerStats(server, regionName, result);
1339             }
1340 
1341             // Update the stats about the region, if it's a user table. We don't want to slow down
1342             // updates to meta tables, especially from internal updates (master, etc.).
1343             if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
1344               result = ResultStatsUtil.updateStats(result,
1345                   AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
1346             }
1347 
1348             if (callback != null) {
1349               try {
1350                 //noinspection unchecked
1351                 // TODO: would callback expect a replica region name if it gets one?
1352                 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
1353               } catch (Throwable t) {
1354                 LOG.error("User callback threw an exception for "
1355                     + Bytes.toStringBinary(regionName) + ", ignoring", t);
1356               }
1357             }
1358             setResult(sentAction, result);
1359           }
1360         }
1361       }
1362 
1363       // The failures global to a region. We will use the multiAction we sent previously to find
1364       //   the actions to replay.
1365       for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
1366         throwable = throwableEntry.getValue();
1367         byte[] region = throwableEntry.getKey();
1368         List<Action<Row>> actions = multiAction.actions.get(region);
1369         if (actions == null || actions.isEmpty()) {
1370           throw new IllegalStateException("Wrong response for the region: " +
1371               HRegionInfo.encodeRegionName(region));
1372         }
1373 
1374         if (failureCount == 0) {
1375           errorsByServer.reportServerError(server);
1376           canRetry = errorsByServer.canRetryMore(numAttempt);
1377         }
1378         if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
1379           // For multi-actions, we don't have a table name, but we want to make sure to clear the
1380           // cache in case there were location-related exceptions. We don't want to clear the cache
1381           // for every possible exception that comes through, however.
1382           connection.clearCaches(server);
1383         } else {
1384           connection.updateCachedLocations(
1385               tableName, region, actions.get(0).getAction().getRow(), throwable, server);
1386         }
1387         failureCount += actions.size();
1388 
1389         for (Action<Row> action : actions) {
1390           Row row = action.getAction();
1391           Retry retry = manageError(action.getOriginalIndex(), row,
1392               canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
1393           if (retry == Retry.YES) {
1394             toReplay.add(action);
1395           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1396             ++stopped;
1397           } else {
1398             ++failed;
1399           }
1400         }
1401       }
1402 
1403       if (toReplay.isEmpty()) {
1404         logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
1405       } else {
1406         resubmit(server, toReplay, numAttempt, failureCount, throwable);
1407       }
1408     }
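    // Summary (informational): per-action failures and whole-region failures feed the same
    // toReplay list, so one MultiResponse can simultaneously deliver results to the callback,
    // record final errors, and trigger a grouped re-send of the remaining actions via
    // resubmit() with the attempt counter advanced.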
1409 
1410     private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1411         Throwable error, long backOffTime, boolean willRetry, String startTime,
1412         int failed, int stopped) {
1413       StringBuilder sb = new StringBuilder();
1414       sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1415         .append("attempt=").append(numAttempt)
1416         .append("/").append(numTries).append(" ");
1417 
1418       if (failureCount > 0 || error != null){
1419         sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1420             append(error == null ? "null" : error);
1421       } else {
1422         sb.append("succeeded");
1423       }
1424 
1425       sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1426 
1427       if (willRetry) {
1428         sb.append(", retrying after=").append(backOffTime).append("ms").
1429             append(", replay=").append(replaySize).append("ops");
1430       } else if (failureCount > 0) {
1431         if (stopped > 0) {
1432           sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1433         }
1434         if (failed > 0) {
1435           sb.append("; not retrying ").append(failed).append(" - final failure");
1436         }
1437 
1438       }
1439 
1440       return sb.toString();
1441     }
1442 
1443     /**
1444      * Sets the non-error result from a particular action.
1445      * @param action Action (request) that the server responded to.
1446      * @param result The result.
1447      */
1448     private void setResult(Action<Row> action, Object result) {
1449       if (result == null) {
1450         throw new RuntimeException("Result cannot be null");
1451       }
1452       ReplicaResultState state = null;
1453       boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
1454       int index = action.getOriginalIndex();
1455       if (results == null) {
1456          decActionCounter(index);
1457          return; // Simple case, no replica requests.
1458       } else if ((state = trySetResultSimple(
1459           index, action.getAction(), false, result, null, isStale)) == null) {
1460         return; // Simple case, no replica requests.
1461       }
1462       assert state != null;
1463       // At this point we know that state is set to the replica tracking class.
1464       // It could be that someone else is also looking at it; however, we know there can
1465       // only be one state object, and only one thread can set callCount to 0. Other threads
1466       // will either see state with callCount 0 after locking it, or will not see state at all
1467       // because we will have replaced it with the result.
1468       synchronized (state) {
1469         if (state.callCount == 0) {
1470           return; // someone already set the result
1471         }
1472         state.callCount = 0;
1473       }
1474       synchronized (replicaResultLock) {
1475         if (results[index] != state) {
1476           throw new AssertionError("We set the callCount but someone else replaced the result");
1477         }
1478         results[index] = result;
1479       }
1480 
1481       decActionCounter(index);
1482     }
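    // Concurrency sketch (informational): for replica gets the first thread to drop
    // state.callCount to 0 under the state lock wins; only that thread swaps the tracking
    // object for the real result under replicaResultLock and decrements the action counter,
    // so a primary and a replica response for the same action can never both complete it.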
1483 
1484     /**
1485      * Sets the error from a particular action.
1486      * @param index Original action index.
1487      * @param row Original request.
1488      * @param throwable The resulting error.
1489      * @param server The source server.
1490      */
1491     private void setError(int index, Row row, Throwable throwable, ServerName server) {
1492       ReplicaResultState state = null;
1493       if (results == null) {
1494         // Note that we currently cannot have replica requests with null results. So it shouldn't
1495         // happen that multiple replica calls will call decActionCounter for the same actions with results == null.
1496         // Only one call per action should be present in this case.
1497         errors.add(throwable, row, server);
1498         decActionCounter(index);
1499         return; // Simple case, no replica requests.
1500       } else if ((state = trySetResultSimple(
1501           index, row, true, throwable, server, false)) == null) {
1502         return; // Simple case, no replica requests.
1503       }
1504       assert state != null;
1505       BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
1506       boolean isActionDone = false;
1507       synchronized (state) {
1508         switch (state.callCount) {
1509           case 0: return; // someone already set the result
1510           case 1: { // All calls failed, we are the last error.
1511             target = errors;
1512             isActionDone = true;
1513             break;
1514           }
1515           default: {
1516             assert state.callCount > 1;
1517             if (state.replicaErrors == null) {
1518               state.replicaErrors = new BatchErrors();
1519             }
1520             target = state.replicaErrors;
1521             break;
1522           }
1523         }
1524         --state.callCount;
1525       }
1526       target.add(throwable, row, server);
1527       if (isActionDone) {
1528         if (state.replicaErrors != null) { // last call, no need to lock
1529           errors.merge(state.replicaErrors);
1530         }
1531         // See setResult for explanations.
1532         synchronized (replicaResultLock) {
1533           if (results[index] != state) {
1534             throw new AssertionError("We set the callCount but someone else replaced the result");
1535           }
1536           results[index] = throwable;
1537         }
1538         decActionCounter(index);
1539       }
1540     }
1541 
1542     /**
1543      * Checks if the action is complete; used on error to prevent needless retries.
1544      * Does not synchronize, assuming element index/field accesses are atomic.
1545      * This is an opportunistic optimization check, doesn't have to be strict.
1546      * @param index Original action index.
1547      * @param row Original request.
1548      */
1549     private boolean isActionComplete(int index, Row row) {
1550       if (!isReplicaGet(row)) return false;
1551       Object resObj = results[index];
1552       return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1553           || ((ReplicaResultState)resObj).callCount == 0);
1554     }
1555 
1556     /**
1557      * Tries to set the result or error for a particular action as if there were no replica calls.
1558      * @return null if successful; replica state if there were in fact replica calls.
1559      */
1560     private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
1561         Object result, ServerName server, boolean isFromReplica) {
1562       Object resObj = null;
1563       if (!isReplicaGet(row)) {
1564         if (isFromReplica) {
1565           throw new AssertionError("Unexpected stale result for " + row);
1566         }
1567         results[index] = result;
1568       } else {
1569         synchronized (replicaResultLock) {
1570           if ((resObj = results[index]) == null) {
1571             if (isFromReplica) {
1572               throw new AssertionError("Unexpected stale result for " + row);
1573             }
1574             results[index] = result;
1575           }
1576         }
1577       }
1578 
1579       ReplicaResultState rrs =
1580           (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
1581       if (rrs == null && isError) {
1582         // The resObj is not replica state (null or already set).
1583         errors.add((Throwable)result, row, server);
1584       }
1585 
1586       if (resObj == null) {
1587         // resObj is null - no replica calls were made.
1588         decActionCounter(index);
1589         return null;
1590       }
1591       return rrs;
1592     }
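    // Contract (informational): a non-null return hands back the ReplicaResultState so the
    // caller can arbitrate between primary and replica calls; a null return means no
    // arbitration is needed, either because this call just settled the slot (and decremented
    // the action counter) or because another call already did so.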
1593 
1594     private void decActionCounter(int index) {
1595       long actionsRemaining = actionsInProgress.decrementAndGet();
1596       if (actionsRemaining < 0) {
1597         String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1598         throw new AssertionError(error);
1599       } else if (actionsRemaining == 0) {
1600         synchronized (actionsInProgress) {
1601           actionsInProgress.notifyAll();
1602         }
1603       }
1604     }
1605 
1606     private String buildDetailedErrorMsg(String string, int index) {
1607       StringBuilder error = new StringBuilder(string);
1608       error.append("; called for ").
1609         append(index).
1610         append(", actionsInProgress ").
1611         append(actionsInProgress.get()).
1612         append("; replica gets: ");
1613       if (replicaGetIndices != null) {
1614         for (int i = 0; i < replicaGetIndices.length; ++i) {
1615           error.append(replicaGetIndices[i]).append(", ");
1616         }
1617       } else {
1618         error.append(hasAnyReplicaGets ? "all" : "none");
1619       }
1620       error.append("; results ");
1621       if (results != null) {
1622         for (int i = 0; i < results.length; ++i) {
1623           Object o = results[i];
1624           error.append(((o == null) ? "null" : o.toString())).append(", ");
1625         }
1626       }
1627       return error.toString();
1628     }
1629 
1630     @Override
1631     public void waitUntilDone() throws InterruptedIOException {
1632       try {
1633         waitUntilDone(Long.MAX_VALUE);
1634       } catch (InterruptedException iex) {
1635         throw new InterruptedIOException(iex.getMessage());
1636       } finally {
1637         if (callsInProgress != null) {
1638           for (MultiServerCallable<Row> clb : callsInProgress) {
1639             clb.cancel();
1640           }
1641         }
1642       }
1643     }
1644 
1645     private boolean waitUntilDone(long cutoff) throws InterruptedException {
1646       boolean hasWait = cutoff != Long.MAX_VALUE;
1647       long lastLog = EnvironmentEdgeManager.currentTime();
1648       long currentInProgress;
1649       while (0 != (currentInProgress = actionsInProgress.get())) {
1650         long now = EnvironmentEdgeManager.currentTime();
1651         if (hasWait && (now * 1000L) > cutoff) {
1652           return false;
1653         }
1654         if (!hasWait) { // Only log if wait is infinite.
1655           if (now > lastLog + 10000) {
1656             lastLog = now;
1657             LOG.info("#" + id + ", waiting for " + currentInProgress
1658                 + "  actions to finish on table: " + tableName);
1659             if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1660               logDetailsOfUndoneTasks(currentInProgress);
1661             }
1662           }
1663         }
1664         synchronized (actionsInProgress) {
1665           if (actionsInProgress.get() == 0) break;
1666           if (!hasWait) {
1667             actionsInProgress.wait(10);
1668           } else {
1669             long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1670             TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1671           }
1672         }
1673       }
1674       return true;
1675     }
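    // Unit note (informational): the cutoff is compared against currentTime() * 1000, i.e. it
    // is an absolute deadline in microseconds since the epoch; Long.MAX_VALUE means "wait
    // forever", which also enables the periodic progress logging above.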
1676 
1677     @Override
1678     public boolean hasError() {
1679       return errors.hasErrors();
1680     }
1681 
1682     @Override
1683     public List<? extends Row> getFailedOperations() {
1684       return errors.actions;
1685     }
1686 
1687     @Override
1688     public RetriesExhaustedWithDetailsException getErrors() {
1689       return errors.makeException();
1690     }
1691 
1692     @Override
1693     public Object[] getResults() throws InterruptedIOException {
1694       waitUntilDone();
1695       return results;
1696     }
1697   }
1698 
1699   /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
1700   @VisibleForTesting
1701   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1702       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1703       Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1704     return new AsyncRequestFutureImpl<CResult>(
1705         tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1706   }
1707 
1708   /**
1709    * Create a callable. Isolated to be easily overridden in the tests.
1710    */
1711   @VisibleForTesting
1712   protected MultiServerCallable<Row> createCallable(final ServerName server,
1713       TableName tableName, final MultiAction<Row> multi) {
1714     return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1715   }
1716 
1717   /**
1718    * Create a caller. Isolated to be easily overridden in the tests.
1719    */
1720   @VisibleForTesting
1721   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1722     return rpcCallerFactory.<MultiResponse> newCaller();
1723   }
1724 
1725   /** Waits until all outstanding tasks are done. Used in tests. */
1726   @VisibleForTesting
1727   void waitUntilDone() throws InterruptedIOException {
1728     waitForMaximumCurrentTasks(0, null);
1729   }
1730 
1731   /** Waits until the async process does not have more than max tasks in progress. */
1732   private void waitForMaximumCurrentTasks(int max, String tableName)
1733       throws InterruptedIOException {
1734     waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
1735   }
1736 
1737   // Broken out into its own method so it is easily testable.
1738   @VisibleForTesting
1739   void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
1740       String tableName) throws InterruptedIOException {
1741     long lastLog = EnvironmentEdgeManager.currentTime();
1742     long currentInProgress, oldInProgress = Long.MAX_VALUE;
1743     while ((currentInProgress = tasksInProgress.get()) > max) {
1744       if (oldInProgress != currentInProgress) { // Wait for in progress to change.
1745         long now = EnvironmentEdgeManager.currentTime();
1746         if (now > lastLog + 10000) {
1747           lastLog = now;
1748           LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1749               + max + ", tasksInProgress=" + currentInProgress +
1750               " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
1751           if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1752             logDetailsOfUndoneTasks(currentInProgress);
1753           }
1754         }
1755       }
1756       oldInProgress = currentInProgress;
1757       try {
1758         synchronized (tasksInProgress) {
1759           if (tasksInProgress.get() == oldInProgress) {
1760             tasksInProgress.wait(10);
1761           }
1762         }
1763       } catch (InterruptedException e) {
1764         throw new InterruptedIOException("#" + id + ", interrupted." +
1765             " currentNumberOfTask=" + currentInProgress);
1766       }
1767     }
1768   }
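  // Usage sketch (informational): waitForMaximumCurrentTasks(0, tableName) blocks until every
  // outstanding task has finished, which is how waitUntilDone() and
  // waitForAllPreviousOpsAndReset() drain the process; a non-zero max lets callers throttle
  // themselves once too many tasks are already in flight.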
1769 
1770   private void logDetailsOfUndoneTasks(long taskInProgress) {
1771     ArrayList<ServerName> servers = new ArrayList<ServerName>();
1772     for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
1773       if (entry.getValue().get() > 0) {
1774         servers.add(entry.getKey());
1775       }
1776     }
1777     LOG.info("Left over " + taskInProgress + " task(s) are being processed on server(s): " + servers);
1778     if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
1779       ArrayList<String> regions = new ArrayList<String>();
1780       for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
1781         if (entry.getValue().get() > 0) {
1782           regions.add(Bytes.toString(entry.getKey()));
1783         }
1784       }
1785       LOG.info("Regions against which left over task(s) are being processed: " + regions);
1786     }
1787   }
1788 
1789   /**
1790    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1791    * @return Whether there were any errors in any request since the last time
1792    *          {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created.
1793    */
1794   public boolean hasError() {
1795     return globalErrors.hasErrors();
1796   }
1797 
1798   /**
1799    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1800    * Waits for all previous operations to finish, and returns errors and (optionally)
1801    * failed operations themselves.
1802    * @param failedRows an optional list into which the rows that failed since the last time
1803    *        {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created, are saved.
1804    * @param tableName name of the table
1805    * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List, String)}
1806    *          was called, or AP was created.
1807    */
1808   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1809       List<Row> failedRows, String tableName) throws InterruptedIOException {
1810     waitForMaximumCurrentTasks(0, tableName);
1811     if (!globalErrors.hasErrors()) {
1812       return null;
1813     }
1814     if (failedRows != null) {
1815       failedRows.addAll(globalErrors.actions);
1816     }
1817     RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1818     globalErrors.clear();
1819     return result;
1820   }
1821 
1822   /**
1823    * Increments the task counters for a given set of regions. MT safe.
1824    */
1825   protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1826     tasksInProgress.incrementAndGet();
1827 
1828     AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1829     if (serverCnt == null) {
1830       taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1831       serverCnt = taskCounterPerServer.get(sn);
1832     }
1833     serverCnt.incrementAndGet();
1834 
1835     for (byte[] regBytes : regions) {
1836       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1837       if (regionCnt == null) {
1838         regionCnt = new AtomicInteger();
1839         AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1840         if (oldCnt != null) {
1841           regionCnt = oldCnt;
1842         }
1843       }
1844       regionCnt.incrementAndGet();
1845     }
1846   }
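  // Note (informational): the putIfAbsent-then-reuse pattern above is the pre-Java-8 idiom for
  // atomically installing a per-key counter in a ConcurrentMap; a thread that loses the race
  // simply increments the counter the winner installed, so no increment is ever lost.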
1847 
1848   /**
1849    * Decrements the task counters for the given regions and the region server. MT safe.
1850    */
1851   protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1852     for (byte[] regBytes : regions) {
1853       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1854       regionCnt.decrementAndGet();
1855     }
1856 
1857     taskCounterPerServer.get(sn).decrementAndGet();
1858     tasksInProgress.decrementAndGet();
1859     synchronized (tasksInProgress) {
1860       tasksInProgress.notifyAll();
1861     }
1862   }
1863 
1864   /**
1865    * Creates the server error tracker to use inside process.
1866    * Currently, to preserve the main assumption about current retries, and to work well with
1867    * the retry-limit-based calculation, the calculation is local per Process object.
1868    * We may benefit from connection-wide tracking of server errors.
1869    * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
1870    */
1871   protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1872     return new ConnectionManager.ServerErrorTracker(
1873         this.serverTrackerTimeout, this.numTries);
1874   }
1875 
1876   private static boolean isReplicaGet(Row row) {
1877     return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1878   }
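  // Example (informational): only timeline-consistent reads count as replica gets, e.g.
  //   Get get = new Get(rowKey);                   // rowKey is an arbitrary row, for illustration only
  //   get.setConsistency(Consistency.TIMELINE);    // isReplicaGet(get) now returns true
  // whereas a default Get (Consistency.STRONG) or any mutation returns false.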
1879 
1880   /**
1881    * For manageError. Only used to make logging clearer; we don't actually care why we don't retry.
1882    */
1883   private enum Retry {
1884     YES,
1885     NO_LOCATION_PROBLEM,
1886     NO_NOT_RETRIABLE,
1887     NO_RETRIES_EXHAUSTED,
1888     NO_OTHER_SUCCEEDED
1889   }
1890 }