1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Date;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ConcurrentSkipListMap;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.hbase.RetryImmediatelyException;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.DoNotRetryIOException;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.RegionLocations;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.TableName;
54  import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
55  import org.apache.hadoop.hbase.client.coprocessor.Batch;
56  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
57  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60  import org.apache.htrace.Trace;
61  
62  import com.google.common.annotations.VisibleForTesting;
63  
64  
65  
66  
67  
68  
69  
70  
71  
72  
73  
74  
75  
76  
77  
78  
79  
80  
81  
82  
83  
84  
85  
86  
87  
88  
89  
90  
91  
92  
93  
94  
95  
96  
97  
98  
99  @InterfaceAudience.Private
100 class AsyncProcess {
101   private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
102   protected static final AtomicLong COUNTER = new AtomicLong();
103 
104   public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
105 
106   
107 
108 
109 
110 
111 
112   public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
113       "hbase.client.start.log.errors.counter";
114   public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
115 
116   
117 
118 
119   public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
120 
121   private final int thresholdToLogUndoneTaskDetails;
122   private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
123       "hbase.client.threshold.log.details";
124   private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
125   private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
126 
127   
128 
129 
130 
131 
132 
133   public static interface AsyncRequestFuture {
134     public boolean hasError();
135     public RetriesExhaustedWithDetailsException getErrors();
136     public List<? extends Row> getFailedOperations();
137     public Object[] getResults() throws InterruptedIOException;
138     
139     public void waitUntilDone() throws InterruptedIOException;
140   }
141 
142   
143 
144 
145   private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
146 
147     final Object[] result = new Object[0];
148 
149     @Override
150     public boolean hasError() {
151       return false;
152     }
153 
154     @Override
155     public RetriesExhaustedWithDetailsException getErrors() {
156       return null;
157     }
158 
159     @Override
160     public List<? extends Row> getFailedOperations() {
161       return null;
162     }
163 
164     @Override
165     public Object[] getResults() {
166       return result;
167     }
168 
169     @Override
170     public void waitUntilDone() throws InterruptedIOException {
171     }
172   };
173 
174   
175 
176 
177 
178   private static class ReplicaResultState {
179     public ReplicaResultState(int callCount) {
180       this.callCount = callCount;
181     }
182 
183     
184     int callCount;
185     
186 
187     BatchErrors replicaErrors = null;
188 
189     @Override
190     public String toString() {
191       return "[call count " + callCount + "; errors " + replicaErrors + "]";
192     }
193   }
194 
195 
196   
197   protected final long id;
198 
199   protected final ClusterConnection connection;
200   protected final RpcRetryingCallerFactory rpcCallerFactory;
201   protected final RpcControllerFactory rpcFactory;
202   protected final BatchErrors globalErrors;
203   protected final ExecutorService pool;
204 
205   protected final AtomicLong tasksInProgress = new AtomicLong(0);
206   protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
207       new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
208   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
209       new ConcurrentHashMap<ServerName, AtomicInteger>();
210 
211   
212   private final int startLogErrorsCnt;
213 
214   
215 
216 
217   protected final int maxTotalConcurrentTasks;
218 
219   
220 
221 
222 
223 
224 
225   protected final int maxConcurrentTasksPerRegion;
226 
227   
228 
229 
230   protected final int maxConcurrentTasksPerServer;
231   protected final long pause;
232   protected int numTries;
233   protected int serverTrackerTimeout;
234   protected int timeout;
235   protected long primaryCallTimeoutMicroseconds;
236   
237 
238   protected static class BatchErrors {
239     private final List<Throwable> throwables = new ArrayList<Throwable>();
240     private final List<Row> actions = new ArrayList<Row>();
241     private final List<String> addresses = new ArrayList<String>();
242 
243     public synchronized void add(Throwable ex, Row row, ServerName serverName) {
244       if (row == null){
245         throw new IllegalArgumentException("row cannot be null. location=" + serverName);
246       }
247 
248       throwables.add(ex);
249       actions.add(row);
250       addresses.add(serverName != null ? serverName.toString() : "null");
251     }
252 
253     public boolean hasErrors() {
254       return !throwables.isEmpty();
255     }
256 
257     private synchronized RetriesExhaustedWithDetailsException makeException() {
258       return new RetriesExhaustedWithDetailsException(
259           new ArrayList<Throwable>(throwables),
260           new ArrayList<Row>(actions), new ArrayList<String>(addresses));
261     }
262 
263     public synchronized void clear() {
264       throwables.clear();
265       actions.clear();
266       addresses.clear();
267     }
268 
269     public synchronized void merge(BatchErrors other) {
270       throwables.addAll(other.throwables);
271       actions.addAll(other.actions);
272       addresses.addAll(other.addresses);
273     }
274   }
275 
276   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
277       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
278     if (hc == null) {
279       throw new IllegalArgumentException("HConnection cannot be null.");
280     }
281 
282     this.connection = hc;
283     this.pool = pool;
284     this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
285 
286     this.id = COUNTER.incrementAndGet();
287 
288     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
289         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
290     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
291         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
292     this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
293         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
294     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
295 
296     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
297       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
298     this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
299           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
300     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
301           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
302 
303     this.startLogErrorsCnt =
304         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
305 
306     if (this.maxTotalConcurrentTasks <= 0) {
307       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
308     }
309     if (this.maxConcurrentTasksPerServer <= 0) {
310       throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
311           maxConcurrentTasksPerServer);
312     }
313     if (this.maxConcurrentTasksPerRegion <= 0) {
314       throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
315           maxConcurrentTasksPerRegion);
316     }
317 
318     
319     
320     
321     
322     
323     
324     
325     this.serverTrackerTimeout = 0;
326     for (int i = 0; i < this.numTries; ++i) {
327       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
328     }
329 
330     this.rpcCallerFactory = rpcCaller;
331     this.rpcFactory = rpcFactory;
332 
333     this.thresholdToLogUndoneTaskDetails =
334         conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
335           DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
336   }
337 
338   
339 
340 
341 
342   private ExecutorService getPool(ExecutorService pool) {
343     if (pool != null) {
344       return pool;
345     }
346     if (this.pool != null) {
347       return this.pool;
348     }
349     throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
350   }
351 
352   
353 
354 
355 
356   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
357       boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
358       throws InterruptedIOException {
359     return submit(null, tableName, rows, atLeastOne, callback, needResults);
360   }
361 
362   
363 
364 
365 
366 
367 
368 
369 
370 
371 
372 
373 
374   public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
375       List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
376       boolean needResults) throws InterruptedIOException {
377     if (rows.isEmpty()) {
378       return NO_REQS_RESULT;
379     }
380 
381     Map<ServerName, MultiAction<Row>> actionsByServer =
382         new HashMap<ServerName, MultiAction<Row>>();
383     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
384 
385     NonceGenerator ng = this.connection.getNonceGenerator();
386     long nonceGroup = ng.getNonceGroup(); 
387 
388     
389     List<Exception> locationErrors = null;
390     List<Integer> locationErrorRows = null;
391     do {
392       
393       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
394 
395       
396       
397       Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
398       Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
399 
400       int posInList = -1;
401       Iterator<? extends Row> it = rows.iterator();
402       while (it.hasNext()) {
403         Row r = it.next();
404         HRegionLocation loc;
405         try {
406           if (r == null) {
407             throw new IllegalArgumentException("#" + id + ", row cannot be null");
408           }
409           
410           RegionLocations locs = connection.locateRegion(
411               tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
412           if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
413             throw new IOException("#" + id + ", no location found, aborting submit for"
414                 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
415           }
416           loc = locs.getDefaultRegionLocation();
417         } catch (IOException ex) {
418           locationErrors = new ArrayList<Exception>();
419           locationErrorRows = new ArrayList<Integer>();
420           LOG.error("Failed to get region location ", ex);
421           
422           
423           retainedActions.add(new Action<Row>(r, ++posInList));
424           locationErrors.add(ex);
425           locationErrorRows.add(posInList);
426           it.remove();
427           break; 
428         }
429 
430         if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
431           Action<Row> action = new Action<Row>(r, ++posInList);
432           setNonce(ng, r, action);
433           retainedActions.add(action);
434           
435           byte[] regionName = loc.getRegionInfo().getRegionName();
436           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
437           it.remove();
438         }
439       }
440     } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
441 
442     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
443 
444     return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
445       locationErrors, locationErrorRows, actionsByServer, pool);
446   }
447 
448   <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
449       List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
450       Object[] results, boolean needResults, List<Exception> locationErrors,
451       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
452       ExecutorService pool) {
453     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
454       tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
455     
456     if (locationErrors != null) {
457       for (int i = 0; i < locationErrors.size(); ++i) {
458         int originalIndex = locationErrorRows.get(i);
459         Row row = retainedActions.get(originalIndex).getAction();
460         ars.manageError(originalIndex, row,
461           Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
462       }
463     }
464     ars.sendMultiAction(actionsByServer, 1, null, false);
465     return ars;
466   }
467 
468   
469 
470 
471 
472 
473 
474 
475 
476   private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
477       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
478     MultiAction<Row> multiAction = actionsByServer.get(server);
479     if (multiAction == null) {
480       multiAction = new MultiAction<Row>();
481       actionsByServer.put(server, multiAction);
482     }
483     if (action.hasNonce() && !multiAction.hasNonceGroup()) {
484       multiAction.setNonceGroup(nonceGroup);
485     }
486 
487     multiAction.add(regionName, action);
488   }
489 
490   
491 
492 
493 
494 
495 
496 
497 
498   protected boolean canTakeOperation(HRegionLocation loc,
499                                      Map<HRegionInfo, Boolean> regionsIncluded,
500                                      Map<ServerName, Boolean> serversIncluded) {
501     HRegionInfo regionInfo = loc.getRegionInfo();
502     Boolean regionPrevious = regionsIncluded.get(regionInfo);
503 
504     if (regionPrevious != null) {
505       
506       return regionPrevious;
507     }
508 
509     Boolean serverPrevious = serversIncluded.get(loc.getServerName());
510     if (Boolean.FALSE.equals(serverPrevious)) {
511       
512       regionsIncluded.put(regionInfo, Boolean.FALSE);
513       return false;
514     }
515 
516     AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
517     if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
518       
519       regionsIncluded.put(regionInfo, Boolean.FALSE);
520       return false;
521     }
522 
523     if (serverPrevious == null) {
524       
525       int newServers = 0; 
526       for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
527         if (kv.getValue()) {
528           newServers++;
529         }
530       }
531 
532       
533       boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
534 
535       if (ok) {
536         
537         AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
538         ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
539       }
540 
541       if (!ok) {
542         regionsIncluded.put(regionInfo, Boolean.FALSE);
543         serversIncluded.put(loc.getServerName(), Boolean.FALSE);
544         return false;
545       }
546 
547       serversIncluded.put(loc.getServerName(), Boolean.TRUE);
548     } else {
549       assert serverPrevious.equals(Boolean.TRUE);
550     }
551 
552     regionsIncluded.put(regionInfo, Boolean.TRUE);
553 
554     return true;
555   }
556 
557   
558 
559 
560 
561   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
562       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
563     return submitAll(null, tableName, rows, callback, results);
564   }
565 
566   
567 
568 
569 
570 
571 
572 
573 
574 
575 
576   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
577       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
578     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
579 
580     
581     int posInList = -1;
582     NonceGenerator ng = this.connection.getNonceGenerator();
583     for (Row r : rows) {
584       posInList++;
585       if (r instanceof Put) {
586         Put put = (Put) r;
587         if (put.isEmpty()) {
588           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
589         }
590       }
591       Action<Row> action = new Action<Row>(r, posInList);
592       setNonce(ng, r, action);
593       actions.add(action);
594     }
595     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
596         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
597     ars.groupAndSendMultiAction(actions, 1);
598     return ars;
599   }
600 
601   private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
602     if (!(r instanceof Append) && !(r instanceof Increment)) return;
603     action.setNonce(ng.newNonce()); 
604   }
605 
606   
607 
608 
609 
610 
611 
612 
613 
614 
615   protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
616 
617     
618 
619 
620 
621 
622 
623 
624     private final class ReplicaCallIssuingRunnable implements Runnable {
625       private final long startTime;
626       private final List<Action<Row>> initialActions;
627 
628       public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
629         this.initialActions = initialActions;
630         this.startTime = startTime;
631       }
632 
633       @Override
634       public void run() {
635         boolean done = false;
636         if (primaryCallTimeoutMicroseconds > 0) {
637           try {
638             done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
639           } catch (InterruptedException ex) {
640             LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
641             return;
642           }
643         }
644         if (done) return; 
645         Map<ServerName, MultiAction<Row>> actionsByServer =
646             new HashMap<ServerName, MultiAction<Row>>();
647         List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
648         if (replicaGetIndices == null) {
649           for (int i = 0; i < results.length; ++i) {
650             addReplicaActions(i, actionsByServer, unknownLocActions);
651           }
652         } else {
653           for (int replicaGetIndice : replicaGetIndices) {
654             addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
655           }
656         }
657         if (!actionsByServer.isEmpty()) {
658           sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
659         }
660         if (!unknownLocActions.isEmpty()) {
661           actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
662           for (Action<Row> action : unknownLocActions) {
663             addReplicaActionsAgain(action, actionsByServer);
664           }
665           
666           if (!actionsByServer.isEmpty()) {
667             sendMultiAction(actionsByServer, 1, null, true);
668           }
669         }
670       }
671 
672       
673 
674 
675 
676 
677       private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
678           List<Action<Row>> unknownReplicaActions) {
679         if (results[index] != null) return; 
680         Action<Row> action = initialActions.get(index);
681         RegionLocations loc = findAllLocationsOrFail(action, true);
682         if (loc == null) return;
683         HRegionLocation[] locs = loc.getRegionLocations();
684         if (locs.length == 1) {
685           LOG.warn("No replicas found for " + action.getAction());
686           return;
687         }
688         synchronized (replicaResultLock) {
689           
690           
691           
692           if (results[index] != null) return;
693           
694           
695           results[index] = new ReplicaResultState(locs.length);
696         }
697         for (int i = 1; i < locs.length; ++i) {
698           Action<Row> replicaAction = new Action<Row>(action, i);
699           if (locs[i] != null) {
700             addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
701                 replicaAction, actionsByServer, nonceGroup);
702           } else {
703             unknownReplicaActions.add(replicaAction);
704           }
705         }
706       }
707 
708       private void addReplicaActionsAgain(
709           Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
710         if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
711           throw new AssertionError("Cannot have default replica here");
712         }
713         HRegionLocation loc = getReplicaLocationOrFail(action);
714         if (loc == null) return;
715         addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
716             action, actionsByServer, nonceGroup);
717       }
718     }
719 
720     
721 
722 
723 
724     private final class SingleServerRequestRunnable implements Runnable {
725       private final MultiAction<Row> multiAction;
726       private final int numAttempt;
727       private final ServerName server;
728       private final Set<MultiServerCallable<Row>> callsInProgress;
729 
730       private SingleServerRequestRunnable(
731           MultiAction<Row> multiAction, int numAttempt, ServerName server,
732           Set<MultiServerCallable<Row>> callsInProgress) {
733         this.multiAction = multiAction;
734         this.numAttempt = numAttempt;
735         this.server = server;
736         this.callsInProgress = callsInProgress;
737       }
738 
739       @Override
740       public void run() {
741         MultiResponse res;
742         MultiServerCallable<Row> callable = null;
743         try {
744           callable = createCallable(server, tableName, multiAction);
745           try {
746             RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
747             if (callsInProgress != null) callsInProgress.add(callable);
748             res = caller.callWithoutRetries(callable, timeout);
749 
750             if (res == null) {
751               
752               return;
753             }
754 
755           } catch (IOException e) {
756             
757             
758             receiveGlobalFailure(multiAction, server, numAttempt, e);
759             return;
760           } catch (Throwable t) {
761             
762             LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
763                 " Retrying. Server is " + server + ", tableName=" + tableName, t);
764             receiveGlobalFailure(multiAction, server, numAttempt, t);
765             return;
766           }
767 
768           
769           receiveMultiAction(multiAction, server, res, numAttempt);
770         } catch (Throwable t) {
771           
772           LOG.error("Internal AsyncProcess #" + id + " error for "
773               + tableName + " processing for " + server, t);
774           throw new RuntimeException(t);
775         } finally {
776           decTaskCounters(multiAction.getRegions(), server);
777           if (callsInProgress != null && callable != null) {
778             callsInProgress.remove(callable);
779           }
780         }
781       }
782     }
783 
784     private final Batch.Callback<CResult> callback;
785     private final BatchErrors errors;
786     private final ConnectionManager.ServerErrorTracker errorsByServer;
787     private final ExecutorService pool;
788     private final Set<MultiServerCallable<Row>> callsInProgress;
789 
790 
791     private final TableName tableName;
792     private final AtomicLong actionsInProgress = new AtomicLong(-1);
793     
794 
795 
796 
797 
798     private final Object replicaResultLock = new Object();
799     
800 
801 
802 
803 
804 
805 
806 
807 
808     private final Object[] results;
809     
810 
811 
812     private final int[] replicaGetIndices;
813     private final boolean hasAnyReplicaGets;
814     private final long nonceGroup;
815 
816     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
817         ExecutorService pool, boolean needResults, Object[] results,
818         Batch.Callback<CResult> callback) {
819       this.pool = pool;
820       this.callback = callback;
821       this.nonceGroup = nonceGroup;
822       this.tableName = tableName;
823       this.actionsInProgress.set(actions.size());
824       if (results != null) {
825         assert needResults;
826         if (results.length != actions.size()) {
827           throw new AssertionError("results.length");
828         }
829         this.results = results;
830         for (int i = 0; i != this.results.length; ++i) {
831           results[i] = null;
832         }
833       } else {
834         this.results = needResults ? new Object[actions.size()] : null;
835       }
836       List<Integer> replicaGetIndices = null;
837       boolean hasAnyReplicaGets = false;
838       if (needResults) {
839         
840         
841         
842         
843         
844         boolean hasAnyNonReplicaReqs = false;
845         int posInList = 0;
846         for (Action<Row> action : actions) {
847           boolean isReplicaGet = isReplicaGet(action.getAction());
848           if (isReplicaGet) {
849             hasAnyReplicaGets = true;
850             if (hasAnyNonReplicaReqs) { 
851               if (replicaGetIndices == null) {
852                 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
853               }
854               replicaGetIndices.add(posInList);
855             }
856           } else if (!hasAnyNonReplicaReqs) {
857             
858             hasAnyNonReplicaReqs = true;
859             if (posInList > 0) {
860               
861               
862               replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
863               for (int i = 0; i < posInList; ++i) {
864                 replicaGetIndices.add(i);
865               }
866             }
867           }
868           ++posInList;
869         }
870       }
871       this.hasAnyReplicaGets = hasAnyReplicaGets;
872       if (replicaGetIndices != null) {
873         this.replicaGetIndices = new int[replicaGetIndices.size()];
874         int i = 0;
875         for (Integer el : replicaGetIndices) {
876           this.replicaGetIndices[i++] = el;
877         }
878       } else {
879         this.replicaGetIndices = null;
880       }
881       this.callsInProgress = !hasAnyReplicaGets ? null :
882           Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
883 
884       this.errorsByServer = createServerErrorTracker();
885       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
886     }
887 
888     @VisibleForTesting
889     long getActionsInProgress() {
890       return actionsInProgress.get();
891     }
892 
893     public Set<MultiServerCallable<Row>> getCallsInProgress() {
894       return callsInProgress;
895     }
896 
897     
898 
899 
900 
901 
902 
903     private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
904       Map<ServerName, MultiAction<Row>> actionsByServer =
905           new HashMap<ServerName, MultiAction<Row>>();
906 
907       boolean isReplica = false;
908       List<Action<Row>> unknownReplicaActions = null;
909       for (Action<Row> action : currentActions) {
910         RegionLocations locs = findAllLocationsOrFail(action, true);
911         if (locs == null) continue;
912         boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
913         if (isReplica && !isReplicaAction) {
914           
915           throw new AssertionError("Replica and non-replica actions in the same retry");
916         }
917         isReplica = isReplicaAction;
918         HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
919         if (loc == null || loc.getServerName() == null) {
920           if (isReplica) {
921             if (unknownReplicaActions == null) {
922               unknownReplicaActions = new ArrayList<Action<Row>>();
923             }
924             unknownReplicaActions.add(action);
925           } else {
926             
927             manageLocationError(action, null);
928           }
929         } else {
930           byte[] regionName = loc.getRegionInfo().getRegionName();
931           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
932         }
933       }
934       boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
935       boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
936 
937       if (!actionsByServer.isEmpty()) {
938         
939         sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
940             ? currentActions : null, numAttempt > 1 && !hasUnknown);
941       }
942 
943       if (hasUnknown) {
944         actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
945         for (Action<Row> action : unknownReplicaActions) {
946           HRegionLocation loc = getReplicaLocationOrFail(action);
947           if (loc == null) continue;
948           byte[] regionName = loc.getRegionInfo().getRegionName();
949           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
950         }
951         if (!actionsByServer.isEmpty()) {
952           sendMultiAction(
953               actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
954         }
955       }
956     }
957 
958     private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
959       
960       
961       int replicaId = action.getReplicaId();
962       RegionLocations locs = findAllLocationsOrFail(action, true);
963       if (locs == null) return null; 
964       HRegionLocation loc = locs.getRegionLocation(replicaId);
965       if (loc == null || loc.getServerName() == null) {
966         locs = findAllLocationsOrFail(action, false);
967         if (locs == null) return null; 
968         loc = locs.getRegionLocation(replicaId);
969       }
970       if (loc == null || loc.getServerName() == null) {
971         manageLocationError(action, null);
972         return null;
973       }
974       return loc;
975     }
976 
977     private void manageLocationError(Action<Row> action, Exception ex) {
978       String msg = "Cannot get replica " + action.getReplicaId()
979           + " location for " + action.getAction();
980       LOG.error(msg);
981       if (ex == null) {
982         ex = new IOException(msg);
983       }
984       manageError(action.getOriginalIndex(), action.getAction(),
985           Retry.NO_LOCATION_PROBLEM, ex, null);
986     }
987 
988     private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
989       if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
990           ", row cannot be null");
991       RegionLocations loc = null;
992       try {
993         loc = connection.locateRegion(
994             tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
995       } catch (IOException ex) {
996         manageLocationError(action, ex);
997       }
998       return loc;
999     }
1000 
1001     
1002 
1003 
1004 
1005 
1006 
1007 
1008 
    /**
     * Dispatches one multi-action per destination server. Each server's multi-action is turned
     * into one or more runnables (see {@code getNewMultiActionRunnable}); they are submitted to
     * {@code pool}, except that the very last one is run on the calling thread when
     * {@code reuseThread} is set.
     *
     * @param actionsByServer the multi-actions to send, keyed by destination server
     * @param numAttempt the attempt number passed on to the runnables and to failure handling
     * @param actionsForReplicaThread if non-null, replica-call waiting is started for these
     *        actions once everything has been dispatched
     * @param reuseThread whether the last runnable may run synchronously on the calling thread
     */
    private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
        int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
      // Countdown used to spot the last runnable; it starts at the number of servers and is
      // raised below if a single server yields more runnables than that.
      int actionsRemaining = actionsByServer.size();
      // One iteration per destination server.
      for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
        ServerName server = e.getKey();
        MultiAction<Row> multiAction = e.getValue();
        incTaskCounters(multiAction.getRegions(), server);
        Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
            numAttempt);
        // A server's request may have been split into several runnables; make sure the
        // countdown is at least that large before it is decremented.
        if (runnables.size() > actionsRemaining) {
          actionsRemaining = runnables.size();
        }

        // Run or submit every runnable for this server.
        for (Runnable runnable : runnables) {
          if ((--actionsRemaining == 0) && reuseThread) {
            runnable.run();
          } else {
            try {
              pool.submit(runnable);
            } catch (Throwable t) {
              if (t instanceof RejectedExecutionException) {
                // The pool refused the task; log it and fall through to failure handling.
               
               LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
                  " Server is " + server.getServerName(), t);
              } else {
                // Any other throwable from submit() is logged and handled the same way.
                LOG.warn("Caught unexpected exception/error: ", t);
              }
              decTaskCounters(multiAction.getRegions(), server);
              // Treat the whole multi-action as failed on this server; receiveGlobalFailure
              // decides per action whether it is retried or marked as an error.
              receiveGlobalFailure(multiAction, server, numAttempt, t);
            }
          }
        }
      }

      if (actionsForReplicaThread != null) {
        startWaitingForReplicaCalls(actionsForReplicaThread);
      }
    }
