1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.ConcurrentSkipListMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicLong;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.hbase.RetryImmediatelyException;
45 import org.apache.hadoop.hbase.classification.InterfaceAudience;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.hbase.DoNotRetryIOException;
48 import org.apache.hadoop.hbase.HConstants;
49 import org.apache.hadoop.hbase.HRegionInfo;
50 import org.apache.hadoop.hbase.HRegionLocation;
51 import org.apache.hadoop.hbase.RegionLocations;
52 import org.apache.hadoop.hbase.ServerName;
53 import org.apache.hadoop.hbase.TableName;
54 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
55 import org.apache.hadoop.hbase.client.coprocessor.Batch;
56 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
57 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60 import org.apache.htrace.Trace;
61
62 import com.google.common.annotations.VisibleForTesting;
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99 @InterfaceAudience.Private
100 class AsyncProcess {
101 private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
102 protected static final AtomicLong COUNTER = new AtomicLong();
103
104 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
105
106
107
108
109
110
111
112 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
113 "hbase.client.start.log.errors.counter";
114 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
115
116
117
118
119 public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
120
121 private final int thresholdToLogUndoneTaskDetails;
122 private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
123 "hbase.client.threshold.log.details";
124 private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
125 private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
126
127
128
129
130
131
132
133 public static interface AsyncRequestFuture {
134 public boolean hasError();
135 public RetriesExhaustedWithDetailsException getErrors();
136 public List<? extends Row> getFailedOperations();
137 public Object[] getResults() throws InterruptedIOException;
138
139 public void waitUntilDone() throws InterruptedIOException;
140 }
141
142
143
144
145 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
146
147 final Object[] result = new Object[0];
148
149 @Override
150 public boolean hasError() {
151 return false;
152 }
153
154 @Override
155 public RetriesExhaustedWithDetailsException getErrors() {
156 return null;
157 }
158
159 @Override
160 public List<? extends Row> getFailedOperations() {
161 return null;
162 }
163
164 @Override
165 public Object[] getResults() {
166 return result;
167 }
168
169 @Override
170 public void waitUntilDone() throws InterruptedIOException {
171 }
172 };
173
174
175
176
177
178 private static class ReplicaResultState {
179 public ReplicaResultState(int callCount) {
180 this.callCount = callCount;
181 }
182
183
184 int callCount;
185
186
187 BatchErrors replicaErrors = null;
188
189 @Override
190 public String toString() {
191 return "[call count " + callCount + "; errors " + replicaErrors + "]";
192 }
193 }
194
195
196
197 protected final long id;
198
199 protected final ClusterConnection connection;
200 protected final RpcRetryingCallerFactory rpcCallerFactory;
201 protected final RpcControllerFactory rpcFactory;
202 protected final BatchErrors globalErrors;
203 protected final ExecutorService pool;
204
205 protected final AtomicLong tasksInProgress = new AtomicLong(0);
206 protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
207 new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
208 protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
209 new ConcurrentHashMap<ServerName, AtomicInteger>();
210
211
212 private final int startLogErrorsCnt;
213
214
215
216
217 protected final int maxTotalConcurrentTasks;
218
219
220
221
222
223
224
225 protected final int maxConcurrentTasksPerRegion;
226
227
228
229
230 protected final int maxConcurrentTasksPerServer;
231 protected final long pause;
232 protected int numTries;
233 protected int serverTrackerTimeout;
234 protected int timeout;
235 protected long primaryCallTimeoutMicroseconds;
236
237
238 protected static class BatchErrors {
239 private final List<Throwable> throwables = new ArrayList<Throwable>();
240 private final List<Row> actions = new ArrayList<Row>();
241 private final List<String> addresses = new ArrayList<String>();
242
243 public synchronized void add(Throwable ex, Row row, ServerName serverName) {
244 if (row == null){
245 throw new IllegalArgumentException("row cannot be null. location=" + serverName);
246 }
247
248 throwables.add(ex);
249 actions.add(row);
250 addresses.add(serverName != null ? serverName.toString() : "null");
251 }
252
253 public boolean hasErrors() {
254 return !throwables.isEmpty();
255 }
256
257 private synchronized RetriesExhaustedWithDetailsException makeException() {
258 return new RetriesExhaustedWithDetailsException(
259 new ArrayList<Throwable>(throwables),
260 new ArrayList<Row>(actions), new ArrayList<String>(addresses));
261 }
262
263 public synchronized void clear() {
264 throwables.clear();
265 actions.clear();
266 addresses.clear();
267 }
268
269 public synchronized void merge(BatchErrors other) {
270 throwables.addAll(other.throwables);
271 actions.addAll(other.actions);
272 addresses.addAll(other.addresses);
273 }
274 }
275
/**
 * Creates an instance bound to one connection and reads its tuning parameters from
 * {@code conf}.
 *
 * @param hc the connection to use; must not be {@code null}
 * @param conf the configuration supplying pause, retries, timeouts and concurrency limits
 * @param pool the default executor; may be {@code null} if every request supplies its own
 * @param rpcCaller stored as {@code rpcCallerFactory}
 * @param useGlobalErrors when {@code true} a shared {@code BatchErrors} is created, otherwise
 *          {@code globalErrors} stays {@code null}
 * @param rpcFactory stored as {@code rpcFactory}
 * @throws IllegalArgumentException if {@code hc} is {@code null} or any of the three
 *           concurrency limits is not strictly positive
 */
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
    RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
  if (hc == null) {
    throw new IllegalArgumentException("HConnection cannot be null.");
  }

  this.connection = hc;
  this.pool = pool;
  this.globalErrors = useGlobalErrors ? new BatchErrors() : null;

  this.id = COUNTER.incrementAndGet();

  this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
  this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);

  this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
  this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
  this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);

  this.startLogErrorsCnt =
      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);

  if (this.maxTotalConcurrentTasks <= 0) {
    throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
  }
  if (this.maxConcurrentTasksPerServer <= 0) {
    throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
        maxConcurrentTasksPerServer);
  }
  if (this.maxConcurrentTasksPerRegion <= 0) {
    throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
        maxConcurrentTasksPerRegion);
  }

  // The server tracker timeout is the total pause time across all tries. It is accumulated in
  // a long and saturated at Integer.MAX_VALUE: a compound "int += long" narrows silently and
  // would wrap around for a large pause or retry count.
  long trackerTimeout = 0;
  for (int attempt = 0; attempt < this.numTries; ++attempt) {
    trackerTimeout += ConnectionUtils.getPauseTime(this.pause, attempt);
    if (trackerTimeout >= Integer.MAX_VALUE) {
      trackerTimeout = Integer.MAX_VALUE;
      break;
    }
  }
  this.serverTrackerTimeout = (int) trackerTimeout;

  this.rpcCallerFactory = rpcCaller;
  this.rpcFactory = rpcFactory;

  this.thresholdToLogUndoneTaskDetails =
      conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
        DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
}
337
338
339
340
341
342 private ExecutorService getPool(ExecutorService pool) {
343 if (pool != null) {
344 return pool;
345 }
346 if (this.pool != null) {
347 return this.pool;
348 }
349 throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
350 }
351
352
353
354
355
356 public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
357 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
358 throws InterruptedIOException {
359 return submit(null, tableName, rows, atLeastOne, callback, needResults);
360 }
361
362
363
364
365
366
367
368
369
370
371
372
373
374 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
375 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
376 boolean needResults) throws InterruptedIOException {
377 if (rows.isEmpty()) {
378 return NO_REQS_RESULT;
379 }
380
381 Map<ServerName, MultiAction<Row>> actionsByServer =
382 new HashMap<ServerName, MultiAction<Row>>();
383 List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
384
385 NonceGenerator ng = this.connection.getNonceGenerator();
386 long nonceGroup = ng.getNonceGroup();
387
388
389 List<Exception> locationErrors = null;
390 List<Integer> locationErrorRows = null;
391 do {
392
393 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
394
395
396
397 Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
398 Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
399
400 int posInList = -1;
401 Iterator<? extends Row> it = rows.iterator();
402 while (it.hasNext()) {
403 Row r = it.next();
404 HRegionLocation loc;
405 try {
406 if (r == null) {
407 throw new IllegalArgumentException("#" + id + ", row cannot be null");
408 }
409
410 RegionLocations locs = connection.locateRegion(
411 tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
412 if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
413 throw new IOException("#" + id + ", no location found, aborting submit for"
414 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
415 }
416 loc = locs.getDefaultRegionLocation();
417 } catch (IOException ex) {
418 locationErrors = new ArrayList<Exception>();
419 locationErrorRows = new ArrayList<Integer>();
420 LOG.error("Failed to get region location ", ex);
421
422
423 retainedActions.add(new Action<Row>(r, ++posInList));
424 locationErrors.add(ex);
425 locationErrorRows.add(posInList);
426 it.remove();
427 break;
428 }
429
430 if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
431 Action<Row> action = new Action<Row>(r, ++posInList);
432 setNonce(ng, r, action);
433 retainedActions.add(action);
434
435 byte[] regionName = loc.getRegionInfo().getRegionName();
436 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
437 it.remove();
438 }
439 }
440 } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
441
442 if (retainedActions.isEmpty()) return NO_REQS_RESULT;
443
444 return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
445 locationErrors, locationErrorRows, actionsByServer, pool);
446 }
447
448 <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
449 List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
450 Object[] results, boolean needResults, List<Exception> locationErrors,
451 List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
452 ExecutorService pool) {
453 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
454 tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
455
456 if (locationErrors != null) {
457 for (int i = 0; i < locationErrors.size(); ++i) {
458 int originalIndex = locationErrorRows.get(i);
459 Row row = retainedActions.get(originalIndex).getAction();
460 ars.manageError(originalIndex, row,
461 Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
462 }
463 }
464 ars.sendMultiAction(actionsByServer, 1, null, false);
465 return ars;
466 }
467
468
469
470
471
472
473
474
475
476 private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
477 Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
478 MultiAction<Row> multiAction = actionsByServer.get(server);
479 if (multiAction == null) {
480 multiAction = new MultiAction<Row>();
481 actionsByServer.put(server, multiAction);
482 }
483 if (action.hasNonce() && !multiAction.hasNonceGroup()) {
484 multiAction.setNonceGroup(nonceGroup);
485 }
486
487 multiAction.add(regionName, action);
488 }
489
490
491
492
493
494
495
496
497
498 protected boolean canTakeOperation(HRegionLocation loc,
499 Map<HRegionInfo, Boolean> regionsIncluded,
500 Map<ServerName, Boolean> serversIncluded) {
501 HRegionInfo regionInfo = loc.getRegionInfo();
502 Boolean regionPrevious = regionsIncluded.get(regionInfo);
503
504 if (regionPrevious != null) {
505
506 return regionPrevious;
507 }
508
509 Boolean serverPrevious = serversIncluded.get(loc.getServerName());
510 if (Boolean.FALSE.equals(serverPrevious)) {
511
512 regionsIncluded.put(regionInfo, Boolean.FALSE);
513 return false;
514 }
515
516 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
517 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
518
519 regionsIncluded.put(regionInfo, Boolean.FALSE);
520 return false;
521 }
522
523 if (serverPrevious == null) {
524
525 int newServers = 0;
526 for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
527 if (kv.getValue()) {
528 newServers++;
529 }
530 }
531
532
533 boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
534
535 if (ok) {
536
537 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
538 ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
539 }
540
541 if (!ok) {
542 regionsIncluded.put(regionInfo, Boolean.FALSE);
543 serversIncluded.put(loc.getServerName(), Boolean.FALSE);
544 return false;
545 }
546
547 serversIncluded.put(loc.getServerName(), Boolean.TRUE);
548 } else {
549 assert serverPrevious.equals(Boolean.TRUE);
550 }
551
552 regionsIncluded.put(regionInfo, Boolean.TRUE);
553
554 return true;
555 }
556
557
558
559
560
561 public <CResult> AsyncRequestFuture submitAll(TableName tableName,
562 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
563 return submitAll(null, tableName, rows, callback, results);
564 }
565
566
567
568
569
570
571
572
573
574
575
576 public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
577 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
578 List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
579
580
581 int posInList = -1;
582 NonceGenerator ng = this.connection.getNonceGenerator();
583 for (Row r : rows) {
584 posInList++;
585 if (r instanceof Put) {
586 Put put = (Put) r;
587 if (put.isEmpty()) {
588 throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
589 }
590 }
591 Action<Row> action = new Action<Row>(r, posInList);
592 setNonce(ng, r, action);
593 actions.add(action);
594 }
595 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
596 tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
597 ars.groupAndSendMultiAction(actions, 1);
598 return ars;
599 }
600
601 private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
602 if (!(r instanceof Append) && !(r instanceof Increment)) return;
603 action.setNonce(ng.newNonce());
604 }
605
606
607
608
609
610
611
612
613
614
615 protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
616
617
618
619
620
621
622
623
624 private final class ReplicaCallIssuingRunnable implements Runnable {
625 private final long startTime;
626 private final List<Action<Row>> initialActions;
627
628 public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
629 this.initialActions = initialActions;
630 this.startTime = startTime;
631 }
632
633 @Override
634 public void run() {
635 boolean done = false;
636 if (primaryCallTimeoutMicroseconds > 0) {
637 try {
638 done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
639 } catch (InterruptedException ex) {
640 LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
641 return;
642 }
643 }
644 if (done) return;
645 Map<ServerName, MultiAction<Row>> actionsByServer =
646 new HashMap<ServerName, MultiAction<Row>>();
647 List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
648 if (replicaGetIndices == null) {
649 for (int i = 0; i < results.length; ++i) {
650 addReplicaActions(i, actionsByServer, unknownLocActions);
651 }
652 } else {
653 for (int replicaGetIndice : replicaGetIndices) {
654 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
655 }
656 }
657 if (!actionsByServer.isEmpty()) {
658 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
659 }
660 if (!unknownLocActions.isEmpty()) {
661 actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
662 for (Action<Row> action : unknownLocActions) {
663 addReplicaActionsAgain(action, actionsByServer);
664 }
665
666 if (!actionsByServer.isEmpty()) {
667 sendMultiAction(actionsByServer, 1, null, true);
668 }
669 }
670 }
671
672
673
674
675
676
677 private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
678 List<Action<Row>> unknownReplicaActions) {
679 if (results[index] != null) return;
680 Action<Row> action = initialActions.get(index);
681 RegionLocations loc = findAllLocationsOrFail(action, true);
682 if (loc == null) return;
683 HRegionLocation[] locs = loc.getRegionLocations();
684 if (locs.length == 1) {
685 LOG.warn("No replicas found for " + action.getAction());
686 return;
687 }
688 synchronized (replicaResultLock) {
689
690
691
692 if (results[index] != null) return;
693
694
695 results[index] = new ReplicaResultState(locs.length);
696 }
697 for (int i = 1; i < locs.length; ++i) {
698 Action<Row> replicaAction = new Action<Row>(action, i);
699 if (locs[i] != null) {
700 addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
701 replicaAction, actionsByServer, nonceGroup);
702 } else {
703 unknownReplicaActions.add(replicaAction);
704 }
705 }
706 }
707
708 private void addReplicaActionsAgain(
709 Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
710 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
711 throw new AssertionError("Cannot have default replica here");
712 }
713 HRegionLocation loc = getReplicaLocationOrFail(action);
714 if (loc == null) return;
715 addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
716 action, actionsByServer, nonceGroup);
717 }
718 }
719
720
721
722
723
724 private final class SingleServerRequestRunnable implements Runnable {
725 private final MultiAction<Row> multiAction;
726 private final int numAttempt;
727 private final ServerName server;
728 private final Set<MultiServerCallable<Row>> callsInProgress;
729
730 private SingleServerRequestRunnable(
731 MultiAction<Row> multiAction, int numAttempt, ServerName server,
732 Set<MultiServerCallable<Row>> callsInProgress) {
733 this.multiAction = multiAction;
734 this.numAttempt = numAttempt;
735 this.server = server;
736 this.callsInProgress = callsInProgress;
737 }
738
739 @Override
740 public void run() {
741 MultiResponse res;
742 MultiServerCallable<Row> callable = null;
743 try {
744 callable = createCallable(server, tableName, multiAction);
745 try {
746 RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
747 if (callsInProgress != null) callsInProgress.add(callable);
748 res = caller.callWithoutRetries(callable, timeout);
749
750 if (res == null) {
751
752 return;
753 }
754
755 } catch (IOException e) {
756
757
758 receiveGlobalFailure(multiAction, server, numAttempt, e);
759 return;
760 } catch (Throwable t) {
761
762 LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
763 " Retrying. Server is " + server + ", tableName=" + tableName, t);
764 receiveGlobalFailure(multiAction, server, numAttempt, t);
765 return;
766 }
767
768
769 receiveMultiAction(multiAction, server, res, numAttempt);
770 } catch (Throwable t) {
771
772 LOG.error("Internal AsyncProcess #" + id + " error for "
773 + tableName + " processing for " + server, t);
774 throw new RuntimeException(t);
775 } finally {
776 decTaskCounters(multiAction.getRegions(), server);
777 if (callsInProgress != null && callable != null) {
778 callsInProgress.remove(callable);
779 }
780 }
781 }
782 }
783
784 private final Batch.Callback<CResult> callback;
785 private final BatchErrors errors;
786 private final ConnectionManager.ServerErrorTracker errorsByServer;
787 private final ExecutorService pool;
788 private final Set<MultiServerCallable<Row>> callsInProgress;
789
790
791 private final TableName tableName;
792 private final AtomicLong actionsInProgress = new AtomicLong(-1);
793
794
795
796
797
798 private final Object replicaResultLock = new Object();
799
800
801
802
803
804
805
806
807
808 private final Object[] results;
809
810
811
812 private final int[] replicaGetIndices;
813 private final boolean hasAnyReplicaGets;
814 private final long nonceGroup;
815
816 public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
817 ExecutorService pool, boolean needResults, Object[] results,
818 Batch.Callback<CResult> callback) {
819 this.pool = pool;
820 this.callback = callback;
821 this.nonceGroup = nonceGroup;
822 this.tableName = tableName;
823 this.actionsInProgress.set(actions.size());
824 if (results != null) {
825 assert needResults;
826 if (results.length != actions.size()) {
827 throw new AssertionError("results.length");
828 }
829 this.results = results;
830 for (int i = 0; i != this.results.length; ++i) {
831 results[i] = null;
832 }
833 } else {
834 this.results = needResults ? new Object[actions.size()] : null;
835 }
836 List<Integer> replicaGetIndices = null;
837 boolean hasAnyReplicaGets = false;
838 if (needResults) {
839
840
841
842
843
844 boolean hasAnyNonReplicaReqs = false;
845 int posInList = 0;
846 for (Action<Row> action : actions) {
847 boolean isReplicaGet = isReplicaGet(action.getAction());
848 if (isReplicaGet) {
849 hasAnyReplicaGets = true;
850 if (hasAnyNonReplicaReqs) {
851 if (replicaGetIndices == null) {
852 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
853 }
854 replicaGetIndices.add(posInList);
855 }
856 } else if (!hasAnyNonReplicaReqs) {
857
858 hasAnyNonReplicaReqs = true;
859 if (posInList > 0) {
860
861
862 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
863 for (int i = 0; i < posInList; ++i) {
864 replicaGetIndices.add(i);
865 }
866 }
867 }
868 ++posInList;
869 }
870 }
871 this.hasAnyReplicaGets = hasAnyReplicaGets;
872 if (replicaGetIndices != null) {
873 this.replicaGetIndices = new int[replicaGetIndices.size()];
874 int i = 0;
875 for (Integer el : replicaGetIndices) {
876 this.replicaGetIndices[i++] = el;
877 }
878 } else {
879 this.replicaGetIndices = null;
880 }
881 this.callsInProgress = !hasAnyReplicaGets ? null :
882 Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
883
884 this.errorsByServer = createServerErrorTracker();
885 this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
886 }
887
888 @VisibleForTesting
889 long getActionsInProgress() {
890 return actionsInProgress.get();
891 }
892
893 public Set<MultiServerCallable<Row>> getCallsInProgress() {
894 return callsInProgress;
895 }
896
897
898
899
900
901
902
903 private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
904 Map<ServerName, MultiAction<Row>> actionsByServer =
905 new HashMap<ServerName, MultiAction<Row>>();
906
907 boolean isReplica = false;
908 List<Action<Row>> unknownReplicaActions = null;
909 for (Action<Row> action : currentActions) {
910 RegionLocations locs = findAllLocationsOrFail(action, true);
911 if (locs == null) continue;
912 boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
913 if (isReplica && !isReplicaAction) {
914
915 throw new AssertionError("Replica and non-replica actions in the same retry");
916 }
917 isReplica = isReplicaAction;
918 HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
919 if (loc == null || loc.getServerName() == null) {
920 if (isReplica) {
921 if (unknownReplicaActions == null) {
922 unknownReplicaActions = new ArrayList<Action<Row>>();
923 }
924 unknownReplicaActions.add(action);
925 } else {
926
927 manageLocationError(action, null);
928 }
929 } else {
930 byte[] regionName = loc.getRegionInfo().getRegionName();
931 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
932 }
933 }
934 boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
935 boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
936
937 if (!actionsByServer.isEmpty()) {
938
939 sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
940 ? currentActions : null, numAttempt > 1 && !hasUnknown);
941 }
942
943 if (hasUnknown) {
944 actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
945 for (Action<Row> action : unknownReplicaActions) {
946 HRegionLocation loc = getReplicaLocationOrFail(action);
947 if (loc == null) continue;
948 byte[] regionName = loc.getRegionInfo().getRegionName();
949 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
950 }
951 if (!actionsByServer.isEmpty()) {
952 sendMultiAction(
953 actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
954 }
955 }
956 }
957
958 private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
959
960
961 int replicaId = action.getReplicaId();
962 RegionLocations locs = findAllLocationsOrFail(action, true);
963 if (locs == null) return null;
964 HRegionLocation loc = locs.getRegionLocation(replicaId);
965 if (loc == null || loc.getServerName() == null) {
966 locs = findAllLocationsOrFail(action, false);
967 if (locs == null) return null;
968 loc = locs.getRegionLocation(replicaId);
969 }
970 if (loc == null || loc.getServerName() == null) {
971 manageLocationError(action, null);
972 return null;
973 }
974 return loc;
975 }
976
977 private void manageLocationError(Action<Row> action, Exception ex) {
978 String msg = "Cannot get replica " + action.getReplicaId()
979 + " location for " + action.getAction();
980 LOG.error(msg);
981 if (ex == null) {
982 ex = new IOException(msg);
983 }
984 manageError(action.getOriginalIndex(), action.getAction(),
985 Retry.NO_LOCATION_PROBLEM, ex, null);
986 }
987
988 private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
989 if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
990 ", row cannot be null");
991 RegionLocations loc = null;
992 try {
993 loc = connection.locateRegion(
994 tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
995 } catch (IOException ex) {
996 manageLocationError(action, ex);
997 }
998 return loc;
999 }
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009 private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
1010 int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
1011
1012
1013 int actionsRemaining = actionsByServer.size();
1014
1015 for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
1016 ServerName server = e.getKey();
1017 MultiAction<Row> multiAction = e.getValue();
1018 incTaskCounters(multiAction.getRegions(), server);
1019 Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
1020 numAttempt);
1021
1022
1023 if (runnables.size() > actionsRemaining) {
1024 actionsRemaining = runnables.size();
1025 }
1026
1027
1028 for (Runnable runnable : runnables) {
1029 if ((--actionsRemaining == 0) && reuseThread) {
1030 runnable.run();
1031 } else {
1032 try {
1033 pool.submit(runnable);
1034 } catch (Throwable t) {
1035 if (t instanceof RejectedExecutionException) {
1036
1037
1038 LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
1039 " Server is " + server.getServerName(), t);
1040 } else {
1041
1042 LOG.warn("Caught unexpected exception/error: ", t);
1043 }
1044 decTaskCounters(multiAction.getRegions(), server);
1045
1046
1047 receiveGlobalFailure(multiAction, server, numAttempt, t);
1048 }
1049 }
1050 }
1051 }
1052
1053 if (actionsForReplicaThread != null) {
1054 startWaitingForReplicaCalls(actionsForReplicaThread);
1055 }
1056 }
1057
1058 private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1059 MultiAction<Row> multiAction,
1060 int numAttempt) {
1061
1062 if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1063 if (connection.getConnectionMetrics() != null) {
1064 connection.getConnectionMetrics().incrNormalRunners();
1065 }
1066 return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1067 new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1068 }
1069
1070
1071 Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1072 .size());
1073
1074
1075 for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1076 Long backoff = getBackoff(server, e.getKey());
1077 DelayingRunner runner = actions.get(backoff);
1078 if (runner == null) {
1079 actions.put(backoff, new DelayingRunner(backoff, e));
1080 } else {
1081 runner.add(e);
1082 }
1083 }
1084
1085 List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1086 for (DelayingRunner runner : actions.values()) {
1087 String traceText = "AsyncProcess.sendMultiAction";
1088 Runnable runnable =
1089 new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1090 callsInProgress);
1091
1092 if (runner.getSleepTime() > 0) {
1093 runner.setRunner(runnable);
1094 traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1095 runnable = runner;
1096 if (connection.getConnectionMetrics() != null) {
1097 connection.getConnectionMetrics().incrDelayRunners();
1098 connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
1099 }
1100 } else {
1101 if (connection.getConnectionMetrics() != null) {
1102 connection.getConnectionMetrics().incrNormalRunners();
1103 }
1104 }
1105 runnable = Trace.wrap(traceText, runnable);
1106 toReturn.add(runnable);
1107
1108 }
1109 return toReturn;
1110 }
1111
1112
1113
1114
1115
1116
1117
1118 private Long getBackoff(ServerName server, byte[] regionName) {
1119 ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1120 ServerStatistics stats = tracker.getStats(server);
1121 return AsyncProcess.this.connection.getBackoffPolicy()
1122 .getBackoffTime(server, regionName, stats);
1123 }
1124
1125
1126
1127
1128 private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1129 long startTime = EnvironmentEdgeManager.currentTime();
1130 ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1131 actionsForReplicaThread, startTime);
1132 if (primaryCallTimeoutMicroseconds == 0) {
1133
1134 replicaRunnable.run();
1135 } else {
1136
1137
1138 try {
1139 pool.submit(replicaRunnable);
1140 } catch (RejectedExecutionException ree) {
1141 LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1142 }
1143 }
1144 }
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156 public Retry manageError(int originalIndex, Row row, Retry canRetry,
1157 Throwable throwable, ServerName server) {
1158 if (canRetry == Retry.YES
1159 && throwable != null && (throwable instanceof DoNotRetryIOException ||
1160 throwable instanceof NeedUnmanagedConnectionException)) {
1161 canRetry = Retry.NO_NOT_RETRIABLE;
1162 }
1163
1164 if (canRetry != Retry.YES) {
1165
1166 setError(originalIndex, row, throwable, server);
1167 } else if (isActionComplete(originalIndex, row)) {
1168 canRetry = Retry.NO_OTHER_SUCCEEDED;
1169 }
1170 return canRetry;
1171 }
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181 private void receiveGlobalFailure(
1182 MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1183 errorsByServer.reportServerError(server);
1184 Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1185 ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1186
1187 if (tableName == null) {
1188
1189 connection.clearCaches(server);
1190 }
1191 int failed = 0, stopped = 0;
1192 List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1193 for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1194 byte[] regionName = e.getKey();
1195 byte[] row = e.getValue().iterator().next().getAction().getRow();
1196
1197
1198 if (tableName != null) {
1199 connection.updateCachedLocations(tableName, regionName, row,
1200 ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
1201 }
1202 for (Action<Row> action : e.getValue()) {
1203 Retry retry = manageError(
1204 action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1205 if (retry == Retry.YES) {
1206 toReplay.add(action);
1207 } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1208 ++stopped;
1209 } else {
1210 ++failed;
1211 }
1212 }
1213 }
1214
1215 if (toReplay.isEmpty()) {
1216 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1217 } else {
1218 resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1219 }
1220 }
1221
1222
1223
1224
1225
1226 private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1227 int numAttempt, int failureCount, Throwable throwable) {
1228
1229
1230
1231
1232
1233
1234
1235
1236 boolean retryImmediately = throwable instanceof RetryImmediatelyException;
1237 int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
1238 long backOffTime = retryImmediately ? 0 :
1239 errorsByServer.calculateBackoffTime(oldServer, pause);
1240 if (numAttempt > startLogErrorsCnt) {
1241
1242
1243 LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1244 oldServer, throwable, backOffTime, true, null, -1, -1));
1245 }
1246
1247 try {
1248 if (backOffTime > 0) {
1249 Thread.sleep(backOffTime);
1250 }
1251 } catch (InterruptedException e) {
1252 LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1253 Thread.currentThread().interrupt();
1254 return;
1255 }
1256
1257 groupAndSendMultiAction(toReplay, nextAttemptNumber);
1258 }
1259
1260 private void logNoResubmit(ServerName oldServer, int numAttempt,
1261 int failureCount, Throwable throwable, int failed, int stopped) {
1262 if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1263 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1264 String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1265 throwable, -1, false, timeStr, failed, stopped);
1266 if (failed != 0) {
1267
1268 LOG.warn(logMessage);
1269 } else {
1270 LOG.info(logMessage);
1271 }
1272 }
1273 }
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283 private void receiveMultiAction(MultiAction<Row> multiAction,
1284 ServerName server, MultiResponse responses, int numAttempt) {
1285 assert responses != null;
1286
1287
1288
1289
1290
1291
1292
1293 List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1294 Throwable throwable = null;
1295 int failureCount = 0;
1296 Retry canRetry = null;
1297 Map<byte[], Map<Integer, Object>> results = responses.getResults();
1298
1299 int failed = 0;
1300 int stopped = 0;
1301 for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
1302 byte[] regionName = regionEntry.getKey();
1303
1304 Throwable regionException = responses.getExceptions().get(regionName);
1305 if (tableName == null && regionException != null &&
1306 ClientExceptionsUtil.isMetaClearingException(regionException)) {
1307
1308
1309
1310 connection.clearCaches(server);
1311 }
1312
1313 Map<Integer, Object> regionResults;
1314 if (results.containsKey(regionName)) {
1315 regionResults = results.get(regionName);
1316 } else {
1317 regionResults = Collections.emptyMap();
1318 }
1319
1320 boolean regionFailureRegistered = false;
1321 for (Action<Row> sentAction : regionEntry.getValue()) {
1322 Object result = regionResults.get(sentAction.getOriginalIndex());
1323 if (result == null) {
1324 if (regionException == null) {
1325 LOG.error("Server sent us neither results nor exceptions for "
1326 + Bytes.toStringBinary(regionName)
1327 + ", numAttempt:" + numAttempt);
1328 regionException = new RuntimeException("Invalid response");
1329 }
1330
1331
1332 result = regionException;
1333 }
1334
1335 if (result instanceof Throwable) {
1336 Row row = sentAction.getAction();
1337 throwable = regionException != null ? regionException
1338 : ClientExceptionsUtil.findException(result);
1339
1340 if (!regionFailureRegistered) {
1341 regionFailureRegistered = true;
1342 connection.updateCachedLocations(
1343 tableName, regionName, row.getRow(), result, server);
1344 }
1345 if (canRetry == null) {
1346 errorsByServer.reportServerError(server);
1347
1348 canRetry = errorsByServer.canRetryMore(numAttempt) ?
1349 Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1350 }
1351 ++failureCount;
1352 switch (manageError(sentAction.getOriginalIndex(), row, canRetry, (Throwable) result,
1353 server)) {
1354 case YES:
1355 toReplay.add(sentAction);
1356 break;
1357 case NO_OTHER_SUCCEEDED:
1358 ++stopped;
1359 break;
1360 default:
1361 ++failed;
1362 break;
1363 }
1364 } else {
1365
1366 if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
1367 AsyncProcess.this.connection.getConnectionMetrics().
1368 updateServerStats(server, regionName, result);
1369 }
1370
1371
1372
1373 if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
1374 result = ResultStatsUtil.updateStats(result,
1375 AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
1376 }
1377
1378 if (callback != null) {
1379 try {
1380
1381
1382 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
1383 } catch (Throwable t) {
1384 LOG.error("User callback threw an exception for "
1385 + Bytes.toStringBinary(regionName) + ", ignoring", t);
1386 }
1387 }
1388 setResult(sentAction, result);
1389 }
1390 }
1391 }
1392 if (toReplay.isEmpty()) {
1393 logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
1394 } else {
1395 resubmit(server, toReplay, numAttempt, failureCount, throwable);
1396 }
1397 }
1398
1399 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1400 Throwable error, long backOffTime, boolean willRetry, String startTime,
1401 int failed, int stopped) {
1402 StringBuilder sb = new StringBuilder();
1403 sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1404 .append("attempt=").append(numAttempt)
1405 .append("/").append(numTries).append(" ");
1406
1407 if (failureCount > 0 || error != null){
1408 sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1409 append(error == null ? "null" : error);
1410 } else {
1411 sb.append("succeeded");
1412 }
1413
1414 sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1415
1416 if (willRetry) {
1417 sb.append(", retrying after=").append(backOffTime).append("ms").
1418 append(", replay=").append(replaySize).append("ops");
1419 } else if (failureCount > 0) {
1420 if (stopped > 0) {
1421 sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1422 }
1423 if (failed > 0) {
1424 sb.append("; not retrying ").append(failed).append(" - final failure");
1425 }
1426
1427 }
1428
1429 return sb.toString();
1430 }
1431
1432
1433
1434
1435
1436
1437 private void setResult(Action<Row> action, Object result) {
1438 if (result == null) {
1439 throw new RuntimeException("Result cannot be null");
1440 }
1441 ReplicaResultState state = null;
1442 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
1443 int index = action.getOriginalIndex();
1444 if (results == null) {
1445 decActionCounter(index);
1446 return;
1447 } else if ((state = trySetResultSimple(
1448 index, action.getAction(), false, result, null, isStale)) == null) {
1449 return;
1450 }
1451 assert state != null;
1452
1453
1454
1455
1456
1457 synchronized (state) {
1458 if (state.callCount == 0) {
1459 return;
1460 }
1461 state.callCount = 0;
1462 }
1463 synchronized (replicaResultLock) {
1464 if (results[index] != state) {
1465 throw new AssertionError("We set the callCount but someone else replaced the result");
1466 }
1467 results[index] = result;
1468 }
1469
1470 decActionCounter(index);
1471 }
1472
1473
1474
1475
1476
1477
1478
1479
1480 private void setError(int index, Row row, Throwable throwable, ServerName server) {
1481 ReplicaResultState state = null;
1482 if (results == null) {
1483
1484
1485
1486 errors.add(throwable, row, server);
1487 decActionCounter(index);
1488 return;
1489 } else if ((state = trySetResultSimple(
1490 index, row, true, throwable, server, false)) == null) {
1491 return;
1492 }
1493 assert state != null;
1494 BatchErrors target = null;
1495 boolean isActionDone = false;
1496 synchronized (state) {
1497 switch (state.callCount) {
1498 case 0: return;
1499 case 1: {
1500 target = errors;
1501 isActionDone = true;
1502 break;
1503 }
1504 default: {
1505 assert state.callCount > 1;
1506 if (state.replicaErrors == null) {
1507 state.replicaErrors = new BatchErrors();
1508 }
1509 target = state.replicaErrors;
1510 break;
1511 }
1512 }
1513 --state.callCount;
1514 }
1515 target.add(throwable, row, server);
1516 if (isActionDone) {
1517 if (state.replicaErrors != null) {
1518 errors.merge(state.replicaErrors);
1519 }
1520
1521 synchronized (replicaResultLock) {
1522 if (results[index] != state) {
1523 throw new AssertionError("We set the callCount but someone else replaced the result");
1524 }
1525 results[index] = throwable;
1526 }
1527 decActionCounter(index);
1528 }
1529 }
1530
1531
1532
1533
1534
1535
1536
1537
1538 private boolean isActionComplete(int index, Row row) {
1539 if (!isReplicaGet(row)) return false;
1540 Object resObj = results[index];
1541 return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1542 || ((ReplicaResultState)resObj).callCount == 0);
1543 }
1544
1545
1546
1547
1548
1549 private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
1550 Object result, ServerName server, boolean isFromReplica) {
1551 Object resObj = null;
1552 if (!isReplicaGet(row)) {
1553 if (isFromReplica) {
1554 throw new AssertionError("Unexpected stale result for " + row);
1555 }
1556 results[index] = result;
1557 } else {
1558 synchronized (replicaResultLock) {
1559 if ((resObj = results[index]) == null) {
1560 if (isFromReplica) {
1561 throw new AssertionError("Unexpected stale result for " + row);
1562 }
1563 results[index] = result;
1564 }
1565 }
1566 }
1567
1568 ReplicaResultState rrs =
1569 (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
1570 if (rrs == null && isError) {
1571
1572 errors.add((Throwable)result, row, server);
1573 }
1574
1575 if (resObj == null) {
1576
1577 decActionCounter(index);
1578 return null;
1579 }
1580 return rrs;
1581 }
1582
1583 private void decActionCounter(int index) {
1584 long actionsRemaining = actionsInProgress.decrementAndGet();
1585 if (actionsRemaining < 0) {
1586 String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1587 throw new AssertionError(error);
1588 } else if (actionsRemaining == 0) {
1589 synchronized (actionsInProgress) {
1590 actionsInProgress.notifyAll();
1591 }
1592 }
1593 }
1594
1595 private String buildDetailedErrorMsg(String string, int index) {
1596 StringBuilder error = new StringBuilder(string);
1597 error.append("; called for ").
1598 append(index).
1599 append(", actionsInProgress ").
1600 append(actionsInProgress.get()).
1601 append("; replica gets: ");
1602 if (replicaGetIndices != null) {
1603 for (int i = 0; i < replicaGetIndices.length; ++i) {
1604 error.append(replicaGetIndices[i]).append(", ");
1605 }
1606 } else {
1607 error.append(hasAnyReplicaGets ? "all" : "none");
1608 }
1609 error.append("; results ");
1610 if (results != null) {
1611 for (int i = 0; i < results.length; ++i) {
1612 Object o = results[i];
1613 error.append(((o == null) ? "null" : o.toString())).append(", ");
1614 }
1615 }
1616 return error.toString();
1617 }
1618
1619 @Override
1620 public void waitUntilDone() throws InterruptedIOException {
1621 try {
1622 waitUntilDone(Long.MAX_VALUE);
1623 } catch (InterruptedException iex) {
1624 throw new InterruptedIOException(iex.getMessage());
1625 } finally {
1626 if (callsInProgress != null) {
1627 for (MultiServerCallable<Row> clb : callsInProgress) {
1628 clb.cancel();
1629 }
1630 }
1631 }
1632 }
1633
1634 private boolean waitUntilDone(long cutoff) throws InterruptedException {
1635 boolean hasWait = cutoff != Long.MAX_VALUE;
1636 long lastLog = EnvironmentEdgeManager.currentTime();
1637 long currentInProgress;
1638 while (0 != (currentInProgress = actionsInProgress.get())) {
1639 long now = EnvironmentEdgeManager.currentTime();
1640 if (hasWait && (now * 1000L) > cutoff) {
1641 return false;
1642 }
1643 if (!hasWait) {
1644 if (now > lastLog + 10000) {
1645 lastLog = now;
1646 LOG.info("#" + id + ", waiting for " + currentInProgress
1647 + " actions to finish on table: " + tableName);
1648 if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1649 logDetailsOfUndoneTasks(currentInProgress);
1650 }
1651 }
1652 }
1653 synchronized (actionsInProgress) {
1654 if (actionsInProgress.get() == 0) break;
1655 if (!hasWait) {
1656 actionsInProgress.wait(10);
1657 } else {
1658 long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1659 TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1660 }
1661 }
1662 }
1663 return true;
1664 }
1665
1666 @Override
1667 public boolean hasError() {
1668 return errors.hasErrors();
1669 }
1670
1671 @Override
1672 public List<? extends Row> getFailedOperations() {
1673 return errors.actions;
1674 }
1675
1676 @Override
1677 public RetriesExhaustedWithDetailsException getErrors() {
1678 return errors.makeException();
1679 }
1680
1681 @Override
1682 public Object[] getResults() throws InterruptedIOException {
1683 waitUntilDone();
1684 return results;
1685 }
1686 }
1687
1688 @VisibleForTesting
1689
1690 protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1691 TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1692 Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1693 return new AsyncRequestFutureImpl<CResult>(
1694 tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1695 }
1696
1697
1698
1699
1700 @VisibleForTesting
1701 protected MultiServerCallable<Row> createCallable(final ServerName server,
1702 TableName tableName, final MultiAction<Row> multi) {
1703 return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1704 }
1705
1706
1707
1708
1709 @VisibleForTesting
1710 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1711 return rpcCallerFactory.<MultiResponse> newCaller();
1712 }
1713
1714 @VisibleForTesting
1715
1716 void waitUntilDone() throws InterruptedIOException {
1717 waitForMaximumCurrentTasks(0, null);
1718 }
1719
1720
1721 private void waitForMaximumCurrentTasks(int max, String tableName)
1722 throws InterruptedIOException {
1723 waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
1724 }
1725
1726
1727 @VisibleForTesting
1728 void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
1729 String tableName) throws InterruptedIOException {
1730 long lastLog = EnvironmentEdgeManager.currentTime();
1731 long currentInProgress, oldInProgress = Long.MAX_VALUE;
1732 while ((currentInProgress = tasksInProgress.get()) > max) {
1733 if (oldInProgress != currentInProgress) {
1734 long now = EnvironmentEdgeManager.currentTime();
1735 if (now > lastLog + 10000) {
1736 lastLog = now;
1737 LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1738 + max + ", tasksInProgress=" + currentInProgress +
1739 " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
1740 if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1741 logDetailsOfUndoneTasks(currentInProgress);
1742 }
1743 }
1744 }
1745 oldInProgress = currentInProgress;
1746 try {
1747 synchronized (tasksInProgress) {
1748 if (tasksInProgress.get() == oldInProgress) {
1749 tasksInProgress.wait(10);
1750 }
1751 }
1752 } catch (InterruptedException e) {
1753 throw new InterruptedIOException("#" + id + ", interrupted." +
1754 " currentNumberOfTask=" + currentInProgress);
1755 }
1756 }
1757 }
1758
1759 private void logDetailsOfUndoneTasks(long taskInProgress) {
1760 ArrayList<ServerName> servers = new ArrayList<ServerName>();
1761 for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
1762 if (entry.getValue().get() > 0) {
1763 servers.add(entry.getKey());
1764 }
1765 }
1766 LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
1767 if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
1768 ArrayList<String> regions = new ArrayList<String>();
1769 for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
1770 if (entry.getValue().get() > 0) {
1771 regions.add(Bytes.toString(entry.getKey()));
1772 }
1773 }
1774 LOG.info("Regions against which left over task(s) are processed: " + regions);
1775 }
1776 }
1777
1778
1779
1780
1781
1782
1783 public boolean hasError() {
1784 return globalErrors.hasErrors();
1785 }
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797 public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1798 List<Row> failedRows, String tableName) throws InterruptedIOException {
1799 waitForMaximumCurrentTasks(0, tableName);
1800 if (!globalErrors.hasErrors()) {
1801 return null;
1802 }
1803 if (failedRows != null) {
1804 failedRows.addAll(globalErrors.actions);
1805 }
1806 RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1807 globalErrors.clear();
1808 return result;
1809 }
1810
1811
1812
1813
1814 protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1815 tasksInProgress.incrementAndGet();
1816
1817 AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1818 if (serverCnt == null) {
1819 taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1820 serverCnt = taskCounterPerServer.get(sn);
1821 }
1822 serverCnt.incrementAndGet();
1823
1824 for (byte[] regBytes : regions) {
1825 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1826 if (regionCnt == null) {
1827 regionCnt = new AtomicInteger();
1828 AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1829 if (oldCnt != null) {
1830 regionCnt = oldCnt;
1831 }
1832 }
1833 regionCnt.incrementAndGet();
1834 }
1835 }
1836
1837
1838
1839
1840 protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1841 for (byte[] regBytes : regions) {
1842 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1843 regionCnt.decrementAndGet();
1844 }
1845
1846 taskCounterPerServer.get(sn).decrementAndGet();
1847 tasksInProgress.decrementAndGet();
1848 synchronized (tasksInProgress) {
1849 tasksInProgress.notifyAll();
1850 }
1851 }
1852
1853
1854
1855
1856
1857
1858
1859
1860 protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1861 return new ConnectionManager.ServerErrorTracker(
1862 this.serverTrackerTimeout, this.numTries);
1863 }
1864
1865 private static boolean isReplicaGet(Row row) {
1866 return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1867 }
1868
1869
1870
1871
1872 private enum Retry {
1873 YES,
1874 NO_LOCATION_PROBLEM,
1875 NO_NOT_RETRIABLE,
1876 NO_RETRIES_EXHAUSTED,
1877 NO_OTHER_SUCCEEDED
1878 }
1879 }