1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Date;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ConcurrentSkipListMap;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.hbase.RetryImmediatelyException;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.DoNotRetryIOException;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.RegionLocations;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch;
56  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
57  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60  import org.apache.htrace.Trace;
61  
62  import com.google.common.annotations.VisibleForTesting;
63  
64  /**
65   * This class  allows a continuous flow of requests. It's written to be compatible with a
66   * synchronous caller such as HTable.
67   * <p>
68   * The caller sends a buffer of operations by calling submit. This class extracts from this list
69   * the operations it can send, i.e. the operations on regions that are not considered busy.
70   * The process is asynchronous, i.e. it returns immediately once it has finished iterating
71   * over the list. If, and only if, the maximum number of concurrent tasks is reached, the call
72   * to submit will block. Alternatively, the caller can call submitAll, in which case all the
73   * operations will be sent. Each call to submit returns a future-like object that can be used
74   * to track operation progress.
75   * </p>
76   * <p>
77   * The class manages the retries internally.
78   * </p>
79   * <p>
80   * The class can be constructed in regular mode, or "global error" mode. In global error mode,
81   * AP tracks errors across all calls (each "future" also has a global view of all errors). That
82   * mode is necessary for backward compatibility with HTable behavior, where multiple submissions are
83   * made and errors from previous calls can propagate through any subsequent put/flush call.
84   * In "regular" mode, the errors are tracked inside the Future object that is returned.
85   * The results are always tracked inside the Future object and can be retrieved when the call
86   * has finished. Partial results can also be retrieved if some part of the multi-request failed.
87   * </p>
88   * <p>
89   * This class is thread safe in regular mode; in global error mode, submitting operations and
90   * retrieving errors from different threads may not be thread safe.
91   * Internally, the class is thread safe enough to manage new submissions and results arising
92   * from older operations simultaneously.
93   * </p>
94   * <p>
95   * Internally, this class works with {@link Row}, which means it could theoretically be used for
96   * gets as well.
97   * </p>
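     * <p>
     * A minimal usage sketch; the handles {@code conn}, {@code conf}, {@code pool},
     * {@code callerFactory}, {@code rpcFactory} and the list {@code puts} are hypothetical
     * placeholders supplied by the caller, not part of this class:
     * </p>
     * <pre>{@code
     * AsyncProcess ap = new AsyncProcess(conn, conf, pool, callerFactory, false, rpcFactory);
     * Object[] results = new Object[puts.size()];
     * AsyncRequestFuture future = ap.submitAll(TableName.valueOf("t1"), puts, null, results);
     * future.waitUntilDone();                     // block until every task has finished
     * if (future.hasError()) {
     *   throw future.getErrors();                 // RetriesExhaustedWithDetailsException
     * }
     * }</pre>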
98   */
99  @InterfaceAudience.Private
100 class AsyncProcess {
101   private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
102   protected static final AtomicLong COUNTER = new AtomicLong();
103 
104   public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
105 
106   /**
107    * Configure the number of failures after which the client will start logging. A few failures
108    * are fine: the region moved, then it is not yet open, then it is overloaded. We try to have an acceptable
109    * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at
110    * this stage.
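       * <p>
       * For example, to start logging after the third failure ({@code conf} is the client
       * {@link org.apache.hadoop.conf.Configuration}; the value is illustrative only):
       * </p>
       * <pre>{@code
       * conf.setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 3);
       * }</pre>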
111    */
112   public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
113       "hbase.client.start.log.errors.counter";
114   public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
115 
116   /**
117    * Configuration key to decide whether to log details for batch errors.
118    */
119   public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
120 
121   private final int thresholdToLogUndoneTaskDetails;
122   private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
123       "hbase.client.threshold.log.details";
124   private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
125   private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
126 
127   /**
128    * The context used to wait for results from one submit call.
129    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
130    *    then errors and failed operations in this object will reflect global errors.
131    * 2) If the submit call is made with needResults false, results will not be saved.
132    */
133   public static interface AsyncRequestFuture {
134     public boolean hasError();
135     public RetriesExhaustedWithDetailsException getErrors();
136     public List<? extends Row> getFailedOperations();
137     public Object[] getResults() throws InterruptedIOException;
138     /** Wait until all tasks are executed, successfully or not. */
139     public void waitUntilDone() throws InterruptedIOException;
140   }
141 
142   /**
143    * Return value from a submit that didn't contain any requests.
144    */
145   private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
146 
147     final Object[] result = new Object[0];
148 
149     @Override
150     public boolean hasError() {
151       return false;
152     }
153 
154     @Override
155     public RetriesExhaustedWithDetailsException getErrors() {
156       return null;
157     }
158 
159     @Override
160     public List<? extends Row> getFailedOperations() {
161       return null;
162     }
163 
164     @Override
165     public Object[] getResults() {
166       return result;
167     }
168 
169     @Override
170     public void waitUntilDone() throws InterruptedIOException {
171     }
172   };
173 
174   /** Sync point for calls to multiple replicas for the same user request (Get).
175    * Created and put in the results array (we assume replica calls require results) when
176    * the replica calls are launched. See results for details of this process.
177    * POJO, all fields are public. To modify them, the object itself is locked. */
178   private static class ReplicaResultState {
179     public ReplicaResultState(int callCount) {
180       this.callCount = callCount;
181     }
182 
183     /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
184     int callCount;
185     /** Errors for which it is not decided whether we will report them to the user. If one of the
186      * calls succeeds, we will discard the errors that may have happened in the other calls. */
187     BatchErrors replicaErrors = null;
188 
189     @Override
190     public String toString() {
191       return "[call count " + callCount + "; errors " + replicaErrors + "]";
192     }
193   }
194 
195 
196   // TODO: many of the fields should be made private
197   protected final long id;
198 
199   protected final ClusterConnection connection;
200   protected final RpcRetryingCallerFactory rpcCallerFactory;
201   protected final RpcControllerFactory rpcFactory;
202   protected final BatchErrors globalErrors;
203   protected final ExecutorService pool;
204 
205   protected final AtomicLong tasksInProgress = new AtomicLong(0);
206   protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
207       new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
208   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
209       new ConcurrentHashMap<ServerName, AtomicInteger>();
210 
211   // Start configuration settings.
212   private final int startLogErrorsCnt;
213 
214   /**
215    * The number of tasks simultaneously executed on the cluster.
216    * The maximum number of tasks simultaneously executed on the cluster.
217   protected final int maxTotalConcurrentTasks;
218 
219   /**
220    * The maximum number of tasks we run in parallel on a single region.
221    * With 1 (the default), we ensure that the ordering of the queries is respected: we don't start
222    * a set of operations on a region before the previous one is done. This also limits
223    * the pressure we put on the region server.
224    */
225   protected final int maxConcurrentTasksPerRegion;
226 
227   /**
228    * The maximum number of tasks simultaneously executed on a single region server.
229    */
230   protected final int maxConcurrentTasksPerServer;
231   protected final long pause;
232   protected int numTries;
233   protected int serverTrackerTimeout;
234   protected int timeout;
235   protected long primaryCallTimeoutMicroseconds;
236   // End configuration settings.
237 
238   protected static class BatchErrors {
239     private final List<Throwable> throwables = new ArrayList<Throwable>();
240     private final List<Row> actions = new ArrayList<Row>();
241     private final List<String> addresses = new ArrayList<String>();
242 
243     public synchronized void add(Throwable ex, Row row, ServerName serverName) {
244       if (row == null){
245         throw new IllegalArgumentException("row cannot be null. location=" + serverName);
246       }
247 
248       throwables.add(ex);
249       actions.add(row);
250       addresses.add(serverName != null ? serverName.toString() : "null");
251     }
252 
253     public boolean hasErrors() {
254       return !throwables.isEmpty();
255     }
256 
257     private synchronized RetriesExhaustedWithDetailsException makeException() {
258       return new RetriesExhaustedWithDetailsException(
259           new ArrayList<Throwable>(throwables),
260           new ArrayList<Row>(actions), new ArrayList<String>(addresses));
261     }
262 
263     public synchronized void clear() {
264       throwables.clear();
265       actions.clear();
266       addresses.clear();
267     }
268 
269     public synchronized void merge(BatchErrors other) {
270       throwables.addAll(other.throwables);
271       actions.addAll(other.actions);
272       addresses.addAll(other.addresses);
273     }
274   }
275 
276   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
277       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
278     if (hc == null) {
279       throw new IllegalArgumentException("HConnection cannot be null.");
280     }
281 
282     this.connection = hc;
283     this.pool = pool;
284     this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
285 
286     this.id = COUNTER.incrementAndGet();
287 
288     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
289         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
290     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
291         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
292     this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
293         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
294     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
295 
296     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
297       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
298     this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
299           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
300     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
301           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
302 
303     this.startLogErrorsCnt =
304         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
305 
306     if (this.maxTotalConcurrentTasks <= 0) {
307       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
308     }
309     if (this.maxConcurrentTasksPerServer <= 0) {
310       throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
311           maxConcurrentTasksPerServer);
312     }
313     if (this.maxConcurrentTasksPerRegion <= 0) {
314       throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
315           maxConcurrentTasksPerRegion);
316     }
317 
318     // Server tracker allows us to do faster, and yet useful (hopefully), retries.
319    // However, if we retry too quickly, we might exhaust the retry count limit very fast.
320     // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
321     // retry time if normal retries were used. Then we will retry until this time runs out.
322     // If we keep hitting one server, the net effect will be the incremental backoff, and
323     // essentially the same number of retries as planned. If we have to do faster retries,
324     // we will do more retries in aggregate, but the user will be none the wiser.
325     this.serverTrackerTimeout = 0;
326     for (int i = 0; i < this.numTries; ++i) {
327       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
328     }
329 
330     this.rpcCallerFactory = rpcCaller;
331     this.rpcFactory = rpcFactory;
332 
333     this.thresholdToLogUndoneTaskDetails =
334         conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
335           DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
336   }
337 
338   /**
339    * @return the given pool if non-null, otherwise this.pool if non-null, otherwise throws
340    *         RuntimeException
341    */
342   private ExecutorService getPool(ExecutorService pool) {
343     if (pool != null) {
344       return pool;
345     }
346     if (this.pool != null) {
347       return this.pool;
348     }
349     throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
350   }
351 
352   /**
353    * See {@link #submit(ExecutorService, TableName, List, boolean, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, boolean)}.
354    * Uses default ExecutorService for this AP (must have been created with one).
355    */
356   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
357       boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
358       throws InterruptedIOException {
359     return submit(null, tableName, rows, atLeastOne, callback, needResults);
360   }
361 
362   /**
363    * Extracts from the rows list what we can submit. The rows we cannot submit are kept in the
364    * list. Does not send requests to replicas (not currently used for anything other
365    * than streaming puts anyway).
366    *
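       * <p>
       * A rough sketch of the streaming-put pattern this supports; {@code ap}, {@code pool}
       * and {@code puts} are hypothetical caller-side handles:
       * </p>
       * <pre>{@code
       * List<Row> pending = new LinkedList<Row>(puts);  // submit() removes the rows it took
       * while (!pending.isEmpty()) {
       *   ap.submit(pool, TableName.valueOf("t1"), pending, true, null, false);
       * }
       * }</pre>
       *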
367    * @param pool ExecutorService to use.
368    * @param tableName The table for which this request is needed.
369    * @param callback Batch callback. Only called on success (94 behavior).
370    * @param needResults Whether results are needed, or can be discarded.
371    * @param rows - the submitted rows. Modified by the method: we remove the rows we took.
372    * @param atLeastOne true if we should submit at least a subset.
373    */
374   public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
375       List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
376       boolean needResults) throws InterruptedIOException {
377     if (rows.isEmpty()) {
378       return NO_REQS_RESULT;
379     }
380 
381     Map<ServerName, MultiAction<Row>> actionsByServer =
382         new HashMap<ServerName, MultiAction<Row>>();
383     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
384 
385     NonceGenerator ng = this.connection.getNonceGenerator();
386     long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
387 
388     // Location errors that happen before we decide what requests to take.
389     List<Exception> locationErrors = null;
390     List<Integer> locationErrorRows = null;
391     do {
392       // Wait until there is at least one slot for a new task.
393       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
394 
395       // Remember the previous decisions about regions or region servers we put in the
396       //  final multi.
397       Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
398       Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
399 
400       int posInList = -1;
401       Iterator<? extends Row> it = rows.iterator();
402       while (it.hasNext()) {
403         Row r = it.next();
404         HRegionLocation loc;
405         try {
406           if (r == null) {
407             throw new IllegalArgumentException("#" + id + ", row cannot be null");
408           }
409          // Make sure we get the default (replica id 0) location.
410           RegionLocations locs = connection.locateRegion(
411               tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
412           if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
413             throw new IOException("#" + id + ", no location found, aborting submit for"
414                 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
415           }
416           loc = locs.getDefaultRegionLocation();
417         } catch (IOException ex) {
418           locationErrors = new ArrayList<Exception>();
419           locationErrorRows = new ArrayList<Integer>();
420           LOG.error("Failed to get region location ", ex);
421           // This action failed before creating ars. Retain it, but do not add to submit list.
422           // We will then add it to ars in an already-failed state.
423           retainedActions.add(new Action<Row>(r, ++posInList));
424           locationErrors.add(ex);
425           locationErrorRows.add(posInList);
426           it.remove();
427           break; // Backward compat: we stop considering actions on location error.
428         }
429 
430         if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
431           Action<Row> action = new Action<Row>(r, ++posInList);
432           setNonce(ng, r, action);
433           retainedActions.add(action);
434           // TODO: replica-get is not supported on this path
435           byte[] regionName = loc.getRegionInfo().getRegionName();
436           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
437           it.remove();
438         }
439       }
440     } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
441 
442     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
443 
444     return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
445       locationErrors, locationErrorRows, actionsByServer, pool);
446   }
447 
448   <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
449       List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
450       Object[] results, boolean needResults, List<Exception> locationErrors,
451       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
452       ExecutorService pool) {
453     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
454       tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
455     // Add location errors if any
456     if (locationErrors != null) {
457       for (int i = 0; i < locationErrors.size(); ++i) {
458         int originalIndex = locationErrorRows.get(i);
459         Row row = retainedActions.get(originalIndex).getAction();
460         ars.manageError(originalIndex, row,
461           Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
462       }
463     }
464     ars.sendMultiAction(actionsByServer, 1, null, false);
465     return ars;
466   }
467 
468   /**
469    * Helper that is used when grouping the actions per region server.
470    *
471    * @param server - the destination server. Must not be null.
472    * @param action - the action to add to the multiaction
473    * @param actionsByServer the multiaction per server
474    * @param nonceGroup Nonce group.
475    */
476   private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
477       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
478     MultiAction<Row> multiAction = actionsByServer.get(server);
479     if (multiAction == null) {
480       multiAction = new MultiAction<Row>();
481       actionsByServer.put(server, multiAction);
482     }
483     if (action.hasNonce() && !multiAction.hasNonceGroup()) {
484       multiAction.setNonceGroup(nonceGroup);
485     }
486 
487     multiAction.add(regionName, action);
488   }
489 
490   /**
491    * Check if we should send new operations to this region or region server.
492    * We're taking past decisions into account; if we have already accepted an
493    * operation on a given region, we accept all operations for this region.
494    *
495    * @param loc the region and the server name we want to use.
496    * @return true if the operation can be sent to this region and server, false if either is considered busy.
497    */
498   protected boolean canTakeOperation(HRegionLocation loc,
499                                      Map<HRegionInfo, Boolean> regionsIncluded,
500                                      Map<ServerName, Boolean> serversIncluded) {
501     HRegionInfo regionInfo = loc.getRegionInfo();
502     Boolean regionPrevious = regionsIncluded.get(regionInfo);
503 
504     if (regionPrevious != null) {
505       // We already know what to do with this region.
506       return regionPrevious;
507     }
508 
509     Boolean serverPrevious = serversIncluded.get(loc.getServerName());
510     if (Boolean.FALSE.equals(serverPrevious)) {
511       // It's a new region, on a region server that we have already excluded.
512       regionsIncluded.put(regionInfo, Boolean.FALSE);
513       return false;
514     }
515 
516     AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
517     if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
518       // Too many tasks on this region already.
519       regionsIncluded.put(regionInfo, Boolean.FALSE);
520       return false;
521     }
522 
523     if (serverPrevious == null) {
524       // The region is ok, but we need to decide for this region server.
525       int newServers = 0; // number of servers we're going to contact so far
526       for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
527         if (kv.getValue()) {
528           newServers++;
529         }
530       }
531 
532       // Do we have too many total tasks already?
533       boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
534 
535       if (ok) {
536         // If the total is fine, is it ok for this individual server?
537         AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
538         ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
539       }
540 
541       if (!ok) {
542         regionsIncluded.put(regionInfo, Boolean.FALSE);
543         serversIncluded.put(loc.getServerName(), Boolean.FALSE);
544         return false;
545       }
546 
547       serversIncluded.put(loc.getServerName(), Boolean.TRUE);
548     } else {
549       assert serverPrevious.equals(Boolean.TRUE);
550     }
551 
552     regionsIncluded.put(regionInfo, Boolean.TRUE);
553 
554     return true;
555   }
556 
557   /**
558    * See {@link #submitAll(ExecutorService, TableName, List, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, Object[])}.
559    * Uses default ExecutorService for this AP (must have been created with one).
560    */
561   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
562       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
563     return submitAll(null, tableName, rows, callback, results);
564   }
565 
566   /**
567    * Immediately submit the list of rows, whatever the server status. Kept for backward
568    * compatibility: it allows use with the batch interface that returns an array of objects.
569    *
570    * @param pool ExecutorService to use.
571    * @param tableName name of the table for which the submission is made.
572    * @param rows the list of rows.
573    * @param callback the callback.
574    * @param results Optional array to return the results through; kept for backward compatibility.
575    */
576   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
577       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
578     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
579 
580     // The position will be used by processBatch to match the returned object array.
581     int posInList = -1;
582     NonceGenerator ng = this.connection.getNonceGenerator();
583     for (Row r : rows) {
584       posInList++;
585       if (r instanceof Put) {
586         Put put = (Put) r;
587         if (put.isEmpty()) {
588           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
589         }
590       }
591       Action<Row> action = new Action<Row>(r, posInList);
592       setNonce(ng, r, action);
593       actions.add(action);
594     }
595     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
596         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
597     ars.groupAndSendMultiAction(actions, 1);
598     return ars;
599   }
600 
601   private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
602     if (!(r instanceof Append) && !(r instanceof Increment)) return;
603     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
604   }
605 
606   /**
607    * The context, and return value, for a single submit/submitAll call.
608    * Note on how this class (one AP submit) works. Initially, all requests are split into groups
609    * by server; requests are sent to each server in parallel; the RPC calls are not async, so a
610    * thread per server is used. Every time some actions fail, regions/locations might have
611    * changed, so we re-group them by server and region again and send these groups in parallel
612    * too. The result, in case of retries, is a "tree" of threads, with the parent exiting after
613    * scheduling children. This is why lots of code doesn't require any synchronization.
614    */
615   protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
616 
617     /**
618      * Runnable (that can be submitted to the thread pool) that waits until it's time
619      * to issue replica calls, finds region replicas, groups the requests by replica and
620      * issues the calls (on separate threads, via sendMultiAction).
621      * This is done on a separate thread because we don't want to wait on user thread for
622      * our asynchronous call, and usually we have to wait before making replica calls.
623      */
624     private final class ReplicaCallIssuingRunnable implements Runnable {
625       private final long startTime;
626       private final List<Action<Row>> initialActions;
627 
628       public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
629         this.initialActions = initialActions;
630         this.startTime = startTime;
631       }
632 
633       @Override
634       public void run() {
635         boolean done = false;
636         if (primaryCallTimeoutMicroseconds > 0) {
637           try {
638             done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
639           } catch (InterruptedException ex) {
640             LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
641             return;
642           }
643         }
644         if (done) return; // Done within primary timeout
645         Map<ServerName, MultiAction<Row>> actionsByServer =
646             new HashMap<ServerName, MultiAction<Row>>();
647         List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
648         if (replicaGetIndices == null) {
649           for (int i = 0; i < results.length; ++i) {
650             addReplicaActions(i, actionsByServer, unknownLocActions);
651           }
652         } else {
653          for (int replicaGetIndex : replicaGetIndices) {
654            addReplicaActions(replicaGetIndex, actionsByServer, unknownLocActions);
655           }
656         }
657         if (!actionsByServer.isEmpty()) {
658           sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
659         }
660         if (!unknownLocActions.isEmpty()) {
661           actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
662           for (Action<Row> action : unknownLocActions) {
663             addReplicaActionsAgain(action, actionsByServer);
664           }
665           // Some actions may have completely failed, they are handled inside addAgain.
666           if (!actionsByServer.isEmpty()) {
667             sendMultiAction(actionsByServer, 1, null, true);
668           }
669         }
670       }
671 
672       /**
673        * Add replica actions to action map by server.
674        * @param index Index of the original action.
675        * @param actionsByServer The map by server to add it to.
676        */
677       private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
678           List<Action<Row>> unknownReplicaActions) {
679         if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
680         Action<Row> action = initialActions.get(index);
681         RegionLocations loc = findAllLocationsOrFail(action, true);
682         if (loc == null) return;
683         HRegionLocation[] locs = loc.getRegionLocations();
684         if (locs.length == 1) {
685           LOG.warn("No replicas found for " + action.getAction());
686           return;
687         }
688         synchronized (replicaResultLock) {
689           // Don't run replica calls if the original has finished. We could do it e.g. if
690           // original has already failed before first replica call (unlikely given retries),
691           // but that would require additional synchronization w.r.t. returning to caller.
692           if (results[index] != null) return;
693           // We set the number of calls here. After that any path must call setResult/setError.
694           // True even for replicas that are not found - if we refuse to send we MUST set error.
695           results[index] = new ReplicaResultState(locs.length);
696         }
697         for (int i = 1; i < locs.length; ++i) {
698           Action<Row> replicaAction = new Action<Row>(action, i);
699           if (locs[i] != null) {
700             addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
701                 replicaAction, actionsByServer, nonceGroup);
702           } else {
703             unknownReplicaActions.add(replicaAction);
704           }
705         }
706       }
707 
708       private void addReplicaActionsAgain(
709           Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
710         if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
711           throw new AssertionError("Cannot have default replica here");
712         }
713         HRegionLocation loc = getReplicaLocationOrFail(action);
714         if (loc == null) return;
715         addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
716             action, actionsByServer, nonceGroup);
717       }
718     }
719 
720     /**
721      * Runnable (that can be submitted to thread pool) that submits MultiAction to a
722      * single server. The server call is synchronous, therefore we do it on a thread pool.
723      */
724     private final class SingleServerRequestRunnable implements Runnable {
725       private final MultiAction<Row> multiAction;
726       private final int numAttempt;
727       private final ServerName server;
728       private final Set<MultiServerCallable<Row>> callsInProgress;
729 
730       private SingleServerRequestRunnable(
731           MultiAction<Row> multiAction, int numAttempt, ServerName server,
732           Set<MultiServerCallable<Row>> callsInProgress) {
733         this.multiAction = multiAction;
734         this.numAttempt = numAttempt;
735         this.server = server;
736         this.callsInProgress = callsInProgress;
737       }
738 
739       @Override
740       public void run() {
741         MultiResponse res;
742         MultiServerCallable<Row> callable = null;
743         try {
744           callable = createCallable(server, tableName, multiAction);
745           try {
746             RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
747             if (callsInProgress != null) callsInProgress.add(callable);
748             res = caller.callWithoutRetries(callable, timeout);
749 
750             if (res == null) {
751               // Cancelled
752               return;
753             }
754 
755           } catch (IOException e) {
756            // The service itself failed. It may be an error coming from the communication
757            //   layer, but also a functional error raised by the server.
758             receiveGlobalFailure(multiAction, server, numAttempt, e);
759             return;
760           } catch (Throwable t) {
761             // This should not happen. Let's log & retry anyway.
762             LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
763                 " Retrying. Server is " + server + ", tableName=" + tableName, t);
764             receiveGlobalFailure(multiAction, server, numAttempt, t);
765             return;
766           }
767 
768           // Normal case: we received an answer from the server, and it's not an exception.
769           receiveMultiAction(multiAction, server, res, numAttempt);
770         } catch (Throwable t) {
771           // Something really bad happened. We are on the send thread that will now die.
772           LOG.error("Internal AsyncProcess #" + id + " error for "
773               + tableName + " processing for " + server, t);
774           throw new RuntimeException(t);
775         } finally {
776           decTaskCounters(multiAction.getRegions(), server);
777           if (callsInProgress != null && callable != null) {
778             callsInProgress.remove(callable);
779           }
780         }
781       }
782     }
783 
784     private final Batch.Callback<CResult> callback;
785     private final BatchErrors errors;
786     private final ConnectionManager.ServerErrorTracker errorsByServer;
787     private final ExecutorService pool;
788     private final Set<MultiServerCallable<Row>> callsInProgress;
789 
790 
791     private final TableName tableName;
792     private final AtomicLong actionsInProgress = new AtomicLong(-1);
793     /**
794      * The lock controls access to results. It is only held when populating results where
795      * there might be several callers (eventual consistency gets). For other requests,
796      * there's one unique call going on per result index.
797      */
798     private final Object replicaResultLock = new Object();
799     /**
800      * Result array.  Null if results are not needed. Otherwise, each index corresponds to
801      * the action index in initial actions submitted. For most request types, has null-s for
802      * requests that are not done, and result/exception for those that are done.
803      * For eventual-consistency gets, initially the same applies; at some point, replica calls
804      * might be started, and ReplicaResultState is put at the corresponding indices. The
805      * returning calls check the type to detect when this is the case. After all calls are done,
806      * ReplicaResultState-s are replaced with results for the user.
807      */
808     private final Object[] results;
809     /**
810      * Indices of replica gets in results. If null, all or no actions are replica-gets.
811      */
812     private final int[] replicaGetIndices;
813     private final boolean hasAnyReplicaGets;
814     private final long nonceGroup;
815 
816     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
817         ExecutorService pool, boolean needResults, Object[] results,
818         Batch.Callback<CResult> callback) {
819       this.pool = pool;
820       this.callback = callback;
821       this.nonceGroup = nonceGroup;
822       this.tableName = tableName;
823       this.actionsInProgress.set(actions.size());
824       if (results != null) {
825         assert needResults;
826         if (results.length != actions.size()) {
827           throw new AssertionError("results.length");
828         }
829         this.results = results;
830         for (int i = 0; i != this.results.length; ++i) {
831           results[i] = null;
832         }
833       } else {
834         this.results = needResults ? new Object[actions.size()] : null;
835       }
836       List<Integer> replicaGetIndices = null;
837       boolean hasAnyReplicaGets = false;
838       if (needResults) {
839         // Check to see if any requests might require replica calls.
840         // We expect that many requests will consist of all or no multi-replica gets; in such
841         // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
842         // store the list of action indexes for which replica gets are possible, and set
843         // hasAnyReplicaGets to true.
844         boolean hasAnyNonReplicaReqs = false;
845         int posInList = 0;
846         for (Action<Row> action : actions) {
847           boolean isReplicaGet = isReplicaGet(action.getAction());
848           if (isReplicaGet) {
849             hasAnyReplicaGets = true;
850             if (hasAnyNonReplicaReqs) { // Mixed case
851               if (replicaGetIndices == null) {
852                 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
853               }
854               replicaGetIndices.add(posInList);
855             }
856           } else if (!hasAnyNonReplicaReqs) {
857             // The first non-multi-replica request in the action list.
858             hasAnyNonReplicaReqs = true;
859             if (posInList > 0) {
860               // Add all the previous requests to the index lists. We know they are all
861               // replica-gets because this is the first non-multi-replica request in the list.
862               replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
863               for (int i = 0; i < posInList; ++i) {
864                 replicaGetIndices.add(i);
865               }
866             }
867           }
868           ++posInList;
869         }
870       }
871       this.hasAnyReplicaGets = hasAnyReplicaGets;
872       if (replicaGetIndices != null) {
873         this.replicaGetIndices = new int[replicaGetIndices.size()];
874         int i = 0;
875         for (Integer el : replicaGetIndices) {
876           this.replicaGetIndices[i++] = el;
877         }
878       } else {
879         this.replicaGetIndices = null;
880       }
881       this.callsInProgress = !hasAnyReplicaGets ? null :
882           Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
883 
884       this.errorsByServer = createServerErrorTracker();
885       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
886     }
887 
888     @VisibleForTesting
889     long getActionsInProgress() {
890       return actionsInProgress.get();
891     }
892 
893     public Set<MultiServerCallable<Row>> getCallsInProgress() {
894       return callsInProgress;
895     }
896 
897     /**
898      * Group a list of actions per region servers, and send them.
899      *
900      * @param currentActions - the list of rows to submit
901      * @param numAttempt - the current numAttempt (first attempt is 1)
902      */
903     private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
904       Map<ServerName, MultiAction<Row>> actionsByServer =
905           new HashMap<ServerName, MultiAction<Row>>();
906 
907       boolean isReplica = false;
908       List<Action<Row>> unknownReplicaActions = null;
909       for (Action<Row> action : currentActions) {
910         RegionLocations locs = findAllLocationsOrFail(action, true);
911         if (locs == null) continue;
912         boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
913         if (isReplica && !isReplicaAction) {
914          // This is a property of the current implementation, not a requirement.
915           throw new AssertionError("Replica and non-replica actions in the same retry");
916         }
917         isReplica = isReplicaAction;
918         HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
919         if (loc == null || loc.getServerName() == null) {
920           if (isReplica) {
921             if (unknownReplicaActions == null) {
922               unknownReplicaActions = new ArrayList<Action<Row>>();
923             }
924             unknownReplicaActions.add(action);
925           } else {
926             // TODO: relies on primary location always being fetched
927             manageLocationError(action, null);
928           }
929         } else {
930           byte[] regionName = loc.getRegionInfo().getRegionName();
931           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
932         }
933       }
934       boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
935       boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
936 
937       if (!actionsByServer.isEmpty()) {
938        // If this is the first attempt (and these are not replica actions), we may need the replica thread.
939         sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
940             ? currentActions : null, numAttempt > 1 && !hasUnknown);
941       }
942 
943       if (hasUnknown) {
944         actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
945         for (Action<Row> action : unknownReplicaActions) {
946           HRegionLocation loc = getReplicaLocationOrFail(action);
947           if (loc == null) continue;
948           byte[] regionName = loc.getRegionInfo().getRegionName();
949           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
950         }
951         if (!actionsByServer.isEmpty()) {
952           sendMultiAction(
953               actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
954         }
955       }
956     }
957 
958     private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
959        // We are going to try to get the location once again. For each action, we'll do it once
960       // from cache, because the previous calls in the loop might populate it.
961       int replicaId = action.getReplicaId();
962       RegionLocations locs = findAllLocationsOrFail(action, true);
963       if (locs == null) return null; // manageError already called
964       HRegionLocation loc = locs.getRegionLocation(replicaId);
965       if (loc == null || loc.getServerName() == null) {
966         locs = findAllLocationsOrFail(action, false);
967         if (locs == null) return null; // manageError already called
968         loc = locs.getRegionLocation(replicaId);
969       }
970       if (loc == null || loc.getServerName() == null) {
971         manageLocationError(action, null);
972         return null;
973       }
974       return loc;
975     }
976 
977     private void manageLocationError(Action<Row> action, Exception ex) {
978       String msg = "Cannot get replica " + action.getReplicaId()
979           + " location for " + action.getAction();
980       LOG.error(msg);
981       if (ex == null) {
982         ex = new IOException(msg);
983       }
984       manageError(action.getOriginalIndex(), action.getAction(),
985           Retry.NO_LOCATION_PROBLEM, ex, null);
986     }
987 
988     private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
989       if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
990           ", row cannot be null");
991       RegionLocations loc = null;
992       try {
993         loc = connection.locateRegion(
994             tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
995       } catch (IOException ex) {
996         manageLocationError(action, ex);
997       }
998       return loc;
999     }
1000 
1001     /**
1002      * Send a multi action structure to the servers, after a delay depending on the attempt
1003      * number. Asynchronous.
1004      *
1005      * @param actionsByServer the actions grouped by server
1006      * @param numAttempt the attempt number.
1007      * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
1008      */
1009     private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
1010         int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
1011       // Run the last item on the same thread if we are already on a send thread.
1012       // We hope most of the time it will be the only item, so we can cut down on threads.
1013       int actionsRemaining = actionsByServer.size();
1014       // This iteration is by server (the HRegionLocation comparator is by server portion only).
1015       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
1016         ServerName server = e.getKey();
1017         MultiAction<Row> multiAction = e.getValue();
1018         incTaskCounters(multiAction.getRegions(), server);
1019         Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
1020             numAttempt);
1021         // make sure we correctly count the number of runnables before we try to reuse the send
1022         // thread, in case we had to split the request into different runnables because of backoff
1023         if (runnables.size() > actionsRemaining) {
1024           actionsRemaining = runnables.size();
1025         }
1026 
1027         // run all the runnables
1028         for (Runnable runnable : runnables) {
1029           if ((--actionsRemaining == 0) && reuseThread) {
1030             runnable.run();
1031           } else {
1032             try {
1033               pool.submit(runnable);
1034             } catch (Throwable t) {
1035               if (t instanceof RejectedExecutionException) {
1036                 // This should never happen. But as the pool is provided by the end user,
1037                // let's secure this a little.
1038                LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
1039                   " Server is " + server.getServerName(), t);
1040               } else {
1041                 // see #HBASE-14359 for more details
1042                 LOG.warn("Caught unexpected exception/error: ", t);
1043               }
1044               decTaskCounters(multiAction.getRegions(), server);
1045               // We're likely to fail again, but this will increment the attempt counter,
1046              // so it will finish.
1047               receiveGlobalFailure(multiAction, server, numAttempt, t);
1048             }
1049           }
1050         }
1051       }
1052 
1053       if (actionsForReplicaThread != null) {
1054         startWaitingForReplicaCalls(actionsForReplicaThread);
1055       }
1056     }
1057 
1058     private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1059         MultiAction<Row> multiAction,
1060         int numAttempt) {
1061       // no stats to manage, just do the standard action
1062       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1063         if (connection.getConnectionMetrics() != null) {
1064           connection.getConnectionMetrics().incrNormalRunners();
1065         }
1066         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1067             new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1068       }
1069 
1070       // group the actions by the amount of delay
1071       Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1072           .size());
1073 
1074       // split up the actions
1075       for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1076         Long backoff = getBackoff(server, e.getKey());
1077         DelayingRunner runner = actions.get(backoff);
1078         if (runner == null) {
1079           actions.put(backoff, new DelayingRunner(backoff, e));
1080         } else {
1081           runner.add(e);
1082         }
1083       }
1084 
1085       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1086       for (DelayingRunner runner : actions.values()) {
1087         String traceText = "AsyncProcess.sendMultiAction";
1088         Runnable runnable =
1089             new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1090                 callsInProgress);
1091         // use a delay runner only if we need to sleep for some time
1092         if (runner.getSleepTime() > 0) {
1093           runner.setRunner(runnable);
1094           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1095           runnable = runner;
1096           if (connection.getConnectionMetrics() != null) {
1097             connection.getConnectionMetrics().incrDelayRunners();
1098             connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
1099           }
1100         } else {
1101           if (connection.getConnectionMetrics() != null) {
1102             connection.getConnectionMetrics().incrNormalRunners();
1103           }
1104         }
1105         runnable = Trace.wrap(traceText, runnable);
1106         toReturn.add(runnable);
1107 
1108       }
1109       return toReturn;
1110     }
1111 
1112     /**
1113      * @param server server location where the target region is hosted
1114      * @param regionName name of the region to which we are going to write some data
1115      * @return the amount of time the client should wait before it submits a request to the
1116      * specified server and region
1117      */
1118     private Long getBackoff(ServerName server, byte[] regionName) {
1119       ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1120       ServerStatistics stats = tracker.getStats(server);
1121       return AsyncProcess.this.connection.getBackoffPolicy()
1122           .getBackoffTime(server, regionName, stats);
1123     }
1124 
1125     /**
1126      * Starts waiting to issue replica calls on a different thread; or issues them immediately.
1127      */
1128     private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1129       long startTime = EnvironmentEdgeManager.currentTime();
1130       ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1131           actionsForReplicaThread, startTime);
1132       if (primaryCallTimeoutMicroseconds == 0) {
1133         // Start replica calls immediately.
1134         replicaRunnable.run();
1135       } else {
1136         // Start the thread that may kick off replica gets.
1137         // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
1138         try {
1139           pool.submit(replicaRunnable);
1140         } catch (RejectedExecutionException ree) {
1141           LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1142         }
1143       }
1144     }
1145 
1146     /**
1147      * Checks whether we can retry and acts accordingly: logs, sets the error status.
1148      *
1149      * @param originalIndex the position in the list sent
1150      * @param row           the row
1151      * @param canRetry      if not Retry.YES, we won't retry whatever the settings.
1152      * @param throwable     the throwable, if any (can be null)
1153      * @param server        the location, if any (can be null)
1154      * @return the retry decision: Retry.YES if the action can be retried, a Retry.NO_* value otherwise.
1155      */
1156     public Retry manageError(int originalIndex, Row row, Retry canRetry,
1157                                 Throwable throwable, ServerName server) {
1158       if (canRetry == Retry.YES
1159           && throwable != null && (throwable instanceof DoNotRetryIOException ||
1160           throwable instanceof NeedUnmanagedConnectionException)) {
1161         canRetry = Retry.NO_NOT_RETRIABLE;
1162       }
1163 
1164       if (canRetry != Retry.YES) {
1165         // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
1166         setError(originalIndex, row, throwable, server);
1167       } else if (isActionComplete(originalIndex, row)) {
1168         canRetry = Retry.NO_OTHER_SUCCEEDED;
1169       }
1170       return canRetry;
1171     }
1172 
1173     /**
1174      * Resubmit all the actions from this multiaction after a failure.
1175      *
1176      * @param rsActions  the actions still to do from the initial list
1177      * @param server   the destination
1178      * @param numAttempt the number of attempts so far
1179      * @param t the throwable (if any) that caused the resubmit
1180      */
1181     private void receiveGlobalFailure(
1182         MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1183       errorsByServer.reportServerError(server);
1184       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1185           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1186 
1187       if (tableName == null) {
1188         // tableName is null when we made a cross-table RPC call.
1189         connection.clearCaches(server);
1190       }
1191       int failed = 0, stopped = 0;
1192       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1193       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1194         byte[] regionName = e.getKey();
1195         byte[] row = e.getValue().iterator().next().getAction().getRow();
1196         // Do not use the exception for updating cache because it might be coming from
1197         // any of the regions in the MultiAction.
1198         if (tableName != null) {
1199           connection.updateCachedLocations(tableName, regionName, row,
1200             ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
1201         }
1202         for (Action<Row> action : e.getValue()) {
1203           Retry retry = manageError(
1204               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1205           if (retry == Retry.YES) {
1206             toReplay.add(action);
1207           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1208             ++stopped;
1209           } else {
1210             ++failed;
1211           }
1212         }
1213       }
1214 
1215       if (toReplay.isEmpty()) {
1216         logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1217       } else {
1218         resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1219       }
1220     }
1221 
1222     /**
1223      * Log as much info as possible, and, if there is something to replay,
1224      * submit it again after a back off sleep.
1225      */
1226     private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1227         int numAttempt, int failureCount, Throwable throwable) {
1228       // We have something to replay, so we're going to sleep a little before resubmitting.
1229 
1230       // We have several competing needs here:
1231       //  1) We want to get the new location after having slept, as it may change.
1232       //  2) We want to take the location into account when calculating the sleep time.
1233       //  3) If all this is just because the response needed to be chunked, retry again FAST.
1234       // It should be possible to use some heuristics to make the right decision. Short term,
1235       //  we go with 1).
1236       boolean retryImmediately = throwable instanceof RetryImmediatelyException;
1237       int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
1238       long backOffTime = retryImmediately ? 0 :
1239           errorsByServer.calculateBackoffTime(oldServer, pause);
1240       if (numAttempt > startLogErrorsCnt) {
1241         // We use this value to have some logs when we have multiple failures, but not too many
1242         //  logs, as errors are to be expected when a region moves, splits and so on
1243         LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1244             oldServer, throwable, backOffTime, true, null, -1, -1));
1245       }
1246 
1247       try {
1248         if (backOffTime > 0) {
1249           Thread.sleep(backOffTime);
1250         }
1251       } catch (InterruptedException e) {
1252         LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1253         Thread.currentThread().interrupt();
1254         return;
1255       }
1256 
1257       groupAndSendMultiAction(toReplay, nextAttemptNumber);
1258     }
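    // Worked example of the logic above: on a RetryImmediatelyException (e.g. a response that had
    // to be chunked) we replay with backOffTime = 0 and the same numAttempt; for any other
    // retriable failure we sleep for errorsByServer.calculateBackoffTime(oldServer, pause) and
    // bump numAttempt by one. The actual pause schedule is owned by the ServerErrorTracker and is
    // not reproduced here.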
1259 
1260     private void logNoResubmit(ServerName oldServer, int numAttempt,
1261         int failureCount, Throwable throwable, int failed, int stopped) {
1262       if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1263         String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1264         String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1265             throwable, -1, false, timeStr, failed, stopped);
1266         if (failed != 0) {
1267           // Only log final failures as warning
1268           LOG.warn(logMessage);
1269         } else {
1270           LOG.info(logMessage);
1271         }
1272       }
1273     }
1274 
1275     /**
1276      * Called when we receive the result of a server query.
1277      *
1278      * @param multiAction    - the multiAction we sent
1279      * @param server         - the server the multiAction was sent to
1280      * @param responses      - the response (never null here)
1281      * @param numAttempt     - the attempt number
1282      */
1283     private void receiveMultiAction(MultiAction<Row> multiAction,
1284         ServerName server, MultiResponse responses, int numAttempt) {
1285       assert responses != null;
1286 
1287       // Success or partial success
1288       // Analyze detailed results. We can still have individual failures to be redone.
1289       // Two specific throwables are managed:
1290       //  - DoNotRetryIOException: we do not retry that action but continue with the others
1291       //  - RegionMovedException: we update the cache with the new region location
1292 
1293       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1294       Throwable throwable = null;
1295       int failureCount = 0;
1296       Retry canRetry = null;
1297       Map<byte[], Map<Integer, Object>> results = responses.getResults();
1298       // Go by original action.
1299       int failed = 0;
1300       int stopped = 0;
1301       for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
1302         byte[] regionName = regionEntry.getKey();
1303 
1304         Throwable regionException = responses.getExceptions().get(regionName);
1305         if (tableName == null && regionException != null &&
1306               ClientExceptionsUtil.isMetaClearingException(regionException)) {
1307           // For multi-actions, we don't have a table name, but we want to make sure to clear the
1308           // cache in case there were location-related exceptions. We don't want to clear the cache
1309           // for every possible exception that comes through, however.
1310           connection.clearCaches(server);
1311         }
1312 
1313         Map<Integer, Object> regionResults;
1314         if (results.containsKey(regionName)) {
1315           regionResults = results.get(regionName);
1316         } else {
1317           regionResults = Collections.emptyMap();
1318         }
1319 
1320         boolean regionFailureRegistered = false;
1321         for (Action<Row> sentAction : regionEntry.getValue()) {
1322           Object result = regionResults.get(sentAction.getOriginalIndex());
1323           if (result == null) {
1324             if (regionException == null) {
1325               LOG.error("Server sent us neither results nor exceptions for "
1326                 + Bytes.toStringBinary(regionName)
1327                 + ", numAttempt:" + numAttempt);
1328               regionException = new RuntimeException("Invalid response");
1329             }
1330             // If the row operation hit a region-level error, the per-action exception may be
1331             // null, so fall back to the region-wide exception.
1332             result = regionException;
1333           }
1334           // Failure: retry if it makes sense, otherwise update the error lists
1335           if (result instanceof Throwable) {
1336             Row row = sentAction.getAction();
1337             throwable = regionException != null ? regionException
1338               : ClientExceptionsUtil.findException(result);
1339             // Register corresponding failures once per server/once per region.
1340             if (!regionFailureRegistered) {
1341               regionFailureRegistered = true;
1342               connection.updateCachedLocations(
1343                   tableName, regionName, row.getRow(), result, server);
1344             }
1345             if (canRetry == null) {
1346               errorsByServer.reportServerError(server);
1347               // We determine canRetry only once for all calls, after reporting server failure.
1348               canRetry = errorsByServer.canRetryMore(numAttempt) ?
1349                 Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1350             }
1351             ++failureCount;
1352             switch (manageError(sentAction.getOriginalIndex(), row, canRetry, (Throwable) result,
1353               server)) {
1354             case YES:
1355               toReplay.add(sentAction);
1356               break;
1357             case NO_OTHER_SUCCEEDED:
1358               ++stopped;
1359               break;
1360             default:
1361               ++failed;
1362               break;
1363             }
1364           } else {
1365 
1366             if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
1367               AsyncProcess.this.connection.getConnectionMetrics().
1368                       updateServerStats(server, regionName, result);
1369             }
1370 
1371             // Update the stats about the region, if it's a user table. We don't want to slow down
1372             // updates to meta tables, especially from internal updates (master, etc).
1373             if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
1374               result = ResultStatsUtil.updateStats(result,
1375                   AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
1376             }
1377 
1378             if (callback != null) {
1379               try {
1380                 //noinspection unchecked
1381                 // TODO: would callback expect a replica region name if it gets one?
1382                 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
1383               } catch (Throwable t) {
1384                 LOG.error("User callback threw an exception for "
1385                     + Bytes.toStringBinary(regionName) + ", ignoring", t);
1386               }
1387             }
1388             setResult(sentAction, result);
1389           }
1390         }
1391       }
1392       if (toReplay.isEmpty()) {
1393         logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
1394       } else {
1395         resubmit(server, toReplay, numAttempt, failureCount, throwable);
1396       }
1397     }
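    // Shape of the data handled above, for reference: MultiResponse groups results by region name,
    // and each per-region map goes from the action's original index in the caller's list to either
    // a result object or a Throwable. That is why the loop indexes regionResults with
    // sentAction.getOriginalIndex() and falls back to the region-wide exception when no entry exists.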
1398 
1399     private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1400         Throwable error, long backOffTime, boolean willRetry, String startTime,
1401         int failed, int stopped) {
1402       StringBuilder sb = new StringBuilder();
1403       sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1404         .append("attempt=").append(numAttempt)
1405         .append("/").append(numTries).append(" ");
1406 
1407       if (failureCount > 0 || error != null){
1408         sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1409             append(error == null ? "null" : error);
1410       } else {
1411         sb.append("succeeded");
1412       }
1413 
1414       sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1415 
1416       if (willRetry) {
1417         sb.append(", retrying after=").append(backOffTime).append("ms").
1418             append(", replay=").append(replaySize).append("ops");
1419       } else if (failureCount > 0) {
1420         if (stopped > 0) {
1421           sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1422         }
1423         if (failed > 0) {
1424           sb.append("; not retrying ").append(failed).append(" - final failure");
1425         }
1426 
1427       }
1428 
1429       return sb.toString();
1430     }
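    // For reference, a retry-path line built by createLog reads roughly as follows (all values are
    // made up; startTime is only supplied on the final, no-resubmit log, so it shows as null here):
    //
    //   #1, table=usertable, attempt=7/35 failed=2ops, last exception:
    //   org.apache.hadoop.hbase.NotServingRegionException: ... on rs1.example.com,16020,1500000000000,
    //   tracking started null, retrying after=2012ms, replay=2ops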
1431 
1432     /**
1433      * Sets the non-error result from a particular action.
1434      * @param action Action (request) that the server responded to.
1435      * @param result The result.
1436      */
1437     private void setResult(Action<Row> action, Object result) {
1438       if (result == null) {
1439         throw new RuntimeException("Result cannot be null");
1440       }
1441       ReplicaResultState state = null;
1442       boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
1443       int index = action.getOriginalIndex();
1444       if (results == null) {
1445         decActionCounter(index);
1446         return; // Simple case, no replica requests.
1447       } else if ((state = trySetResultSimple(
1448           index, action.getAction(), false, result, null, isStale)) == null) {
1449         return; // Simple case, no replica requests.
1450       }
1451       assert state != null;
1452       // At this point we know that state is set to replica tracking class.
1453       // It could be that someone else is also looking at it; however, we know there can
1454       // only be one state object, and only one thread can set callCount to 0. Other threads
1455       // will either see the state with callCount 0 after locking it, or will not see the state
1456       // at all because we will have replaced it with the result.
1457       synchronized (state) {
1458         if (state.callCount == 0) {
1459           return; // someone already set the result
1460         }
1461         state.callCount = 0;
1462       }
1463       synchronized (replicaResultLock) {
1464         if (results[index] != state) {
1465           throw new AssertionError("We set the callCount but someone else replaced the result");
1466         }
1467         results[index] = result;
1468       }
1469 
1470       decActionCounter(index);
1471     }
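    // Replica bookkeeping in a nutshell (see also setError below): once replica calls are in
    // flight, results[index] holds a ReplicaResultState counting the outstanding calls for that
    // action. The first call that completes successfully zeroes callCount, swaps the state object
    // for the real result and decrements actionsInProgress exactly once; failing calls only
    // decrement callCount, and the last of them merges the accumulated replicaErrors and
    // finalizes the slot with the throwable.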
1472 
1473     /**
1474      * Sets the error from a particular action.
1475      * @param index Original action index.
1476      * @param row Original request.
1477      * @param throwable The resulting error.
1478      * @param server The source server.
1479      */
1480     private void setError(int index, Row row, Throwable throwable, ServerName server) {
1481       ReplicaResultState state = null;
1482       if (results == null) {
1483         // Note that we currently cannot have replica requests with null results. So it shouldn't
1484         // happen that multiple replica calls will call decActionCounter for the same action when results == null.
1485         // Only one call per action should be present in this case.
1486         errors.add(throwable, row, server);
1487         decActionCounter(index);
1488         return; // Simple case, no replica requests.
1489       } else if ((state = trySetResultSimple(
1490           index, row, true, throwable, server, false)) == null) {
1491         return; // Simple case, no replica requests.
1492       }
1493       assert state != null;
1494       BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
1495       boolean isActionDone = false;
1496       synchronized (state) {
1497         switch (state.callCount) {
1498           case 0: return; // someone already set the result
1499           case 1: { // All calls failed, we are the last error.
1500             target = errors;
1501             isActionDone = true;
1502             break;
1503           }
1504           default: {
1505             assert state.callCount > 1;
1506             if (state.replicaErrors == null) {
1507               state.replicaErrors = new BatchErrors();
1508             }
1509             target = state.replicaErrors;
1510             break;
1511           }
1512         }
1513         --state.callCount;
1514       }
1515       target.add(throwable, row, server);
1516       if (isActionDone) {
1517         if (state.replicaErrors != null) { // last call, no need to lock
1518           errors.merge(state.replicaErrors);
1519         }
1520         // See setResult for explanations.
1521         synchronized (replicaResultLock) {
1522           if (results[index] != state) {
1523             throw new AssertionError("We set the callCount but someone else replaced the result");
1524           }
1525           results[index] = throwable;
1526         }
1527         decActionCounter(index);
1528       }
1529     }
1530 
1531     /**
1532      * Checks if the action is complete; used on error to prevent needless retries.
1533      * Does not synchronize, assuming element index/field accesses are atomic.
1534      * This is an opportunistic optimization check, doesn't have to be strict.
1535      * @param index Original action index.
1536      * @param row Original request.
1537      */
1538     private boolean isActionComplete(int index, Row row) {
1539       if (!isReplicaGet(row)) return false;
1540       Object resObj = results[index];
1541       return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1542           || ((ReplicaResultState)resObj).callCount == 0);
1543     }
1544 
1545     /**
1546      * Tries to set the result or error for a particular action as if there were no replica calls.
1547      * @return null if successful; replica state if there were in fact replica calls.
1548      */
1549     private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
1550         Object result, ServerName server, boolean isFromReplica) {
1551       Object resObj = null;
1552       if (!isReplicaGet(row)) {
1553         if (isFromReplica) {
1554           throw new AssertionError("Unexpected stale result for " + row);
1555         }
1556         results[index] = result;
1557       } else {
1558         synchronized (replicaResultLock) {
1559           if ((resObj = results[index]) == null) {
1560             if (isFromReplica) {
1561               throw new AssertionError("Unexpected stale result for " + row);
1562             }
1563             results[index] = result;
1564           }
1565         }
1566       }
1567 
1568       ReplicaResultState rrs =
1569           (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
1570       if (rrs == null && isError) {
1571         // The resObj is not replica state (null or already set).
1572         errors.add((Throwable)result, row, server);
1573       }
1574 
1575       if (resObj == null) {
1576         // resObj is null - no replica calls were made.
1577         decActionCounter(index);
1578         return null;
1579       }
1580       return rrs;
1581     }
1582 
1583     private void decActionCounter(int index) {
1584       long actionsRemaining = actionsInProgress.decrementAndGet();
1585       if (actionsRemaining < 0) {
1586         String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1587         throw new AssertionError(error);
1588       } else if (actionsRemaining == 0) {
1589         synchronized (actionsInProgress) {
1590           actionsInProgress.notifyAll();
1591         }
1592       }
1593     }
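    // The notifyAll() above is what wakes waitUntilDone(cutoff), which parks on this same
    // actionsInProgress object; it fires only when the very last outstanding action completes.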
1594 
1595     private String buildDetailedErrorMsg(String string, int index) {
1596       StringBuilder error = new StringBuilder(string);
1597       error.append("; called for ").
1598         append(index).
1599         append(", actionsInProgress ").
1600         append(actionsInProgress.get()).
1601         append("; replica gets: ");
1602       if (replicaGetIndices != null) {
1603         for (int i = 0; i < replicaGetIndices.length; ++i) {
1604           error.append(replicaGetIndices[i]).append(", ");
1605         }
1606       } else {
1607         error.append(hasAnyReplicaGets ? "all" : "none");
1608       }
1609       error.append("; results ");
1610       if (results != null) {
1611         for (int i = 0; i < results.length; ++i) {
1612           Object o = results[i];
1613           error.append(((o == null) ? "null" : o.toString())).append(", ");
1614         }
1615       }
1616       return error.toString();
1617     }
1618 
1619     @Override
1620     public void waitUntilDone() throws InterruptedIOException {
1621       try {
1622         waitUntilDone(Long.MAX_VALUE);
1623       } catch (InterruptedException iex) {
1624         throw new InterruptedIOException(iex.getMessage());
1625       } finally {
1626         if (callsInProgress != null) {
1627           for (MultiServerCallable<Row> clb : callsInProgress) {
1628             clb.cancel();
1629           }
1630         }
1631       }
1632     }
1633 
1634     private boolean waitUntilDone(long cutoff) throws InterruptedException {
1635       boolean hasWait = cutoff != Long.MAX_VALUE;
1636       long lastLog = EnvironmentEdgeManager.currentTime();
1637       long currentInProgress;
1638       while (0 != (currentInProgress = actionsInProgress.get())) {
1639         long now = EnvironmentEdgeManager.currentTime();
1640         if (hasWait && (now * 1000L) > cutoff) {
1641           return false;
1642         }
1643         if (!hasWait) { // Only log if wait is infinite.
1644           if (now > lastLog + 10000) {
1645             lastLog = now;
1646             LOG.info("#" + id + ", waiting for " + currentInProgress
1647                 + "  actions to finish on table: " + tableName);
1648             if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1649               logDetailsOfUndoneTasks(currentInProgress);
1650             }
1651           }
1652         }
1653         synchronized (actionsInProgress) {
1654           if (actionsInProgress.get() == 0) break;
1655           if (!hasWait) {
1656             actionsInProgress.wait(10);
1657           } else {
1658             long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1659             TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1660           }
1661         }
1662       }
1663       return true;
1664     }
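    // Note that 'cutoff' is an absolute deadline expressed in microseconds since the epoch (the
    // millisecond clock is multiplied by 1000 before the comparison). A caller wanting to wait at
    // most ~50 ms could therefore pass something like
    // (EnvironmentEdgeManager.currentTime() + 50) * 1000L; Long.MAX_VALUE means wait forever.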
1665 
1666     @Override
1667     public boolean hasError() {
1668       return errors.hasErrors();
1669     }
1670 
1671     @Override
1672     public List<? extends Row> getFailedOperations() {
1673       return errors.actions;
1674     }
1675 
1676     @Override
1677     public RetriesExhaustedWithDetailsException getErrors() {
1678       return errors.makeException();
1679     }
1680 
1681     @Override
1682     public Object[] getResults() throws InterruptedIOException {
1683       waitUntilDone();
1684       return results;
1685     }
1686   }
1687 
1688   /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
1689   @VisibleForTesting
1690   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1691       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1692       Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1693     return new AsyncRequestFutureImpl<CResult>(
1694         tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1695   }
1696 
1697   /**
1698    * Create a callable. Isolated to be easily overridden in the tests.
1699    */
1700   @VisibleForTesting
1701   protected MultiServerCallable<Row> createCallable(final ServerName server,
1702       TableName tableName, final MultiAction<Row> multi) {
1703     return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1704   }
1705 
1706   /**
1707    * Create a caller. Isolated to be easily overridden in the tests.
1708    */
1709   @VisibleForTesting
1710   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1711     return rpcCallerFactory.<MultiResponse> newCaller();
1712   }
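  // The three factory methods above exist so tests can inject canned RPC behaviour. A test might,
  // for example, override them in an anonymous subclass along these lines (hypothetical test
  // code, constructor arguments elided):
  //
  //   AsyncProcess ap = new AsyncProcess(/* usual constructor arguments */) {
  //     @Override
  //     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
  //       return cannedCaller;  // e.g. a caller that always returns a prepared MultiResponse
  //     }
  //   };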
1713 
1714   /** Waits until all outstanding tasks are done. Used in tests. */
1715   @VisibleForTesting
1716   void waitUntilDone() throws InterruptedIOException {
1717     waitForMaximumCurrentTasks(0, null);
1718   }
1719 
1720   /** Waits until this AsyncProcess does not have more than max tasks in progress. */
1721   private void waitForMaximumCurrentTasks(int max, String tableName)
1722       throws InterruptedIOException {
1723     waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
1724   }
1725 
1726   // Broken out into a separate method so that it is testable
1727   @VisibleForTesting
1728   void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
1729       String tableName) throws InterruptedIOException {
1730     long lastLog = EnvironmentEdgeManager.currentTime();
1731     long currentInProgress, oldInProgress = Long.MAX_VALUE;
1732     while ((currentInProgress = tasksInProgress.get()) > max) {
1733       if (oldInProgress != currentInProgress) { // Wait for in progress to change.
1734         long now = EnvironmentEdgeManager.currentTime();
1735         if (now > lastLog + 10000) {
1736           lastLog = now;
1737           LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1738               + max + ", tasksInProgress=" + currentInProgress +
1739               " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
1740           if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1741             logDetailsOfUndoneTasks(currentInProgress);
1742           }
1743         }
1744       }
1745       oldInProgress = currentInProgress;
1746       try {
1747         synchronized (tasksInProgress) {
1748           if (tasksInProgress.get() == oldInProgress) {
1749             tasksInProgress.wait(10);
1750           }
1751         }
1752       } catch (InterruptedException e) {
1753         throw new InterruptedIOException("#" + id + ", interrupted." +
1754             " currentNumberOfTask=" + currentInProgress);
1755       }
1756     }
1757   }
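  // The wait(10) above pairs with the notifyAll() in decTaskCounters: a finishing task wakes the
  // waiter early, and the 10 ms timeout bounds how long a missed notification can delay us.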
1758 
1759   private void logDetailsOfUndoneTasks(long taskInProgress) {
1760     ArrayList<ServerName> servers = new ArrayList<ServerName>();
1761     for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
1762       if (entry.getValue().get() > 0) {
1763         servers.add(entry.getKey());
1764       }
1765     }
1766     LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
1767     if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
1768       ArrayList<String> regions = new ArrayList<String>();
1769       for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
1770         if (entry.getValue().get() > 0) {
1771           regions.add(Bytes.toString(entry.getKey()));
1772         }
1773       }
1774       LOG.info("Regions against which left over task(s) are processed: " + regions);
1775     }
1776   }
1777 
1778   /**
1779    * Only used with the useGlobalErrors constructor argument, for HTable backward compatibility.
1780    * @return Whether there were any errors in any request since the last time
1781    *          {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created.
1782    */
1783   public boolean hasError() {
1784     return globalErrors.hasErrors();
1785   }
1786 
1787   /**
1788    * Only used with the useGlobalErrors constructor argument, for HTable backward compatibility.
1789    * Waits for all previous operations to finish, and returns errors and (optionally)
1790    * failed operations themselves.
1791    * @param failedRows an optional list into which the rows that failed since the last time
1792    *        {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved.
1793    * @param tableName name of the table
1794    * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
1795    *          was called, or AP was created.
1796    */
1797   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1798       List<Row> failedRows, String tableName) throws InterruptedIOException {
1799     waitForMaximumCurrentTasks(0, tableName);
1800     if (!globalErrors.hasErrors()) {
1801       return null;
1802     }
1803     if (failedRows != null) {
1804       failedRows.addAll(globalErrors.actions);
1805     }
1806     RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1807     globalErrors.clear();
1808     return result;
1809   }
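  // Sketch of the HTable-style flush path built on the two methods above (caller-side code with
  // hypothetical variable names):
  //
  //   List<Row> failedRows = new ArrayList<Row>();
  //   RetriesExhaustedWithDetailsException errors =
  //       ap.waitForAllPreviousOpsAndReset(failedRows, tableName);
  //   if (errors != null) {
  //     throw errors;  // failedRows now also holds the operations that could not be applied
  //   }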
1810 
1811   /**
1812    * Increments the task counters for a given set of regions. MT safe.
1813    */
1814   protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1815     tasksInProgress.incrementAndGet();
1816 
1817     AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1818     if (serverCnt == null) {
1819       taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1820       serverCnt = taskCounterPerServer.get(sn);
1821     }
1822     serverCnt.incrementAndGet();
1823 
1824     for (byte[] regBytes : regions) {
1825       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1826       if (regionCnt == null) {
1827         regionCnt = new AtomicInteger();
1828         AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1829         if (oldCnt != null) {
1830           regionCnt = oldCnt;
1831         }
1832       }
1833       regionCnt.incrementAndGet();
1834     }
1835   }
1836 
1837   /**
1838    * Decrements the counters for a given region and the region server. MT Safe.
1839    */
1840   protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1841     for (byte[] regBytes : regions) {
1842       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1843       regionCnt.decrementAndGet();
1844     }
1845 
1846     taskCounterPerServer.get(sn).decrementAndGet();
1847     tasksInProgress.decrementAndGet();
1848     synchronized (tasksInProgress) {
1849       tasksInProgress.notifyAll();
1850     }
1851   }
1852 
1853   /**
1854    * Creates the server error tracker to use inside this AsyncProcess.
1855    * Currently, to preserve the main assumption about retries and to work well with the
1856    * retry-limit-based calculation, the tracking is local to each AsyncProcess object.
1857    * We may benefit from connection-wide tracking of server errors.
1858    * @return the ServerErrorTracker to use; this implementation always creates a new tracker per call
1859    */
1860   protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1861     return new ConnectionManager.ServerErrorTracker(
1862         this.serverTrackerTimeout, this.numTries);
1863   }
1864 
1865   private static boolean isReplicaGet(Row row) {
1866     return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1867   }
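  // Only gets explicitly marked for timeline consistency take the replica path, e.g.
  // (client-side usage, not part of this class):
  //
  //   Get get = new Get(rowKey);
  //   get.setConsistency(Consistency.TIMELINE);  // makes isReplicaGet(get) return true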
1868 
1869   /**
1870    * For manageError. Only used to make logging clearer; we don't actually care why we don't retry.
1871    */
1872   private enum Retry {
1873     YES,
1874     NO_LOCATION_PROBLEM,
1875     NO_NOT_RETRIABLE,
1876     NO_RETRIES_EXHAUSTED,
1877     NO_OTHER_SUCCEEDED
1878   }
1879 }