1057 
1058     private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1059         MultiAction<Row> multiAction,
1060         int numAttempt) {
1061       
1062       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1063         if (connection.getConnectionMetrics() != null) {
1064           connection.getConnectionMetrics().incrNormalRunners();
1065         }
1066         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1067             new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1068       }
1069 
1070       
1071       Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1072           .size());
1073 
1074       
1075       for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1076         Long backoff = getBackoff(server, e.getKey());
1077         DelayingRunner runner = actions.get(backoff);
1078         if (runner == null) {
1079           actions.put(backoff, new DelayingRunner(backoff, e));
1080         } else {
1081           runner.add(e);
1082         }
1083       }
1084 
1085       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1086       for (DelayingRunner runner : actions.values()) {
1087         String traceText = "AsyncProcess.sendMultiAction";
1088         Runnable runnable =
1089             new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1090                 callsInProgress);
1091         
1092         if (runner.getSleepTime() > 0) {
1093           runner.setRunner(runnable);
1094           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1095           runnable = runner;
1096           if (connection.getConnectionMetrics() != null) {
1097             connection.getConnectionMetrics().incrDelayRunners();
1098             connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
1099           }
1100         } else {
1101           if (connection.getConnectionMetrics() != null) {
1102             connection.getConnectionMetrics().incrNormalRunners();
1103           }
1104         }
1105         runnable = Trace.wrap(traceText, runnable);
1106         toReturn.add(runnable);
1107 
1108       }
1109       return toReturn;
1110     }
1111 
1112     
1113 
1114 
1115 
1116 
1117 
1118     private Long getBackoff(ServerName server, byte[] regionName) {
1119       ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1120       ServerStatistics stats = tracker.getStats(server);
1121       return AsyncProcess.this.connection.getBackoffPolicy()
1122           .getBackoffTime(server, regionName, stats);
1123     }
1124 
1125     
1126 
1127 
1128     private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1129       long startTime = EnvironmentEdgeManager.currentTime();
1130       ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1131           actionsForReplicaThread, startTime);
1132       if (primaryCallTimeoutMicroseconds == 0) {
1133         
1134         replicaRunnable.run();
1135       } else {
1136         
1137         
1138         try {
1139           pool.submit(replicaRunnable);
1140         } catch (RejectedExecutionException ree) {
1141           LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1142         }
1143       }
1144     }
1145 
1146     
1147 
1148 
1149 
1150 
1151 
1152 
1153 
1154 
1155 
1156     public Retry manageError(int originalIndex, Row row, Retry canRetry,
1157                                 Throwable throwable, ServerName server) {
1158       if (canRetry == Retry.YES
1159           && throwable != null && (throwable instanceof DoNotRetryIOException ||
1160           throwable instanceof NeedUnmanagedConnectionException)) {
1161         canRetry = Retry.NO_NOT_RETRIABLE;
1162       }
1163 
1164       if (canRetry != Retry.YES) {
1165         
1166         setError(originalIndex, row, throwable, server);
1167       } else if (isActionComplete(originalIndex, row)) {
1168         canRetry = Retry.NO_OTHER_SUCCEEDED;
1169       }
1170       return canRetry;
1171     }
1172 
1173     
1174 
1175 
1176 
1177 
1178 
1179 
1180 
1181     private void receiveGlobalFailure(
1182         MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1183       errorsByServer.reportServerError(server);
1184       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1185           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1186 
1187       if (tableName == null) {
1188         
1189         connection.clearCaches(server);
1190       }
1191       int failed = 0, stopped = 0;
1192       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1193       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1194         byte[] regionName = e.getKey();
1195         byte[] row = e.getValue().iterator().next().getAction().getRow();
1196         
1197         
1198         if (tableName != null) {
1199           connection.updateCachedLocations(tableName, regionName, row,
1200             ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
1201         }
1202         for (Action<Row> action : e.getValue()) {
1203           Retry retry = manageError(
1204               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1205           if (retry == Retry.YES) {
1206             toReplay.add(action);
1207           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1208             ++stopped;
1209           } else {
1210             ++failed;
1211           }
1212         }
1213       }
1214 
1215       if (toReplay.isEmpty()) {
1216         logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1217       } else {
1218         resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1219       }
1220     }
1221 
1222     
1223 
1224 
1225 
1226     private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1227         int numAttempt, int failureCount, Throwable throwable) {
1228       
1229 
1230       
1231       
1232       
1233       
1234       
1235       
1236       boolean retryImmediately = throwable instanceof RetryImmediatelyException;
1237       int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
1238       long backOffTime = retryImmediately ? 0 :
1239           errorsByServer.calculateBackoffTime(oldServer, pause);
1240       if (numAttempt > startLogErrorsCnt) {
1241         
1242         
1243         LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1244             oldServer, throwable, backOffTime, true, null, -1, -1));
1245       }
1246 
1247       try {
1248         if (backOffTime > 0) {
1249           Thread.sleep(backOffTime);
1250         }
1251       } catch (InterruptedException e) {
1252         LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1253         Thread.currentThread().interrupt();
1254         return;
1255       }
1256 
1257       groupAndSendMultiAction(toReplay, nextAttemptNumber);
1258     }
1259 
1260     private void logNoResubmit(ServerName oldServer, int numAttempt,
1261         int failureCount, Throwable throwable, int failed, int stopped) {
1262       if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1263         String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1264         String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1265             throwable, -1, false, timeStr, failed, stopped);
1266         if (failed != 0) {
1267           
1268           LOG.warn(logMessage);
1269         } else {
1270           LOG.info(logMessage);
1271         }
1272       }
1273     }
1274 
1275     
1276 
1277 
1278 
1279 
1280 
1281 
1282 
    /**
     * Processes the response of a multi-action sent to one server. For every action sent, the
     * matching entry of the response is either stored as a result (after updating metrics and
     * statistics and invoking the user callback), or treated as a failure that is retried or
     * recorded through {@code manageError}. Actions to retry are resubmitted together at the end.
     *
     * @param multiAction the actions that were sent
     * @param server the server that answered
     * @param responses the server response; must not be null
     * @param numAttempt the attempt number of the call
     */
    private void receiveMultiAction(MultiAction<Row> multiAction,
        ServerName server, MultiResponse responses, int numAttempt) {
       assert responses != null;

      // Per action, the response holds either a value keyed by the action's original index or
      // nothing; a missing value is replaced below by the region-level exception.

      List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
      Throwable throwable = null;
      int failureCount = 0;
      Retry canRetry = null;
      // NOTE: this local shadows the 'results' array used elsewhere in this class.
      Map<byte[], Map<Integer, Object>> results = responses.getResults();
      
      int failed = 0;
      int stopped = 0;
      for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
        byte[] regionName = regionEntry.getKey();

        Throwable regionException = responses.getExceptions().get(regionName);
        if (tableName == null && regionException != null &&
              ClientExceptionsUtil.isMetaClearingException(regionException)) {
          // Without a table name, a meta-clearing region exception clears everything cached
          // for the server.
          connection.clearCaches(server);
        }

        Map<Integer, Object> regionResults;
        if (results.containsKey(regionName)) {
          regionResults = results.get(regionName);
        } else {
          regionResults = Collections.emptyMap();
        }

        // Cached locations are updated at most once per region.
        boolean regionFailureRegistered = false;
        for (Action<Row> sentAction : regionEntry.getValue()) {
          Object result = regionResults.get(sentAction.getOriginalIndex());
          if (result == null) {
            if (regionException == null) {
              LOG.error("Server sent us neither results nor exceptions for "
                + Bytes.toStringBinary(regionName)
                + ", numAttempt:" + numAttempt);
              regionException = new RuntimeException("Invalid response");
            }
            // No individual result: the region-level exception stands in for it.
            result = regionException;
          }
          // A Throwable in the result slot means this action failed.
          if (result instanceof Throwable) {
            Row row = sentAction.getAction();
            throwable = regionException != null ? regionException
              : ClientExceptionsUtil.findException(result);
            // Register the failure with the location cache once per region.
            if (!regionFailureRegistered) {
              regionFailureRegistered = true;
              connection.updateCachedLocations(
                  tableName, regionName, row.getRow(), result, server);
            }
            if (canRetry == null) {
              errorsByServer.reportServerError(server);
              // The server error is reported, and the retry verdict computed, once per response.
              canRetry = errorsByServer.canRetryMore(numAttempt) ?
                Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
            }
            ++failureCount;
            switch (manageError(sentAction.getOriginalIndex(), row, canRetry, (Throwable) result,
              server)) {
            case YES:
              toReplay.add(sentAction);
              break;
            case NO_OTHER_SUCCEEDED:
              ++stopped;
              break;
            default:
              ++failed;
              break;
            }
          } else {
            // Success path: feed the connection metrics, if any.
            if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
              AsyncProcess.this.connection.getConnectionMetrics().
                      updateServerStats(server, regionName, result);
            }

            // Update the statistics tracker, if any; updateStats returns the value that is
            // used as the result from here on.
            if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
              result = ResultStatsUtil.updateStats(result,
                  AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
            }

            if (callback != null) {
              try {
                // Exceptions thrown by the user callback are logged and ignored so that the
                // result is still recorded.
                this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
              } catch (Throwable t) {
                LOG.error("User callback threw an exception for "
                    + Bytes.toStringBinary(regionName) + ", ignoring", t);
              }
            }
            setResult(sentAction, result);
          }
        }
      }
      if (toReplay.isEmpty()) {
        logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
      } else {
        resubmit(server, toReplay, numAttempt, failureCount, throwable);
      }
    }
