1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Date;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ConcurrentSkipListMap;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HRegionLocation;
50  import org.apache.hadoop.hbase.RegionLocations;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.TableName;
53  import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
54  import org.apache.hadoop.hbase.client.coprocessor.Batch;
55  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
56  import org.apache.hadoop.hbase.util.Bytes;
57  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58  import org.apache.htrace.Trace;
59  
60  import com.google.common.annotations.VisibleForTesting;
61  
62  /**
63   * This class allows a continuous flow of requests. It's written to be compatible with a
64   * synchronous caller such as HTable.
65   * <p>
66   * The caller sends a buffer of operations by calling submit. This class extracts from this
67   * list the operations it can send, i.e. the operations that are on regions that are not
68   * considered busy. The process is asynchronous, i.e. it returns as soon as it has finished
69   * iterating over the list. If, and only if, the maximum number of concurrent tasks is reached,
70   * the call to submit will block. Alternatively, the caller can call submitAll, in which case
71   * all the operations will be sent. Each call to submit returns a future-like object that can
72   * be used to track operation progress.
73   * </p>
74   * <p>
75   * The class manages internally the retries.
76   * </p>
77   * <p>
78   * The class can be constructed in regular mode, or "global error" mode. In global error mode,
79   * AP tracks errors across all calls (each "future" also has global view of all errors). That
80   * mode is necessary for backward compat with HTable behavior, where multiple submissions are
81   * made and errors from previous calls can propagate through any subsequent put/flush call.
82   * In "regular" mode, the errors are tracked inside the Future object that is returned.
83   * The results are always tracked inside the Future object and can be retrieved when the call
84   * has finished. Partial results can also be retrieved if part of a multi-request failed.
85   * </p>
86   * <p>
87   * This class is thread safe in regular mode; in global error mode, submitting operations and
88   * retrieving errors from different threads may not be thread safe.
89   * Internally, the class is thread safe enough to manage simultaneously new submissions and results
90   * arising from older operations.
91   * </p>
92   * <p>
93   * Internally, this class works with {@link Row}; this means it could theoretically be used for
94   * gets as well.
95   * </p>
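 * <p>
 * A minimal usage sketch (illustrative only; the connection, pool, factory and table-name
 * variables are assumptions of the example, not something this class provides):
 * <pre>{@code
 * AsyncProcess ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
 * List<Put> puts = ...; // the operations to send
 * AsyncRequestFuture arf = ap.submit(TableName.valueOf("myTable"), puts, true, null, false);
 * arf.waitUntilDone();
 * if (arf.hasError()) {
 *   throw arf.getErrors();
 * }
 * }</pre>
 * </p>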
96   */
97  @InterfaceAudience.Private
98  class AsyncProcess {
99    protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
100   protected static final AtomicLong COUNTER = new AtomicLong();
101 
102   public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
103 
104   /**
105    * Configures the number of failures after which the client will start logging. A few failures
106    * are fine: the region moved, then is not opened, then is overloaded. We try to have an acceptable
107    * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at
108    * this stage.
109    */
110   public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
111       "hbase.client.start.log.errors.counter";
112   public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
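  // Illustrative tuning sketch (hypothetical caller code, not part of this class): the threshold
  // can be raised through the client Configuration so that only persistent failures are logged, e.g.
  //   conf.setInt("hbase.client.start.log.errors.counter", 20);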
113 
114   /**
115    * Configuration key that decides whether to log details for batch errors.
116    */
117   public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
118 
119   private final int thresholdToLogUndoneTaskDetails;
120   private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
121       "hbase.client.threshold.log.details";
122   private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
123   private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
124 
125   /**
126    * The context used to wait for results from one submit call.
127    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
128    *    then errors and failed operations in this object will reflect global errors.
129    * 2) If the submit call is made with needResults set to false, results will not be saved.
130    */
131   public static interface AsyncRequestFuture {
132     public boolean hasError();
133     public RetriesExhaustedWithDetailsException getErrors();
134     public List<? extends Row> getFailedOperations();
135     public Object[] getResults() throws InterruptedIOException;
136     /** Wait until all tasks are executed, successfully or not. */
137     public void waitUntilDone() throws InterruptedIOException;
138   }
139 
140   /** Return value from a submit that didn't contain any requests. */
141   private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
142     public final Object[] result = new Object[0];
143     @Override
144     public boolean hasError() { return false; }
145     @Override
146     public RetriesExhaustedWithDetailsException getErrors() { return null; }
147     @Override
148     public List<? extends Row> getFailedOperations() { return null; }
149     @Override
150     public Object[] getResults() { return result; }
151     @Override
152     public void waitUntilDone() throws InterruptedIOException {}
153   };
154 
155   /** Sync point for calls to multiple replicas for the same user request (Get).
156    * Created and put in the results array (we assume replica calls require results) when
157    * the replica calls are launched. See results for details of this process.
158    * POJO, all fields are public. To modify them, the object itself is locked. */
159   private static class ReplicaResultState {
160     public ReplicaResultState(int callCount) {
161       this.callCount = callCount;
162     }
163 
164     /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
165     int callCount;
166     /** Errors for which it is not decided whether we will report them to user. If one of the
167      * calls succeeds, we will discard the errors that may have happened in the other calls. */
168     BatchErrors replicaErrors = null;
169 
170     @Override
171     public String toString() {
172       return "[call count " + callCount + "; errors " + replicaErrors + "]";
173     }
174   }
175 
176 
177   // TODO: many of the fields should be made private
178   protected final long id;
179 
180   protected final ClusterConnection connection;
181   protected final RpcRetryingCallerFactory rpcCallerFactory;
182   protected final RpcControllerFactory rpcFactory;
183   protected final BatchErrors globalErrors;
184   protected final ExecutorService pool;
185 
186   protected final AtomicLong tasksInProgress = new AtomicLong(0);
187   protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
188       new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
189   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
190       new ConcurrentHashMap<ServerName, AtomicInteger>();
191 
192   // Start configuration settings.
193   private final int startLogErrorsCnt;
194 
195   /**
196    * The number of tasks simultaneously executed on the cluster.
197    */
198   protected final int maxTotalConcurrentTasks;
199 
200   /**
201    * The number of tasks we run in parallel on a single region.
202    * With 1 (the default), we ensure that the ordering of the queries is respected: we don't start
203    * a new set of operations on a region before the previous one is done. This also limits
204    * the pressure we put on the region server.
205    */
206   protected final int maxConcurrentTasksPerRegion;
207 
208   /**
209    * The number of tasks simultaneously executed on a single region server.
210    */
211   protected final int maxConcurrentTasksPerServer;
212   protected final long pause;
213   protected int numTries;
214   protected int serverTrackerTimeout;
215   protected int timeout;
216   protected long primaryCallTimeoutMicroseconds;
217   // End configuration settings.
218 
219   protected static class BatchErrors {
220     private final List<Throwable> throwables = new ArrayList<Throwable>();
221     private final List<Row> actions = new ArrayList<Row>();
222     private final List<String> addresses = new ArrayList<String>();
223 
224     public synchronized void add(Throwable ex, Row row, ServerName serverName) {
225       if (row == null){
226         throw new IllegalArgumentException("row cannot be null. location=" + serverName);
227       }
228 
229       throwables.add(ex);
230       actions.add(row);
231       addresses.add(serverName != null ? serverName.toString() : "null");
232     }
233 
234     public boolean hasErrors() {
235       return !throwables.isEmpty();
236     }
237 
238     private synchronized RetriesExhaustedWithDetailsException makeException() {
239       return new RetriesExhaustedWithDetailsException(
240           new ArrayList<Throwable>(throwables),
241           new ArrayList<Row>(actions), new ArrayList<String>(addresses));
242     }
243 
244     public synchronized void clear() {
245       throwables.clear();
246       actions.clear();
247       addresses.clear();
248     }
249 
250     public synchronized void merge(BatchErrors other) {
251       throwables.addAll(other.throwables);
252       actions.addAll(other.actions);
253       addresses.addAll(other.addresses);
254     }
255   }
256 
257   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
258       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
259     if (hc == null) {
260       throw new IllegalArgumentException("HConnection cannot be null.");
261     }
262 
263     this.connection = hc;
264     this.pool = pool;
265     this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
266 
267     this.id = COUNTER.incrementAndGet();
268 
269     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
270         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
271     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
272         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
273     this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
274         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
275     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
276 
277     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
278       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
279     this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
280           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
281     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
282           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
283 
284     this.startLogErrorsCnt =
285         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
286 
287     if (this.maxTotalConcurrentTasks <= 0) {
288       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
289     }
290     if (this.maxConcurrentTasksPerServer <= 0) {
291       throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
292           maxConcurrentTasksPerServer);
293     }
294     if (this.maxConcurrentTasksPerRegion <= 0) {
295       throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
296           maxConcurrentTasksPerRegion);
297     }
298 
299     // The server tracker allows us to do faster, and hopefully still useful, retries.
300     // However, if we retry too quickly, we might exhaust the retry count limit and fail very fast.
301     // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
302     // retry time if normal retries were used. Then we will retry until this time runs out.
303     // If we keep hitting one server, the net effect will be the incremental backoff, and
304     // essentially the same number of retries as planned. If we have to do faster retries,
305     // we will do more retries in aggregate, but the user will be none the wiser.
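    // For illustration (assuming the usual defaults, which really come from the Configuration):
    // getPauseTime(pause, i) scales the base pause by HBase's per-attempt backoff table
    // (1, 2, 3, 5, 10, 20, 40, 100, ...), so serverTrackerTimeout is the sum of all planned sleeps.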
306     this.serverTrackerTimeout = 0;
307     for (int i = 0; i < this.numTries; ++i) {
308       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
309     }
310 
311     this.rpcCallerFactory = rpcCaller;
312     this.rpcFactory = rpcFactory;
313 
314     this.thresholdToLogUndoneTaskDetails =
315         conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
316           DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
317   }
318 
319   /**
320    * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
321    *         RuntimeException
322    */
323   private ExecutorService getPool(ExecutorService pool) {
324     if (pool != null) return pool;
325     if (this.pool != null) return this.pool;
326     throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
327   }
328 
329   /**
330    * See {@link #submit(ExecutorService, TableName, List, boolean, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, boolean)}.
331    * Uses default ExecutorService for this AP (must have been created with one).
332    */
333   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
334       boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
335       throws InterruptedIOException {
336     return submit(null, tableName, rows, atLeastOne, callback, needResults);
337   }
338 
339   /**
340    * Extract from the rows list what we can submit. The rows we can not submit are kept in the
341    * list. Does not send requests to replicas (not currently used for anything other
342    * than streaming puts anyway).
343    *
344    * @param pool ExecutorService to use.
345    * @param tableName The table for which this request is needed.
346    * @param rows the submitted rows. Modified by the method: we remove the rows we took.
347    * @param atLeastOne true if we should submit at least a subset.
348    * @param callback Batch callback. Only called on success (94 behavior).
349    * @param needResults Whether results are needed, or can be discarded.
350    */
351   public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
352       List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
353       boolean needResults) throws InterruptedIOException {
354     if (rows.isEmpty()) {
355       return NO_REQS_RESULT;
356     }
357 
358     Map<ServerName, MultiAction<Row>> actionsByServer =
359         new HashMap<ServerName, MultiAction<Row>>();
360     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
361 
362     NonceGenerator ng = this.connection.getNonceGenerator();
363     long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
364 
365     // Location errors that happen before we decide what requests to take.
366     List<Exception> locationErrors = null;
367     List<Integer> locationErrorRows = null;
368     do {
369       // Wait until there is at least one slot for a new task.
370       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
371 
372       // Remember the previous decisions about regions or region servers we put in the
373       //  final multi.
374       Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
375       Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
376 
377       int posInList = -1;
378       Iterator<? extends Row> it = rows.iterator();
379       while (it.hasNext()) {
380         Row r = it.next();
381         HRegionLocation loc;
382         try {
383           if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
384           // Make sure we get 0-s replica.
385           RegionLocations locs = connection.locateRegion(
386               tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
387           if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
388             throw new IOException("#" + id + ", no location found, aborting submit for"
389                 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
390           }
391           loc = locs.getDefaultRegionLocation();
392         } catch (IOException ex) {
393           locationErrors = new ArrayList<Exception>();
394           locationErrorRows = new ArrayList<Integer>();
395           LOG.error("Failed to get region location ", ex);
396           // This action failed before creating ars. Retain it, but do not add to submit list.
397           // We will then add it to ars in an already-failed state.
398           retainedActions.add(new Action<Row>(r, ++posInList));
399           locationErrors.add(ex);
400           locationErrorRows.add(posInList);
401           it.remove();
402           break; // Backward compat: we stop considering actions on location error.
403         }
404 
405         if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
406           Action<Row> action = new Action<Row>(r, ++posInList);
407           setNonce(ng, r, action);
408           retainedActions.add(action);
409           // TODO: replica-get is not supported on this path
410           byte[] regionName = loc.getRegionInfo().getRegionName();
411           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
412           it.remove();
413         }
414       }
415     } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
416 
417     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
418 
419     return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
420       locationErrors, locationErrorRows, actionsByServer, pool);
421   }
422 
423   <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
424       List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
425       Object[] results, boolean needResults, List<Exception> locationErrors,
426       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
427       ExecutorService pool) {
428     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
429       tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
430     // Add location errors if any
431     if (locationErrors != null) {
432       for (int i = 0; i < locationErrors.size(); ++i) {
433         int originalIndex = locationErrorRows.get(i);
434         Row row = retainedActions.get(originalIndex).getAction();
435         ars.manageError(originalIndex, row,
436           Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
437       }
438     }
439     ars.sendMultiAction(actionsByServer, 1, null, false);
440     return ars;
441   }
442 
443   /**
444    * Helper that is used when grouping the actions per region server.
445    *
446    * @param server - the destination server. Must not be null.
447    * @param action - the action to add to the multiaction
448    * @param actionsByServer the multiaction per server
449    * @param nonceGroup Nonce group.
450    */
451   private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
452       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
453     MultiAction<Row> multiAction = actionsByServer.get(server);
454     if (multiAction == null) {
455       multiAction = new MultiAction<Row>();
456       actionsByServer.put(server, multiAction);
457     }
458     if (action.hasNonce() && !multiAction.hasNonceGroup()) {
459       multiAction.setNonceGroup(nonceGroup);
460     }
461 
462     multiAction.add(regionName, action);
463   }
464 
465   /**
466    * Check if we should send new operations to this region or region server.
467    * We take past decisions into account: if we have already accepted an
468    * operation on a given region, we accept all operations for this region.
469    *
470    * @param loc the region and the server name we want to use.
471    * @return true if the operation can be sent to this region/server, false if it is considered busy.
472    */
473   protected boolean canTakeOperation(HRegionLocation loc,
474                                      Map<HRegionInfo, Boolean> regionsIncluded,
475                                      Map<ServerName, Boolean> serversIncluded) {
476     HRegionInfo regionInfo = loc.getRegionInfo();
477     Boolean regionPrevious = regionsIncluded.get(regionInfo);
478 
479     if (regionPrevious != null) {
480       // We already know what to do with this region.
481       return regionPrevious;
482     }
483 
484     Boolean serverPrevious = serversIncluded.get(loc.getServerName());
485     if (Boolean.FALSE.equals(serverPrevious)) {
486       // It's a new region, on a region server that we have already excluded.
487       regionsIncluded.put(regionInfo, Boolean.FALSE);
488       return false;
489     }
490 
491     AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
492     if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
493       // Too many tasks on this region already.
494       regionsIncluded.put(regionInfo, Boolean.FALSE);
495       return false;
496     }
497 
498     if (serverPrevious == null) {
499       // The region is ok, but we need to decide for this region server.
500       int newServers = 0; // number of servers we're going to contact so far
501       for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
502         if (kv.getValue()) {
503           newServers++;
504         }
505       }
506 
507       // Do we have too many total tasks already?
508       boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
509 
510       if (ok) {
511         // If the total is fine, is it ok for this individual server?
512         AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
513         ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
514       }
515 
516       if (!ok) {
517         regionsIncluded.put(regionInfo, Boolean.FALSE);
518         serversIncluded.put(loc.getServerName(), Boolean.FALSE);
519         return false;
520       }
521 
522       serversIncluded.put(loc.getServerName(), Boolean.TRUE);
523     } else {
524       assert serverPrevious.equals(Boolean.TRUE);
525     }
526 
527     regionsIncluded.put(regionInfo, Boolean.TRUE);
528 
529     return true;
530   }
531 
532   /**
533    * See {@link #submitAll(ExecutorService, TableName, List, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, Object[])}.
534    * Uses default ExecutorService for this AP (must have been created with one).
535    */
536   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
537       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
538     return submitAll(null, tableName, rows, callback, results);
539   }
540 
541   /**
542    * Submit immediately the list of rows, whatever the server status. Kept for backward
543    * compatibility: it allows use with the batch interface, which returns an array of objects.
544    *
545    * @param pool ExecutorService to use.
546    * @param tableName name of the table for which the submission is made.
547    * @param rows the list of rows.
548    * @param callback the callback.
549    * @param results Optional array through which to return the results; kept for backward compat.
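   * A minimal sketch of the results-array usage (illustrative; the surrounding variables are
   * assumptions of the example):
   * <pre>{@code
   * Object[] results = new Object[rows.size()];
   * AsyncRequestFuture arf = asyncProcess.submitAll(pool, tableName, rows, null, results);
   * arf.waitUntilDone(); // afterwards, each results[i] holds a Result or a Throwable for row i
   * }</pre>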
550    */
551   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
552       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
553     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
554 
555     // The position will be used by processBatch to match the returned object array.
556     int posInList = -1;
557     NonceGenerator ng = this.connection.getNonceGenerator();
558     for (Row r : rows) {
559       posInList++;
560       if (r instanceof Put) {
561         Put put = (Put) r;
562         if (put.isEmpty()) {
563           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
564         }
565       }
566       Action<Row> action = new Action<Row>(r, posInList);
567       setNonce(ng, r, action);
568       actions.add(action);
569     }
570     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
571         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
572     ars.groupAndSendMultiAction(actions, 1);
573     return ars;
574   }
575 
576   private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
577     if (!(r instanceof Append) && !(r instanceof Increment)) return;
578     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
579   }
580 
581   /**
582    * The context, and return value, for a single submit/submitAll call.
583    * Note on how this class (one AP submit) works. Initially, all requests are split into groups
584    * by server; requests are sent to each server in parallel; the RPC calls are not async so a
585    * thread per server is used. Every time some actions fail, regions/locations might have
586    * changed, so we re-group them by server and region again and send these groups in parallel
587    * too. The result, in case of retries, is a "tree" of threads, with parent exiting after
588    * scheduling children. This is why lots of code doesn't require any synchronization.
589    */
590   protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
591 
592     /**
593      * Runnable (that can be submitted to the thread pool) that waits until it's time
594      * to issue replica calls, finds region replicas, groups the requests by replica and
595      * issues the calls (on separate threads, via sendMultiAction).
596      * This is done on a separate thread because we don't want to wait on user thread for
597      * our asynchronous call, and usually we have to wait before making replica calls.
598      */
599     private final class ReplicaCallIssuingRunnable implements Runnable {
600       private final long startTime;
601       private final List<Action<Row>> initialActions;
602 
603       public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
604         this.initialActions = initialActions;
605         this.startTime = startTime;
606       }
607 
608       @Override
609       public void run() {
610         boolean done = false;
611         if (primaryCallTimeoutMicroseconds > 0) {
612           try {
613             done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
614           } catch (InterruptedException ex) {
615             LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
616             return;
617           }
618         }
619         if (done) return; // Done within primary timeout
620         Map<ServerName, MultiAction<Row>> actionsByServer =
621             new HashMap<ServerName, MultiAction<Row>>();
622         List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
623         if (replicaGetIndices == null) {
624           for (int i = 0; i < results.length; ++i) {
625             addReplicaActions(i, actionsByServer, unknownLocActions);
626           }
627         } else {
628           for (int replicaGetIndice : replicaGetIndices) {
629             addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
630           }
631         }
632         if (!actionsByServer.isEmpty()) {
633           sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
634         }
635         if (!unknownLocActions.isEmpty()) {
636           actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
637           for (Action<Row> action : unknownLocActions) {
638             addReplicaActionsAgain(action, actionsByServer);
639           }
640           // Some actions may have completely failed, they are handled inside addAgain.
641           if (!actionsByServer.isEmpty()) {
642             sendMultiAction(actionsByServer, 1, null, true);
643           }
644         }
645       }
646 
647       /**
648        * Add replica actions to action map by server.
649        * @param index Index of the original action.
650        * @param actionsByServer The map by server to add it to.
651        */
652       private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
653           List<Action<Row>> unknownReplicaActions) {
654         if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
655         Action<Row> action = initialActions.get(index);
656         RegionLocations loc = findAllLocationsOrFail(action, true);
657         if (loc == null) return;
658         HRegionLocation[] locs = loc.getRegionLocations();
659         if (locs.length == 1) {
660           LOG.warn("No replicas found for " + action.getAction());
661           return;
662         }
663         synchronized (replicaResultLock) {
664           // Don't run replica calls if the original has finished. We could do it e.g. if
665           // original has already failed before first replica call (unlikely given retries),
666           // but that would require additional synchronization w.r.t. returning to caller.
667           if (results[index] != null) return;
668           // We set the number of calls here. After that any path must call setResult/setError.
669           // True even for replicas that are not found - if we refuse to send we MUST set error.
670           results[index] = new ReplicaResultState(locs.length);
671         }
672         for (int i = 1; i < locs.length; ++i) {
673           Action<Row> replicaAction = new Action<Row>(action, i);
674           if (locs[i] != null) {
675             addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
676                 replicaAction, actionsByServer, nonceGroup);
677           } else {
678             unknownReplicaActions.add(replicaAction);
679           }
680         }
681       }
682 
683       private void addReplicaActionsAgain(
684           Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
685         if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
686           throw new AssertionError("Cannot have default replica here");
687         }
688         HRegionLocation loc = getReplicaLocationOrFail(action);
689         if (loc == null) return;
690         addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
691             action, actionsByServer, nonceGroup);
692       }
693     }
694 
695     /**
696      * Runnable (that can be submitted to thread pool) that submits MultiAction to a
697      * single server. The server call is synchronous, therefore we do it on a thread pool.
698      */
699     private final class SingleServerRequestRunnable implements Runnable {
700       private final MultiAction<Row> multiAction;
701       private final int numAttempt;
702       private final ServerName server;
703       private final Set<MultiServerCallable<Row>> callsInProgress;
704 
705       private SingleServerRequestRunnable(
706           MultiAction<Row> multiAction, int numAttempt, ServerName server,
707           Set<MultiServerCallable<Row>> callsInProgress) {
708         this.multiAction = multiAction;
709         this.numAttempt = numAttempt;
710         this.server = server;
711         this.callsInProgress = callsInProgress;
712       }
713 
714       @Override
715       public void run() {
716         MultiResponse res;
717         MultiServerCallable<Row> callable = null;
718         try {
719           callable = createCallable(server, tableName, multiAction);
720           try {
721             RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
722             if (callsInProgress != null) callsInProgress.add(callable);
723             res = caller.callWithoutRetries(callable, timeout);
724 
725             if (res == null) {
726               // Cancelled
727               return;
728             }
729 
730           } catch (IOException e) {
731             // The service itself failed . It may be an error coming from the communication
732             //   layer, but, as well, a functional error raised by the server.
733             receiveGlobalFailure(multiAction, server, numAttempt, e);
734             return;
735           } catch (Throwable t) {
736             // This should not happen. Let's log & retry anyway.
737             LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
738                 " Retrying. Server is " + server + ", tableName=" + tableName, t);
739             receiveGlobalFailure(multiAction, server, numAttempt, t);
740             return;
741           }
742 
743           // Normal case: we received an answer from the server, and it's not an exception.
744           receiveMultiAction(multiAction, server, res, numAttempt);
745         } catch (Throwable t) {
746           // Something really bad happened. We are on the send thread that will now die.
747           LOG.error("Internal AsyncProcess #" + id + " error for "
748               + tableName + " processing for " + server, t);
749           throw new RuntimeException(t);
750         } finally {
751           decTaskCounters(multiAction.getRegions(), server);
752           if (callsInProgress != null && callable != null) {
753             callsInProgress.remove(callable);
754           }
755         }
756       }
757     }
758 
759     private final Batch.Callback<CResult> callback;
760     private final BatchErrors errors;
761     private final ConnectionManager.ServerErrorTracker errorsByServer;
762     private final ExecutorService pool;
763     private final Set<MultiServerCallable<Row>> callsInProgress;
764 
765 
766     private final TableName tableName;
767     private final AtomicLong actionsInProgress = new AtomicLong(-1);
768     /** The lock controls access to results. It is only held when populating results where
769      * there might be several callers (eventual consistency gets). For other requests,
770      * there's one unique call going on per result index. */
771     private final Object replicaResultLock = new Object();
772     /** Result array.  Null if results are not needed. Otherwise, each index corresponds to
773      * the action index in initial actions submitted. For most request types, has null-s for
774      * requests that are not done, and result/exception for those that are done.
775      * For eventual-consistency gets, initially the same applies; at some point, replica calls
776      * might be started, and ReplicaResultState is put at the corresponding indices. The
777      * returning calls check the type to detect when this is the case. After all calls are done,
778      * ReplicaResultState-s are replaced with results for the user. */
779     private final Object[] results;
780     /** Indices of replica gets in results. If null, all or no actions are replica-gets. */
781     private final int[] replicaGetIndices;
782     private final boolean hasAnyReplicaGets;
783     private final long nonceGroup;
784 
785     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
786         ExecutorService pool, boolean needResults, Object[] results,
787         Batch.Callback<CResult> callback) {
788       this.pool = pool;
789       this.callback = callback;
790       this.nonceGroup = nonceGroup;
791       this.tableName = tableName;
792       this.actionsInProgress.set(actions.size());
793       if (results != null) {
794         assert needResults;
795         if (results.length != actions.size()) throw new AssertionError("results.length");
796         this.results = results;
797         for (int i = 0; i != this.results.length; ++i) {
798           results[i] = null;
799         }
800       } else {
801         this.results = needResults ? new Object[actions.size()] : null;
802       }
803       List<Integer> replicaGetIndices = null;
804       boolean hasAnyReplicaGets = false;
805       if (needResults) {
806         // Check to see if any requests might require replica calls.
807         // We expect that many requests will consist of all or no multi-replica gets; in such
808         // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
809         // store the list of action indexes for which replica gets are possible, and set
810         // hasAnyReplicaGets to true.
811         boolean hasAnyNonReplicaReqs = false;
812         int posInList = 0;
813         for (Action<Row> action : actions) {
814           boolean isReplicaGet = isReplicaGet(action.getAction());
815           if (isReplicaGet) {
816             hasAnyReplicaGets = true;
817             if (hasAnyNonReplicaReqs) { // Mixed case
818               if (replicaGetIndices == null) {
819                 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
820               }
821               replicaGetIndices.add(posInList);
822             }
823           } else if (!hasAnyNonReplicaReqs) {
824             // The first non-multi-replica request in the action list.
825             hasAnyNonReplicaReqs = true;
826             if (posInList > 0) {
827               // Add all the previous requests to the index lists. We know they are all
828               // replica-gets because this is the first non-multi-replica request in the list.
829               replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
830               for (int i = 0; i < posInList; ++i) {
831                 replicaGetIndices.add(i);
832               }
833             }
834           }
835           ++posInList;
836         }
837       }
838       this.hasAnyReplicaGets = hasAnyReplicaGets;
839       if (replicaGetIndices != null) {
840         this.replicaGetIndices = new int[replicaGetIndices.size()];
841         int i = 0;
842         for (Integer el : replicaGetIndices) {
843           this.replicaGetIndices[i++] = el;
844         }
845       } else {
846         this.replicaGetIndices = null;
847       }
848       this.callsInProgress = !hasAnyReplicaGets ? null :
849           Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
850 
851       this.errorsByServer = createServerErrorTracker();
852       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
853     }
854 
855     public Set<MultiServerCallable<Row>> getCallsInProgress() {
856       return callsInProgress;
857     }
858 
859     /**
860      * Group a list of actions per region servers, and send them.
861      *
862      * @param currentActions - the list of rows to submit
863      * @param numAttempt - the current numAttempt (first attempt is 1)
864      */
865     private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
866       Map<ServerName, MultiAction<Row>> actionsByServer =
867           new HashMap<ServerName, MultiAction<Row>>();
868 
869       boolean isReplica = false;
870       List<Action<Row>> unknownReplicaActions = null;
871       for (Action<Row> action : currentActions) {
872         RegionLocations locs = findAllLocationsOrFail(action, true);
873         if (locs == null) continue;
874         boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
875         if (isReplica && !isReplicaAction) {
876           // This is the property of the current implementation, not a requirement.
877           throw new AssertionError("Replica and non-replica actions in the same retry");
878         }
879         isReplica = isReplicaAction;
880         HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
881         if (loc == null || loc.getServerName() == null) {
882           if (isReplica) {
883             if (unknownReplicaActions == null) {
884               unknownReplicaActions = new ArrayList<Action<Row>>();
885             }
886             unknownReplicaActions.add(action);
887           } else {
888             // TODO: relies on primary location always being fetched
889             manageLocationError(action, null);
890           }
891         } else {
892           byte[] regionName = loc.getRegionInfo().getRegionName();
893           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
894         }
895       }
896       boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
897       boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
898 
899       if (!actionsByServer.isEmpty()) {
900         // If this is a first attempt to group and send, no replicas, we need replica thread.
901         sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
902             ? currentActions : null, numAttempt > 1 && !hasUnknown);
903       }
904 
905       if (hasUnknown) {
906         actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
907         for (Action<Row> action : unknownReplicaActions) {
908           HRegionLocation loc = getReplicaLocationOrFail(action);
909           if (loc == null) continue;
910           byte[] regionName = loc.getRegionInfo().getRegionName();
911           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
912         }
913         if (!actionsByServer.isEmpty()) {
914           sendMultiAction(
915               actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
916         }
917       }
918     }
919 
920     private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
921       // We are going to try to get the location once again. For each action, we'll do it once
922       // from cache, because the previous calls in the loop might populate it.
923       int replicaId = action.getReplicaId();
924       RegionLocations locs = findAllLocationsOrFail(action, true);
925       if (locs == null) return null; // manageError already called
926       HRegionLocation loc = locs.getRegionLocation(replicaId);
927       if (loc == null || loc.getServerName() == null) {
928         locs = findAllLocationsOrFail(action, false);
929         if (locs == null) return null; // manageError already called
930         loc = locs.getRegionLocation(replicaId);
931       }
932       if (loc == null || loc.getServerName() == null) {
933         manageLocationError(action, null);
934         return null;
935       }
936       return loc;
937     }
938 
939     private void manageLocationError(Action<Row> action, Exception ex) {
940       String msg = "Cannot get replica " + action.getReplicaId()
941           + " location for " + action.getAction();
942       LOG.error(msg);
943       if (ex == null) {
944         ex = new IOException(msg);
945       }
946       manageError(action.getOriginalIndex(), action.getAction(),
947           Retry.NO_LOCATION_PROBLEM, ex, null);
948     }
949 
950     private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
951       if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
952           ", row cannot be null");
953       RegionLocations loc = null;
954       try {
955         loc = connection.locateRegion(
956             tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
957       } catch (IOException ex) {
958         manageLocationError(action, ex);
959       }
960       return loc;
961     }
962 
963     /**
964      * Send a multi action structure to the servers, after a delay depending on the attempt
965      * number. Asynchronous.
966      *
967      * @param actionsByServer the actions, grouped by server
968      * @param numAttempt the attempt number.
969      * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
970      */
971     private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
972         int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
973       // Run the last item on the same thread if we are already on a send thread.
974       // We hope most of the time it will be the only item, so we can cut down on threads.
975       int actionsRemaining = actionsByServer.size();
976       // This iteration is by server (the HRegionLocation comparator is by server portion only).
977       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
978         ServerName server = e.getKey();
979         MultiAction<Row> multiAction = e.getValue();
980         incTaskCounters(multiAction.getRegions(), server);
981         Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
982             numAttempt);
983         // make sure we correctly count the number of runnables before we try to reuse the send
984         // thread, in case we had to split the request into different runnables because of backoff
985         if (runnables.size() > actionsRemaining) {
986           actionsRemaining = runnables.size();
987         }
988 
989         // run all the runnables
990         for (Runnable runnable : runnables) {
991           if ((--actionsRemaining == 0) && reuseThread) {
992             runnable.run();
993           } else {
994             try {
995               pool.submit(runnable);
996             } catch (Throwable t) {
997               if (t instanceof RejectedExecutionException) {
998                 // This should never happen. But as the pool is provided by the end user,
999                // let's secure this a little.
1000                LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
1001                   " Server is " + server.getServerName(), t);
1002               } else {
1003                 // see #HBASE-14359 for more details
1004                 LOG.warn("Caught unexpected exception/error: ", t);
1005               }
1006               decTaskCounters(multiAction.getRegions(), server);
1007               // We're likely to fail again, but this will increment the attempt counter,
1008              // so it will finish.
1009               receiveGlobalFailure(multiAction, server, numAttempt, t);
1010             }
1011           }
1012         }
1013       }
1014 
1015       if (actionsForReplicaThread != null) {
1016         startWaitingForReplicaCalls(actionsForReplicaThread);
1017       }
1018     }
1019 
1020     private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1021         MultiAction<Row> multiAction,
1022         int numAttempt) {
1023       // no stats to manage, just do the standard action
1024       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1025         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1026             new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1027       }
1028 
1029       // group the actions by the amount of delay
1030       Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1031           .size());
1032 
1033       // split up the actions
1034       for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1035         Long backoff = getBackoff(server, e.getKey());
1036         DelayingRunner runner = actions.get(backoff);
1037         if (runner == null) {
1038           actions.put(backoff, new DelayingRunner(backoff, e));
1039         } else {
1040           runner.add(e);
1041         }
1042       }
1043 
1044       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1045       for (DelayingRunner runner : actions.values()) {
1046         String traceText = "AsyncProcess.sendMultiAction";
1047         Runnable runnable =
1048             new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1049                 callsInProgress);
1050         // use a delay runner only if we need to sleep for some time
1051         if (runner.getSleepTime() > 0) {
1052           runner.setRunner(runnable);
1053           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1054           runnable = runner;
1055         }
1056         runnable = Trace.wrap(traceText, runnable);
1057         toReturn.add(runnable);
1058 
1059       }
1060       return toReturn;
1061     }
1062 
1063     /**
1064      * @param server server location where the target region is hosted
1065      * @param regionName name of the region to which we are going to write some data
1066      * @return the amount of time the client should wait before it submits a request to the
1067      * specified server and region
1068      */
1069     private Long getBackoff(ServerName server, byte[] regionName) {
1070       ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1071       ServerStatistics stats = tracker.getStats(server);
1072       return AsyncProcess.this.connection.getBackoffPolicy()
1073           .getBackoffTime(server, regionName, stats);
1074     }
1075 
1076     /**
1077      * Starts waiting to issue replica calls on a different thread; or issues them immediately.
1078      */
1079     private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1080       long startTime = EnvironmentEdgeManager.currentTime();
1081       ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1082           actionsForReplicaThread, startTime);
1083       if (primaryCallTimeoutMicroseconds == 0) {
1084         // Start replica calls immediately.
1085         replicaRunnable.run();
1086       } else {
1087         // Start the thread that may kick off replica gets.
1088         // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
1089         try {
1090           pool.submit(replicaRunnable);
1091         } catch (RejectedExecutionException ree) {
1092           LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1093         }
1094       }
1095     }
1096 
1097     /**
1098      * Checks whether we can retry and acts accordingly: logs, sets the error status.
1099      *
1100      * @param originalIndex the position in the list sent
1101      * @param row           the row
1102      * @param canRetry      if not YES, we won't retry regardless of the settings.
1103      * @param throwable     the throwable, if any (can be null)
1104      * @param server        the location, if any (can be null)
1105      * @return the retry decision: YES if the action can be retried, otherwise a NO_* value.
1106      */
1107     public Retry manageError(int originalIndex, Row row, Retry canRetry,
1108                                 Throwable throwable, ServerName server) {
1109       if (canRetry == Retry.YES
1110           && throwable != null && throwable instanceof DoNotRetryIOException) {
1111         canRetry = Retry.NO_NOT_RETRIABLE;
1112       }
1113 
1114       if (canRetry != Retry.YES) {
1115         // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
1116         setError(originalIndex, row, throwable, server);
1117       } else if (isActionComplete(originalIndex, row)) {
1118         canRetry = Retry.NO_OTHER_SUCCEEDED;
1119       }
1120       return canRetry;
1121     }
1122 
1123     /**
1124      * Resubmit all the actions from this multiaction after a failure.
1125      *
1126      * @param rsActions  the actions still to do from the initial list
1127      * @param server   the destination
1128      * @param numAttempt the number of attempts so far
1129      * @param t the throwable (if any) that caused the resubmit
1130      */
1131     private void receiveGlobalFailure(
1132         MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1133       errorsByServer.reportServerError(server);
1134       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1135           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1136 
1137       if (tableName == null) {
1138         // tableName is null when we made a cross-table RPC call.
1139         connection.clearCaches(server);
1140       }
1141       int failed = 0, stopped = 0;
1142       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1143       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1144         byte[] regionName = e.getKey();
1145         byte[] row = e.getValue().iterator().next().getAction().getRow();
1146         // Do not use the exception for updating cache because it might be coming from
1147         // any of the regions in the MultiAction.
1148         // TODO: depending on type of exception we might not want to update cache at all?
1149         if (tableName != null) {
1150           connection.updateCachedLocations(tableName, regionName, row, null, server);
1151         }
1152         for (Action<Row> action : e.getValue()) {
1153           Retry retry = manageError(
1154               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1155           if (retry == Retry.YES) {
1156             toReplay.add(action);
1157           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1158             ++stopped;
1159           } else {
1160             ++failed;
1161           }
1162         }
1163       }
1164 
1165       if (toReplay.isEmpty()) {
1166         logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1167       } else {
1168         resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1169       }
1170     }
1171 
1172     /**
1173      * Log as much info as possible, and, if there is something to replay,
1174      * submit it again after a back off sleep.
1175      */
1176     private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1177         int numAttempt, int failureCount, Throwable throwable) {
1178       // We have something to replay. We're going to sleep a little before.
1179 
1180       // We have two contradicting needs here:
1181       //  1) We want to get the new location after having slept, as it may change.
1182       //  2) We want to take into account the location when calculating the sleep time.
1183       // It should be possible to have some heuristics to take the right decision. Short term,
1184       //  we go for one.
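           // A rough sketch of what the next line computes, assuming the default tracker applies
           // exponential backoff (scaled by this server's error count) with some jitter:
           //   backOffTime ~= pause * HConstants.RETRY_BACKOFF[min(errorCount, last index)]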
1185       long backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause);
1186       if (numAttempt > startLogErrorsCnt) {
1187         // We use this value to have some logs when we have multiple failures, but not too many
1188         //  logs, as errors are to be expected when a region moves, splits and so on
1189         LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1190             oldServer, throwable, backOffTime, true, null, -1, -1));
1191       }
1192 
1193       try {
1194         Thread.sleep(backOffTime);
1195       } catch (InterruptedException e) {
1196         LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1197         Thread.currentThread().interrupt();
1198         return;
1199       }
1200 
1201       groupAndSendMultiAction(toReplay, numAttempt + 1);
1202     }
1203 
1204     private void logNoResubmit(ServerName oldServer, int numAttempt,
1205         int failureCount, Throwable throwable, int failed, int stopped) {
1206       if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1207         String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1208         String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1209             throwable, -1, false, timeStr, failed, stopped);
1210         if (failed != 0) {
1211           // Only log final failures as warning
1212           LOG.warn(logMessage);
1213         } else {
1214           LOG.info(logMessage);
1215         }
1216       }
1217     }
1218 
1219     /**
1220      * Called when we receive the result of a server query.
1221      *
1222      * @param multiAction    - the multiAction we sent
1223      * @param server       - the server that sent the responses
1224      * @param responses      - the responses received (never null)
1225      * @param numAttempt     - the attempt number
1226      */
1227     private void receiveMultiAction(MultiAction<Row> multiAction,
1228         ServerName server, MultiResponse responses, int numAttempt) {
1229       assert responses != null;
1230 
1231       // Success or partial success
1232       // Analyze detailed results. We can still have individual failures to be redone.
1233       // Two specific throwables are managed:
1234       //  - DoNotRetryIOException: we continue to retry for other actions
1235       //  - RegionMovedException: we update the cache with the new region location
1236 
1237       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1238       Throwable throwable = null;
1239       int failureCount = 0;
1240       boolean canRetry = true;
1241 
1242       // Go by original action.
1243       int failed = 0, stopped = 0;
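           // First pass: walk the per-action results returned for each region. Exceptions that
           // cover a whole region are handled in a second pass further below.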
1244       for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
1245         byte[] regionName = regionEntry.getKey();
1246         Map<Integer, Object> regionResults = responses.getResults().get(regionName);
1247         if (regionResults == null) {
1248           if (!responses.getExceptions().containsKey(regionName)) {
1249             LOG.error("Server sent us neither results nor exceptions for "
1250                 + Bytes.toStringBinary(regionName));
1251             responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
1252           }
1253           continue;
1254         }
1255         boolean regionFailureRegistered = false;
1256         for (Action<Row> sentAction : regionEntry.getValue()) {
1257           Object result = regionResults.get(sentAction.getOriginalIndex());
1258           // Failure: retry if it makes sense, else update the error lists
1259           if (result == null || result instanceof Throwable) {
1260             Row row = sentAction.getAction();
1261             // Register corresponding failures once per server/once per region.
1262             if (!regionFailureRegistered) {
1263               regionFailureRegistered = true;
1264               connection.updateCachedLocations(
1265                   tableName, regionName, row.getRow(), result, server);
1266             }
1267             if (failureCount == 0) {
1268               errorsByServer.reportServerError(server);
1269               // We determine canRetry only once for all calls, after reporting server failure.
1270               canRetry = errorsByServer.canRetryMore(numAttempt);
1271             }
1272             ++failureCount;
1273             Retry retry = manageError(sentAction.getOriginalIndex(), row,
1274                 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
1275             if (retry == Retry.YES) {
1276               toReplay.add(sentAction);
1277             } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1278               ++stopped;
1279             } else {
1280               ++failed;
1281             }
1282           } else {
1283             // Update the stats about the region, if it's a user table. We don't want to slow down
1284             // updates to meta tables, especially from internal updates (master, etc.).
1285             if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
1286               result = ResultStatsUtil.updateStats(result,
1287                   AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
1288             }
1289 
1290             if (callback != null) {
1291               try {
1292                 //noinspection unchecked
1293                 // TODO: would callback expect a replica region name if it gets one?
1294                 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
1295               } catch (Throwable t) {
1296                 LOG.error("User callback threw an exception for "
1297                     + Bytes.toStringBinary(regionName) + ", ignoring", t);
1298               }
1299             }
1300             setResult(sentAction, result);
1301           }
1302         }
1303       }
1304 
1305       // Failures that apply to a whole region. We use the multiAction we sent previously to find
1306       //   the actions to replay.
1307       for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
1308         throwable = throwableEntry.getValue();
1309         byte[] region = throwableEntry.getKey();
1310         List<Action<Row>> actions = multiAction.actions.get(region);
1311         if (actions == null || actions.isEmpty()) {
1312           throw new IllegalStateException("Wrong response for the region: " +
1313               HRegionInfo.encodeRegionName(region));
1314         }
1315 
1316         if (failureCount == 0) {
1317           errorsByServer.reportServerError(server);
1318           canRetry = errorsByServer.canRetryMore(numAttempt);
1319         }
1320         connection.updateCachedLocations(
1321             tableName, region, actions.get(0).getAction().getRow(), throwable, server);
1322         failureCount += actions.size();
1323 
1324         for (Action<Row> action : actions) {
1325           Row row = action.getAction();
1326           Retry retry = manageError(action.getOriginalIndex(), row,
1327               canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
1328           if (retry == Retry.YES) {
1329             toReplay.add(action);
1330           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1331             ++stopped;
1332           } else {
1333             ++failed;
1334           }
1335         }
1336       }
1337 
1338       if (toReplay.isEmpty()) {
1339         logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
1340       } else {
1341         resubmit(server, toReplay, numAttempt, failureCount, throwable);
1342       }
1343     }
1344 
1345     private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1346         Throwable error, long backOffTime, boolean willRetry, String startTime,
1347         int failed, int stopped) {
1348       StringBuilder sb = new StringBuilder();
1349       sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1350         .append("attempt=").append(numAttempt)
1351         .append("/").append(numTries).append(" ");
1352 
1353       if (failureCount > 0 || error != null){
1354         sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1355             append(error == null ? "null" : error);
1356       } else {
1357         sb.append("succeeded");
1358       }
1359 
1360       sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1361 
1362       if (willRetry) {
1363         sb.append(", retrying after=").append(backOffTime).append("ms").
1364             append(", replay=").append(replaySize).append("ops");
1365       } else if (failureCount > 0) {
1366         if (stopped > 0) {
1367           sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1368         }
1369         if (failed > 0) {
1370           sb.append("; not retrying ").append(failed).append(" - final failure");
1371         }
1372 
1373       }
1374 
1375       return sb.toString();
1376     }
1377 
1378     /**
1379      * Sets the non-error result from a particular action.
1380      * @param action Action (request) that the server responded to.
1381      * @param result The result.
1382      */
1383     private void setResult(Action<Row> action, Object result) {
1384       if (result == null) {
1385         throw new RuntimeException("Result cannot be null");
1386       }
1387       ReplicaResultState state = null;
1388       boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
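           // isStale is true when this result was produced by a secondary (non-default) replica;
           // it is only used below to sanity-check that non-replica operations never see such results.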
1389       int index = action.getOriginalIndex();
1390       if (results == null) {
1391         decActionCounter(index);
1392         return; // Simple case, no replica requests.
1393       } else if ((state = trySetResultSimple(
1394           index, action.getAction(), false, result, null, isStale)) == null) {
1395         return; // Simple case, no replica requests.
1396       }
1397       assert state != null;
1398       // At this point we know that state is set to the replica tracking class.
1399       // It could be that someone else is also looking at it; however, we know there can
1400       // only be one state object, and only one thread can set callCount to 0. Other threads
1401       // will either see the state with callCount 0 after locking it, or will not see the state
1402       // at all because we will have replaced it with the result.
1403       synchronized (state) {
1404         if (state.callCount == 0) return; // someone already set the result
1405         state.callCount = 0;
1406       }
1407       synchronized (replicaResultLock) {
1408         if (results[index] != state) {
1409           throw new AssertionError("We set the callCount but someone else replaced the result");
1410         }
1411         results[index] = result;
1412       }
1413 
1414       decActionCounter(index);
1415     }
1416 
1417     /**
1418      * Sets the error from a particular action.
1419      * @param index Original action index.
1420      * @param row Original request.
1421      * @param throwable The resulting error.
1422      * @param server The source server.
1423      */
1424     private void setError(int index, Row row, Throwable throwable, ServerName server) {
1425       ReplicaResultState state = null;
1426       if (results == null) {
1427         // Note that we currently cannot have replica requests with null results. So it shouldn't
1428         // happen that multiple replica calls will call decActionCounter for the same action with
1429         // results == null. Only one call per action should be present in this case.
1430         errors.add(throwable, row, server);
1431         decActionCounter(index);
1432         return; // Simple case, no replica requests.
1433       } else if ((state = trySetResultSimple(
1434           index, row, true, throwable, server, false)) == null) {
1435         return; // Simple case, no replica requests.
1436       }
1437       assert state != null;
1438       BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
1439       boolean isActionDone = false;
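           // state.callCount is the number of replica calls still outstanding for this action:
           // 0 means a result or error was already recorded, 1 means we are the last call.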
1440       synchronized (state) {
1441         switch (state.callCount) {
1442           case 0: return; // someone already set the result
1443           case 1: { // All calls failed, we are the last error.
1444             target = errors;
1445             isActionDone = true;
1446             break;
1447           }
1448           default: {
1449             assert state.callCount > 1;
1450             if (state.replicaErrors == null) {
1451               state.replicaErrors = new BatchErrors();
1452             }
1453             target = state.replicaErrors;
1454             break;
1455           }
1456         }
1457         --state.callCount;
1458       }
1459       target.add(throwable, row, server);
1460       if (isActionDone) {
1461         if (state.replicaErrors != null) { // last call, no need to lock
1462           errors.merge(state.replicaErrors);
1463         }
1464         // See setResult for explanations.
1465         synchronized (replicaResultLock) {
1466           if (results[index] != state) {
1467             throw new AssertionError("We set the callCount but someone else replaced the result");
1468           }
1469           results[index] = throwable;
1470         }
1471         decActionCounter(index);
1472       }
1473     }
1474 
1475     /**
1476      * Checks if the action is complete; used on error to prevent needless retries.
1477      * Does not synchronize, assuming element index/field accesses are atomic.
1478      * This is an opportunistic optimization check; it doesn't have to be strict.
1479      * @param index Original action index.
1480      * @param row Original request.
1481      */
1482     private boolean isActionComplete(int index, Row row) {
1483       if (!isReplicaGet(row)) return false;
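           // Only TIMELINE-consistency gets are sent to multiple replicas, so only they can have
           // been completed by another call in the meantime.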
1484       Object resObj = results[index];
1485       return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1486           || ((ReplicaResultState)resObj).callCount == 0);
1487     }
1488 
1489     /**
1490      * Tries to set the result or error for a particular action as if there were no replica calls.
1491      * @return null if successful; replica state if there were in fact replica calls.
1492      */
1493     private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
1494         Object result, ServerName server, boolean isFromReplica) {
1495       Object resObj = null;
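           // Non-replica operations have a single writer for results[index], so no lock is needed.
           // Replica gets may race with calls to other replicas and must use replicaResultLock.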
1496       if (!isReplicaGet(row)) {
1497         if (isFromReplica) {
1498           throw new AssertionError("Unexpected stale result for " + row);
1499         }
1500         results[index] = result;
1501       } else {
1502         synchronized (replicaResultLock) {
1503           if ((resObj = results[index]) == null) {
1504             if (isFromReplica) {
1505               throw new AssertionError("Unexpected stale result for " + row);
1506             }
1507             results[index] = result;
1508           }
1509         }
1510       }
1511 
1512       ReplicaResultState rrs =
1513           (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
1514       if (rrs == null && isError) {
1515         // The resObj is not replica state (null or already set).
1516         errors.add((Throwable)result, row, server);
1517       }
1518 
1519       if (resObj == null) {
1520         // resObj is null - no replica calls were made.
1521         decActionCounter(index);
1522         return null;
1523       }
1524       return rrs;
1525     }
1526 
1527     private void decActionCounter(int index) {
1528       long actionsRemaining = actionsInProgress.decrementAndGet();
1529       if (actionsRemaining < 0) {
1530         String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1531         throw new AssertionError(error);
1532       } else if (actionsRemaining == 0) {
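             // Wake up any thread blocked in waitUntilDone(), which waits on actionsInProgress.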
1533         synchronized (actionsInProgress) {
1534           actionsInProgress.notifyAll();
1535         }
1536       }
1537     }
1538 
1539     private String buildDetailedErrorMsg(String string, int index) {
1540       String error = string + "; called for " + index +
1541           ", actionsInProgress " + actionsInProgress.get() + "; replica gets: ";
1542       if (replicaGetIndices != null) {
1543         for (int i = 0; i < replicaGetIndices.length; ++i) {
1544           error += replicaGetIndices[i] + ", ";
1545         }
1546       } else {
1547         error += (hasAnyReplicaGets ? "all" : "none");
1548       }
1549       error += "; results ";
1550       if (results != null) {
1551         for (int i = 0; i < results.length; ++i) {
1552           Object o = results[i];
1553           error += ((o == null) ? "null" : o.toString()) + ", ";
1554         }
1555       }
1556       return error;
1557     }
1558 
1559     @Override
1560     public void waitUntilDone() throws InterruptedIOException {
1561       try {
1562         waitUntilDone(Long.MAX_VALUE);
1563       } catch (InterruptedException iex) {
1564         throw new InterruptedIOException(iex.getMessage());
1565       } finally {
1566         if (callsInProgress != null) {
1567           for (MultiServerCallable<Row> clb : callsInProgress) {
1568             clb.cancel();
1569           }
1570         }
1571       }
1572     }
1573 
1574     private boolean waitUntilDone(long cutoff) throws InterruptedException {
1575       boolean hasWait = cutoff != Long.MAX_VALUE;
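           // Note: cutoff is an absolute deadline in microseconds (currentTime() is in milliseconds,
           // hence the "* 1000L" conversions below); Long.MAX_VALUE means no deadline.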
1576       long lastLog = EnvironmentEdgeManager.currentTime();
1577       long currentInProgress;
1578       while (0 != (currentInProgress = actionsInProgress.get())) {
1579         long now = EnvironmentEdgeManager.currentTime();
1580         if (hasWait && (now * 1000L) > cutoff) {
1581           return false;
1582         }
1583         if (!hasWait) { // Only log if wait is infinite.
1584           if (now > lastLog + 10000) {
1585             lastLog = now;
1586             LOG.info("#" + id + ", waiting for " + currentInProgress
1587                 + " actions to finish on table: " + tableName);
1588             if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1589               logDetailsOfUndoneTasks(currentInProgress);
1590             }
1591           }
1592         }
1593         synchronized (actionsInProgress) {
1594           if (actionsInProgress.get() == 0) break;
1595           if (!hasWait) {
1596             actionsInProgress.wait(10);
1597           } else {
1598             long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1599             TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1600           }
1601         }
1602       }
1603       return true;
1604     }
1605 
1606     @Override
1607     public boolean hasError() {
1608       return errors.hasErrors();
1609     }
1610 
1611     @Override
1612     public List<? extends Row> getFailedOperations() {
1613       return errors.actions;
1614     }
1615 
1616     @Override
1617     public RetriesExhaustedWithDetailsException getErrors() {
1618       return errors.makeException();
1619     }
1620 
1621     @Override
1622     public Object[] getResults() throws InterruptedIOException {
1623       waitUntilDone();
1624       return results;
1625     }
1626   }
1627 
1628   /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
1629   @VisibleForTesting
1630   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1631       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1632       Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1633     return new AsyncRequestFutureImpl<CResult>(
1634         tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1635   }
1636 
1637   /**
1638    * Create a callable. Isolated to be easily overridden in the tests.
1639    */
1640   @VisibleForTesting
1641   protected MultiServerCallable<Row> createCallable(final ServerName server,
1642       TableName tableName, final MultiAction<Row> multi) {
1643     return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1644   }
1645 
1646   /**
1647    * Create a caller. Isolated to be easily overridden in the tests.
1648    */
1649   @VisibleForTesting
1650   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1651     return rpcCallerFactory.<MultiResponse> newCaller();
1652   }
1653 
1654   /** Waits until all outstanding tasks are done. Used in tests. */
1655   @VisibleForTesting
1656   void waitUntilDone() throws InterruptedIOException {
1657     waitForMaximumCurrentTasks(0, null);
1658   }
1659 
1660   /** Wait until the AsyncProcess does not have more than max tasks in progress. */
1661   private void waitForMaximumCurrentTasks(int max, String tableName)
1662       throws InterruptedIOException {
1663     waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
1664   }
1665 
1666   // Broken out into its own method so it can be tested.
1667   @VisibleForTesting
1668   void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
1669       String tableName) throws InterruptedIOException {
1670     long lastLog = EnvironmentEdgeManager.currentTime();
1671     long currentInProgress, oldInProgress = Long.MAX_VALUE;
1672     while ((currentInProgress = tasksInProgress.get()) > max) {
1673       if (oldInProgress != currentInProgress) { // Only log when the in-progress count has changed.
1674         long now = EnvironmentEdgeManager.currentTime();
1675         if (now > lastLog + 10000) {
1676           lastLog = now;
1677           LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1678               + max + ", tasksInProgress=" + currentInProgress +
1679               " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
1680           if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1681             logDetailsOfUndoneTasks(currentInProgress);
1682           }
1683         }
1684       }
1685       oldInProgress = currentInProgress;
1686       try {
1687         synchronized (tasksInProgress) {
1688           if (tasksInProgress.get() == oldInProgress) {
1689             tasksInProgress.wait(10);
1690           }
1691         }
1692       } catch (InterruptedException e) {
1693         throw new InterruptedIOException("#" + id + ", interrupted." +
1694             " currentNumberOfTask=" + currentInProgress);
1695       }
1696     }
1697   }
1698 
1699   private void logDetailsOfUndoneTasks(long taskInProgress) {
1700     ArrayList<ServerName> servers = new ArrayList<ServerName>();
1701     for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
1702       if (entry.getValue().get() > 0) {
1703         servers.add(entry.getKey());
1704       }
1705     }
1706     LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
1707     if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
1708       ArrayList<String> regions = new ArrayList<String>();
1709       for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
1710         if (entry.getValue().get() > 0) {
1711           regions.add(Bytes.toString(entry.getKey()));
1712         }
1713       }
1714       LOG.info("Regions against which left over task(s) are processed: " + regions);
1715     }
1716   }
1717 
1718   /**
1719    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1720    * @return Whether there were any errors in any request since the last time
1721    *          {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or the AsyncProcess was created.
1722    */
1723   public boolean hasError() {
1724     return globalErrors.hasErrors();
1725   }
1726 
1727   /**
1728    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1729    * Waits for all previous operations to finish, and returns errors and (optionally)
1730    * failed operations themselves.
1731    * @param failedRows an optional list into which the rows that failed since the last time
1732    *        {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or the AsyncProcess was created, are saved.
1733    * @param tableName name of the table
1734    * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List, String)}
1735    *          was called, or the AsyncProcess was created.
1736    */
1737   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1738       List<Row> failedRows, String tableName) throws InterruptedIOException {
1739     waitForMaximumCurrentTasks(0, tableName);
1740     if (!globalErrors.hasErrors()) {
1741       return null;
1742     }
1743     if (failedRows != null) {
1744       failedRows.addAll(globalErrors.actions);
1745     }
1746     RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1747     globalErrors.clear();
1748     return result;
1749   }
1750 
1751   /**
1752    * Increments the task counters for a given set of regions. MT safe.
1753    */
1754   protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1755     tasksInProgress.incrementAndGet();
1756 
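         // Lazily create the per-server counter; putIfAbsent keeps this safe when several threads
         // race to create the counter for the same server, so only one instance ever wins.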
1757     AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1758     if (serverCnt == null) {
1759       taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1760       serverCnt = taskCounterPerServer.get(sn);
1761     }
1762     serverCnt.incrementAndGet();
1763 
1764     for (byte[] regBytes : regions) {
1765       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1766       if (regionCnt == null) {
1767         regionCnt = new AtomicInteger();
1768         AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1769         if (oldCnt != null) {
1770           regionCnt = oldCnt;
1771         }
1772       }
1773       regionCnt.incrementAndGet();
1774     }
1775   }
1776 
1777   /**
1778    * Decrements the counters for a given region and the region server. MT Safe.
1779    */
1780   protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1781     for (byte[] regBytes : regions) {
1782       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1783       regionCnt.decrementAndGet();
1784     }
1785 
1786     taskCounterPerServer.get(sn).decrementAndGet();
1787     tasksInProgress.decrementAndGet();
1788     synchronized (tasksInProgress) {
1789       tasksInProgress.notifyAll();
1790     }
1791   }
1792 
1793   /**
1794    * Creates the server error tracker to use inside the process.
1795    * Currently, to preserve the main assumption about current retries, and to work well with
1796    * the retry-limit-based calculation, the calculation is local per Process object.
1797    * We may benefit from connection-wide tracking of server errors.
1798    * @return the ServerErrorTracker to use; this implementation always returns a new tracker
1799    */
1800   protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1801     return new ConnectionManager.ServerErrorTracker(
1802         this.serverTrackerTimeout, this.numTries);
1803   }
1804 
1805   private static boolean isReplicaGet(Row row) {
1806     return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1807   }
1808 
1809   /**
1810    * Used by manageError. The distinct values only make logging clearer; we don't actually care why we don't retry.
1811    */
1812   private enum Retry {
1813     YES,
1814     NO_LOCATION_PROBLEM,
1815     NO_NOT_RETRIABLE,
1816     NO_RETRIES_EXHAUSTED,
1817     NO_OTHER_SUCCEEDED
1818   }
1819 }