1398 
1399     private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1400         Throwable error, long backOffTime, boolean willRetry, String startTime,
1401         int failed, int stopped) {
1402       StringBuilder sb = new StringBuilder();
1403       sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1404         .append("attempt=").append(numAttempt)
1405         .append("/").append(numTries).append(" ");
1406 
1407       if (failureCount > 0 || error != null){
1408         sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1409             append(error == null ? "null" : error);
1410       } else {
1411         sb.append("succeeded");
1412       }
1413 
1414       sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1415 
1416       if (willRetry) {
1417         sb.append(", retrying after=").append(backOffTime).append("ms").
1418             append(", replay=").append(replaySize).append("ops");
1419       } else if (failureCount > 0) {
1420         if (stopped > 0) {
1421           sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1422         }
1423         if (failed > 0) {
1424           sb.append("; not retrying ").append(failed).append(" - final failure");
1425         }
1426 
1427       }
1428 
1429       return sb.toString();
1430     }
1431 
1432     
1433 
1434 
1435 
1436 
    /**
     * Records a successful result for an action and decrements the in-progress counter when
     * this call is the one that completes the action.
     *
     * @param action the action that produced the result
     * @param result the result; must not be null
     * @throws RuntimeException if {@code result} is null
     */
    private void setResult(Action<Row> action, Object result) {
      if (result == null) {
        throw new RuntimeException("Result cannot be null");
      }
      ReplicaResultState state = null;
      // A result is flagged as coming from a replica when it is not from the default replica.
      boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
      int index = action.getOriginalIndex();
      if (results == null) {
         // Results are not being collected: just account for the finished action.
         decActionCounter(index);
         return; 
      } else if ((state = trySetResultSimple(
          index, action.getAction(), false, result, null, isStale)) == null) {
        // trySetResultSimple returned no replica state: nothing more to do here.
        return; 
      }
      assert state != null;
      // The slot holds a replica state shared by several calls. Under the state's lock, a
      // callCount of zero means the action was already finalized; otherwise this call
      // finalizes it by zeroing callCount.
      synchronized (state) {
        if (state.callCount == 0) {
          return; 
        }
        state.callCount = 0;
      }
      // Replace the replica state in the results array with the actual result.
      synchronized (replicaResultLock) {
        if (results[index] != state) {
          throw new AssertionError("We set the callCount but someone else replaced the result");
        }
        results[index] = result;
      }

      decActionCounter(index);
    }
1472 
1473     
1474 
1475 
1476 
1477 
1478 
1479 
    /**
     * Records a final (non-retried) error for one call of an action.
     *
     * @param index the index of the action in the original request
     * @param row the failed operation
     * @param throwable the failure
     * @param server the server the failure came from
     */
    private void setError(int index, Row row, Throwable throwable, ServerName server) {
      ReplicaResultState state = null;
      if (results == null) {
        // Results are not being collected: record the error and account for the action.
        errors.add(throwable, row, server);
        decActionCounter(index);
        return; 
      } else if ((state = trySetResultSimple(
          index, row, true, throwable, server, false)) == null) {
        // trySetResultSimple returned no replica state: nothing more to do here.
        return; 
      }
      assert state != null;
      BatchErrors target = null; 
      boolean isActionDone = false;
      synchronized (state) {
        switch (state.callCount) {
          // callCount == 0: the action was already finalized; drop this error.
          case 0: return; 
          // callCount == 1: this is the last outstanding call, so the error is final.
          case 1: { 
            target = errors;
            isActionDone = true;
            break;
          }
          // Other calls are still outstanding: keep the error in the per-action replica errors.
          default: {
            assert state.callCount > 1;
            if (state.replicaErrors == null) {
              state.replicaErrors = new BatchErrors();
            }
            target = state.replicaErrors;
            break;
          }
        }
        --state.callCount;
      }
      target.add(throwable, row, server);
      if (isActionDone) {
        if (state.replicaErrors != null) { 
          // Fold the errors collected from the earlier failed calls into the main errors.
          errors.merge(state.replicaErrors);
        }
        // Replace the replica state in the results array with the final error.
        synchronized (replicaResultLock) {
          if (results[index] != state) {
            throw new AssertionError("We set the callCount but someone else replaced the result");
          }
          results[index] = throwable;
        }
        decActionCounter(index);
      }
    }
1530 
1531     
1532 
1533 
1534 
1535 
1536 
1537 
1538     private boolean isActionComplete(int index, Row row) {
1539       if (!isReplicaGet(row)) return false;
1540       Object resObj = results[index];
1541       return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1542           || ((ReplicaResultState)resObj).callCount == 0);
1543     }
1544 
1545     
1546 
1547 
1548 
    /**
     * Tries to store a result or an error directly in the results array, which is possible when
     * the slot is not governed by a replica state.
     *
     * @param index the index of the action in the original request
     * @param row the operation
     * @param isError whether {@code result} is an error (it is then cast to Throwable)
     * @param result the result or error to store
     * @param server the server the error came from; only used when recording an error
     * @param isFromReplica whether the result comes from a non-default replica
     * @return the replica state found in the slot, which the caller must then handle; or
     *         {@code null} when the slot held no replica state (the value was stored directly
     *         and the action counted as done if the slot was empty)
     */
    private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
        Object result, ServerName server, boolean isFromReplica) {
      Object resObj = null;
      if (!isReplicaGet(row)) {
        // Not a replica get: the slot is written without taking the lock.
        if (isFromReplica) {
          throw new AssertionError("Unexpected stale result for " + row);
        }
        results[index] = result;
      } else {
        synchronized (replicaResultLock) {
          // Only an empty slot is filled here; a non-empty one is left for the caller.
          if ((resObj = results[index]) == null) {
            if (isFromReplica) {
              throw new AssertionError("Unexpected stale result for " + row);
            }
            results[index] = result;
          }
        }
      }

      ReplicaResultState rrs =
          (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
      if (rrs == null && isError) {
        // No replica state is tracking this action, so the error goes straight to 'errors'.
        errors.add((Throwable)result, row, server);
      }

      if (resObj == null) {
        // The value was stored directly above: the action is finished.
        decActionCounter(index);
        return null;
      }
      return rrs;
    }
1582 
1583     private void decActionCounter(int index) {
1584       long actionsRemaining = actionsInProgress.decrementAndGet();
1585       if (actionsRemaining < 0) {
1586         String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1587         throw new AssertionError(error);
1588       } else if (actionsRemaining == 0) {
1589         synchronized (actionsInProgress) {
1590           actionsInProgress.notifyAll();
1591         }
1592       }
1593     }
1594 
1595     private String buildDetailedErrorMsg(String string, int index) {
1596       StringBuilder error = new StringBuilder(string);
1597       error.append("; called for ").
1598         append(index).
1599         append(", actionsInProgress ").
1600         append(actionsInProgress.get()).
1601         append("; replica gets: ");
1602       if (replicaGetIndices != null) {
1603         for (int i = 0; i < replicaGetIndices.length; ++i) {
1604           error.append(replicaGetIndices[i]).append(", ");
1605         }
1606       } else {
1607         error.append(hasAnyReplicaGets ? "all" : "none");
1608       }
1609       error.append("; results ");
1610       if (results != null) {
1611         for (int i = 0; i < results.length; ++i) {
1612           Object o = results[i];
1613           error.append(((o == null) ? "null" : o.toString())).append(", ");
1614         }
1615       }
1616       return error.toString();
1617     }
1618 
1619     @Override
1620     public void waitUntilDone() throws InterruptedIOException {
1621       try {
1622         waitUntilDone(Long.MAX_VALUE);
1623       } catch (InterruptedException iex) {
1624         throw new InterruptedIOException(iex.getMessage());
1625       } finally {
1626         if (callsInProgress != null) {
1627           for (MultiServerCallable<Row> clb : callsInProgress) {
1628             clb.cancel();
1629           }
1630         }
1631       }
1632     }
1633 
    /**
     * Waits until no action is in progress or the cutoff is passed.
     *
     * @param cutoff the deadline, or {@code Long.MAX_VALUE} for no deadline. It is compared with
     *        {@code EnvironmentEdgeManager.currentTime() * 1000}, so it is presumably expressed
     *        in microseconds - confirm with callers.
     * @return {@code true} if all actions finished, {@code false} if the cutoff was passed first
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private boolean waitUntilDone(long cutoff) throws InterruptedException {
      boolean hasWait = cutoff != Long.MAX_VALUE;
      long lastLog = EnvironmentEdgeManager.currentTime();
      long currentInProgress;
      while (0 != (currentInProgress = actionsInProgress.get())) {
        long now = EnvironmentEdgeManager.currentTime();
        if (hasWait && (now * 1000L) > cutoff) {
          return false;
        }
        // Progress is only logged for waits without a deadline, at most every 10000 time units
        // of the environment clock.
        if (!hasWait) { 
          if (now > lastLog + 10000) {
            lastLog = now;
            LOG.info("#" + id + ", waiting for " + currentInProgress
                + "  actions to finish on table: " + tableName);
            if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
              logDetailsOfUndoneTasks(currentInProgress);
            }
          }
        }
        synchronized (actionsInProgress) {
          // Re-check under the monitor: decActionCounter notifies on it when reaching zero.
          if (actionsInProgress.get() == 0) break;
          if (!hasWait) {
            actionsInProgress.wait(10);
          } else {
            // Wait at most 100000 microseconds, and never beyond the cutoff.
            long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
            TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
          }
        }
      }
      return true;
    }
1665 
1666     @Override
1667     public boolean hasError() {
1668       return errors.hasErrors();
1669     }
1670 
1671     @Override
1672     public List<? extends Row> getFailedOperations() {
1673       return errors.actions;
1674     }
1675 
1676     @Override
1677     public RetriesExhaustedWithDetailsException getErrors() {
1678       return errors.makeException();
1679     }
1680 
1681     @Override
1682     public Object[] getResults() throws InterruptedIOException {
1683       waitUntilDone();
1684       return results;
1685     }
1686   }
1687 
1688   @VisibleForTesting
1689   
1690   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1691       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1692       Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1693     return new AsyncRequestFutureImpl<CResult>(
1694         tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1695   }
1696 
1697   
1698 
1699 
1700   @VisibleForTesting
1701   protected MultiServerCallable<Row> createCallable(final ServerName server,
1702       TableName tableName, final MultiAction<Row> multi) {
1703     return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1704   }
1705 
1706   
1707 
1708 
1709   @VisibleForTesting
1710   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1711     return rpcCallerFactory.<MultiResponse> newCaller();
1712   }
1713 
1714   @VisibleForTesting
1715   
1716   void waitUntilDone() throws InterruptedIOException {
1717     waitForMaximumCurrentTasks(0, null);
1718   }
1719 
1720   
1721   private void waitForMaximumCurrentTasks(int max, String tableName)
1722       throws InterruptedIOException {
1723     waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
1724   }
1725 
1726   
1727   @VisibleForTesting
1728   void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
1729       String tableName) throws InterruptedIOException {
1730     long lastLog = EnvironmentEdgeManager.currentTime();
1731     long currentInProgress, oldInProgress = Long.MAX_VALUE;
1732     while ((currentInProgress = tasksInProgress.get()) > max) {
1733       if (oldInProgress != currentInProgress) { 
1734         long now = EnvironmentEdgeManager.currentTime();
1735         if (now > lastLog + 10000) {
1736           lastLog = now;
1737           LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1738               + max + ", tasksInProgress=" + currentInProgress +
1739               " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
1740           if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1741             logDetailsOfUndoneTasks(currentInProgress);
1742           }
1743         }
1744       }
1745       oldInProgress = currentInProgress;
1746       try {
1747         synchronized (tasksInProgress) {
1748           if (tasksInProgress.get() == oldInProgress) {
1749             tasksInProgress.wait(10);
1750           }
1751         }
1752       } catch (InterruptedException e) {
1753         throw new InterruptedIOException("#" + id + ", interrupted." +
1754             " currentNumberOfTask=" + currentInProgress);
1755       }
1756     }
1757   }
1758 
1759   private void logDetailsOfUndoneTasks(long taskInProgress) {
1760     ArrayList<ServerName> servers = new ArrayList<ServerName>();
1761     for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
1762       if (entry.getValue().get() > 0) {
1763         servers.add(entry.getKey());
1764       }
1765     }
1766     LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
1767     if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
1768       ArrayList<String> regions = new ArrayList<String>();
1769       for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
1770         if (entry.getValue().get() > 0) {
1771           regions.add(Bytes.toString(entry.getKey()));
1772         }
1773       }
1774       LOG.info("Regions against which left over task(s) are processed: " + regions);
1775     }
1776   }
1777 
1778   
1779 
1780 
1781 
1782 
1783   public boolean hasError() {
1784     return globalErrors.hasErrors();
1785   }
1786 
1787   
1788 
1789 
1790 
1791 
1792 
1793 
1794 
1795 
1796 
1797   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1798       List<Row> failedRows, String tableName) throws InterruptedIOException {
1799     waitForMaximumCurrentTasks(0, tableName);
1800     if (!globalErrors.hasErrors()) {
1801       return null;
1802     }
1803     if (failedRows != null) {
1804       failedRows.addAll(globalErrors.actions);
1805     }
1806     RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1807     globalErrors.clear();
1808     return result;
1809   }
1810 
1811   
1812 
1813 
1814   protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1815     tasksInProgress.incrementAndGet();
1816 
1817     AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1818     if (serverCnt == null) {
1819       taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1820       serverCnt = taskCounterPerServer.get(sn);
1821     }
1822     serverCnt.incrementAndGet();
1823 
1824     for (byte[] regBytes : regions) {
1825       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1826       if (regionCnt == null) {
1827         regionCnt = new AtomicInteger();
1828         AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1829         if (oldCnt != null) {
1830           regionCnt = oldCnt;
1831         }
1832       }
1833       regionCnt.incrementAndGet();
1834     }
1835   }
1836 
1837   
1838 
1839 
1840   protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1841     for (byte[] regBytes : regions) {
1842       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1843       regionCnt.decrementAndGet();
1844     }
1845 
1846     taskCounterPerServer.get(sn).decrementAndGet();
1847     tasksInProgress.decrementAndGet();
1848     synchronized (tasksInProgress) {
1849       tasksInProgress.notifyAll();
1850     }
1851   }
1852 
1853   
1854 
1855 
1856 
1857 
1858 
1859 
1860   protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1861     return new ConnectionManager.ServerErrorTracker(
1862         this.serverTrackerTimeout, this.numTries);
1863   }
1864 
1865   private static boolean isReplicaGet(Row row) {
1866     return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1867   }
1868 
1869   
1870 
1871 
  /**
   * Verdict on whether a failed action is sent again. Anything other than {@link #YES} means
   * the action is not resubmitted.
   */
  private enum Retry {
    // The action may be retried.
    YES,
    // The location of the target region or replica could not be obtained.
    NO_LOCATION_PROBLEM,
    // The failure is a DoNotRetryIOException or NeedUnmanagedConnectionException.
    NO_NOT_RETRIABLE,
    // The error tracker does not allow any further attempt.
    NO_RETRIES_EXHAUSTED,
    // A retry was allowed but the action is already complete (replica gets only).
    NO_OTHER_SUCCEEDED
  }
1879 }