1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.ConcurrentSkipListMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicLong;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.hbase.classification.InterfaceAudience;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.hbase.DoNotRetryIOException;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.HRegionLocation;
50 import org.apache.hadoop.hbase.RegionLocations;
51 import org.apache.hadoop.hbase.ServerName;
52 import org.apache.hadoop.hbase.TableName;
53 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
54 import org.apache.hadoop.hbase.client.coprocessor.Batch;
55 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
56 import org.apache.hadoop.hbase.util.Bytes;
57 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58 import org.apache.htrace.Trace;
59
60 import com.google.common.annotations.VisibleForTesting;
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 @InterfaceAudience.Private
98 class AsyncProcess {
/** Logger for this class. */
protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
/** Shared counter; the constructor takes the next value as the instance {@code id}. */
protected static final AtomicLong COUNTER = new AtomicLong();

/**
 * Configuration key the constructor reads into {@code primaryCallTimeoutMicroseconds}
 * (10000 is used when the key is not set).
 */
public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";

/**
 * Configuration key the constructor reads into {@code startLogErrorsCnt}.
 * NOTE(review): presumably the number of attempts after which errors start being logged;
 * the consumer of that field is outside this view - confirm.
 */
public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
    "hbase.client.start.log.errors.counter";
/** Value used when {@link #START_LOG_ERRORS_AFTER_COUNT_KEY} is not configured. */
public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;

/**
 * Configuration key name; it is not read anywhere in the visible part of this class.
 * NOTE(review): presumably toggles detailed logging of batch errors - confirm at the use site.
 */
public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";

/** Set by the constructor from {@link #THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS}. */
private final int thresholdToLogUndoneTaskDetails;
private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
    "hbase.client.threshold.log.details";
/** Value used when {@link #THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS} is not configured. */
private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
// NOTE(review): named like a constant but declared as an instance field; it could be static.
private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
124
125
126
127
128
129
130
131 public static interface AsyncRequestFuture {
132 public boolean hasError();
133 public RetriesExhaustedWithDetailsException getErrors();
134 public List<? extends Row> getFailedOperations();
135 public Object[] getResults() throws InterruptedIOException;
136
137 public void waitUntilDone() throws InterruptedIOException;
138 }
139
140
141 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
142 public final Object[] result = new Object[0];
143 @Override
144 public boolean hasError() { return false; }
145 @Override
146 public RetriesExhaustedWithDetailsException getErrors() { return null; }
147 @Override
148 public List<? extends Row> getFailedOperations() { return null; }
149 @Override
150 public Object[] getResults() { return result; }
151 @Override
152 public void waitUntilDone() throws InterruptedIOException {}
153 };
154
155
156
157
158
159 private static class ReplicaResultState {
160 public ReplicaResultState(int callCount) {
161 this.callCount = callCount;
162 }
163
164
165 int callCount;
166
167
168 BatchErrors replicaErrors = null;
169
170 @Override
171 public String toString() {
172 return "[call count " + callCount + "; errors " + replicaErrors + "]";
173 }
174 }
175
176
177
/** Identifier of this instance, taken from {@code COUNTER} in the constructor; used in messages. */
protected final long id;

/** Connection used to locate regions and to obtain the nonce generator. */
protected final ClusterConnection connection;
protected final RpcRetryingCallerFactory rpcCallerFactory;
protected final RpcControllerFactory rpcFactory;
/** Shared error store; {@code null} unless the constructor was asked to use global errors. */
protected final BatchErrors globalErrors;
/** Default executor; may be {@code null}, in which case a request must supply its own. */
protected final ExecutorService pool;

/** Counter compared against {@code maxTotalConcurrentTasks} in {@code canTakeOperation}. */
protected final AtomicLong tasksInProgress = new AtomicLong(0);
/** Per-region task counters, keyed by region name bytes (ordered with the byte comparator). */
protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
    new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
/** Per-server task counters. */
protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
    new ConcurrentHashMap<ServerName, AtomicInteger>();

/** Read from {@code START_LOG_ERRORS_AFTER_COUNT_KEY}; its consumer is outside this view. */
private final int startLogErrorsCnt;

/** Upper bound on {@code tasksInProgress}; the constructor rejects values &lt;= 0. */
protected final int maxTotalConcurrentTasks;

/** Upper bound on the per-region task counter; the constructor rejects values &lt;= 0. */
protected final int maxConcurrentTasksPerRegion;

/** Upper bound on the per-server task counter; the constructor rejects values &lt;= 0. */
protected final int maxConcurrentTasksPerServer;
/** Base pause read from {@code HConstants.HBASE_CLIENT_PAUSE}. */
protected final long pause;
/** Number of tries read from {@code HConstants.HBASE_CLIENT_RETRIES_NUMBER}. */
protected int numTries;
/** Sum of {@code ConnectionUtils.getPauseTime(pause, i)} for every {@code i < numTries}. */
protected int serverTrackerTimeout;
/** RPC timeout read from {@code HConstants.HBASE_RPC_TIMEOUT_KEY}; passed to each server call. */
protected int timeout;
/** Read from {@code PRIMARY_CALL_TIMEOUT_KEY}; 0 makes replica calls start without waiting. */
protected long primaryCallTimeoutMicroseconds;
217
218
219 protected static class BatchErrors {
220 private final List<Throwable> throwables = new ArrayList<Throwable>();
221 private final List<Row> actions = new ArrayList<Row>();
222 private final List<String> addresses = new ArrayList<String>();
223
224 public synchronized void add(Throwable ex, Row row, ServerName serverName) {
225 if (row == null){
226 throw new IllegalArgumentException("row cannot be null. location=" + serverName);
227 }
228
229 throwables.add(ex);
230 actions.add(row);
231 addresses.add(serverName != null ? serverName.toString() : "null");
232 }
233
234 public boolean hasErrors() {
235 return !throwables.isEmpty();
236 }
237
238 private synchronized RetriesExhaustedWithDetailsException makeException() {
239 return new RetriesExhaustedWithDetailsException(
240 new ArrayList<Throwable>(throwables),
241 new ArrayList<Row>(actions), new ArrayList<String>(addresses));
242 }
243
244 public synchronized void clear() {
245 throwables.clear();
246 actions.clear();
247 addresses.clear();
248 }
249
250 public synchronized void merge(BatchErrors other) {
251 throwables.addAll(other.throwables);
252 actions.addAll(other.actions);
253 addresses.addAll(other.addresses);
254 }
255 }
256
257 public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
258 RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
259 if (hc == null) {
260 throw new IllegalArgumentException("HConnection cannot be null.");
261 }
262
263 this.connection = hc;
264 this.pool = pool;
265 this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
266
267 this.id = COUNTER.incrementAndGet();
268
269 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
270 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
271 this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
272 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
273 this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
274 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
275 this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
276
277 this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
278 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
279 this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
280 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
281 this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
282 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
283
284 this.startLogErrorsCnt =
285 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
286
287 if (this.maxTotalConcurrentTasks <= 0) {
288 throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
289 }
290 if (this.maxConcurrentTasksPerServer <= 0) {
291 throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
292 maxConcurrentTasksPerServer);
293 }
294 if (this.maxConcurrentTasksPerRegion <= 0) {
295 throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
296 maxConcurrentTasksPerRegion);
297 }
298
299
300
301
302
303
304
305
306 this.serverTrackerTimeout = 0;
307 for (int i = 0; i < this.numTries; ++i) {
308 serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
309 }
310
311 this.rpcCallerFactory = rpcCaller;
312 this.rpcFactory = rpcFactory;
313
314 this.thresholdToLogUndoneTaskDetails =
315 conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
316 DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
317 }
318
319
320
321
322
323 private ExecutorService getPool(ExecutorService pool) {
324 if (pool != null) return pool;
325 if (this.pool != null) return this.pool;
326 throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
327 }
328
329
330
331
332
333 public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
334 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
335 throws InterruptedIOException {
336 return submit(null, tableName, rows, atLeastOne, callback, needResults);
337 }
338
339
340
341
342
343
344
345
346
347
348
349
350
/**
 * Takes as many rows from {@code rows} as the concurrency limits allow, groups them by
 * server and sends them. Rows that are taken are removed from {@code rows}; rows that are
 * not taken stay in the list.
 *
 * @param pool executor passed through to {@code submitMultiActions}; may be {@code null}
 * @param tableName table the rows belong to
 * @param rows candidate rows; modified in place
 * @param atLeastOne when {@code true}, keep looping until at least one row was taken or a
 *        location lookup failed
 * @param callback passed through to the created future
 * @param needResults passed through to the created future
 * @return the future for the rows that were taken, or {@code NO_REQS_RESULT} when
 *         {@code rows} is empty or nothing could be taken
 * @throws InterruptedIOException declared; presumably raised while waiting for task slots -
 *         the waiting method is outside this view
 * @throws IllegalArgumentException if {@code rows} contains a {@code null} element
 */
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
    List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
    boolean needResults) throws InterruptedIOException {
  if (rows.isEmpty()) {
    return NO_REQS_RESULT;
  }

  Map<ServerName, MultiAction<Row>> actionsByServer =
      new HashMap<ServerName, MultiAction<Row>>();
  List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());

  NonceGenerator ng = this.connection.getNonceGenerator();
  long nonceGroup = ng.getNonceGroup();

  // Both lists stay null unless a location lookup fails; at most one failure is recorded
  // because the scan stops at the first one.
  List<Exception> locationErrors = null;
  List<Integer> locationErrorRows = null;
  do {
    // NOTE(review): presumably blocks until a task slot is free - helper is outside this view.
    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());

    // Per-pass memo of the accept/reject decisions made by canTakeOperation.
    Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
    Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();

    // Index of the last action appended to retainedActions.
    int posInList = -1;
    Iterator<? extends Row> it = rows.iterator();
    while (it.hasNext()) {
      Row r = it.next();
      HRegionLocation loc;
      try {
        // Not an IOException, so this propagates to the caller instead of being recorded.
        if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");

        RegionLocations locs = connection.locateRegion(
            tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
        if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
          throw new IOException("#" + id + ", no location found, aborting submit for"
              + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
        }
        loc = locs.getDefaultRegionLocation();
      } catch (IOException ex) {
        locationErrors = new ArrayList<Exception>();
        locationErrorRows = new ArrayList<Integer>();
        LOG.error("Failed to get region location ", ex);
        // The failed row is still retained (so it gets a slot in the future), its error is
        // remembered by position, and the scan of the remaining rows is abandoned.
        retainedActions.add(new Action<Row>(r, ++posInList));
        locationErrors.add(ex);
        locationErrorRows.add(posInList);
        it.remove();
        break;
      }

      if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
        Action<Row> action = new Action<Row>(r, ++posInList);
        setNonce(ng, r, action);
        retainedActions.add(action);
        // Group the action under its server and region.
        byte[] regionName = loc.getRegionInfo().getRegionName();
        addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
        it.remove();
      }
    }
  } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));

  if (retainedActions.isEmpty()) return NO_REQS_RESULT;

  return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
      locationErrors, locationErrorRows, actionsByServer, pool);
}
422
423 <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
424 List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
425 Object[] results, boolean needResults, List<Exception> locationErrors,
426 List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
427 ExecutorService pool) {
428 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
429 tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
430
431 if (locationErrors != null) {
432 for (int i = 0; i < locationErrors.size(); ++i) {
433 int originalIndex = locationErrorRows.get(i);
434 Row row = retainedActions.get(originalIndex).getAction();
435 ars.manageError(originalIndex, row,
436 Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
437 }
438 }
439 ars.sendMultiAction(actionsByServer, 1, null, false);
440 return ars;
441 }
442
443
444
445
446
447
448
449
450
451 private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
452 Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
453 MultiAction<Row> multiAction = actionsByServer.get(server);
454 if (multiAction == null) {
455 multiAction = new MultiAction<Row>();
456 actionsByServer.put(server, multiAction);
457 }
458 if (action.hasNonce() && !multiAction.hasNonceGroup()) {
459 multiAction.setNonceGroup(nonceGroup);
460 }
461
462 multiAction.add(regionName, action);
463 }
464
465
466
467
468
469
470
471
472
473 protected boolean canTakeOperation(HRegionLocation loc,
474 Map<HRegionInfo, Boolean> regionsIncluded,
475 Map<ServerName, Boolean> serversIncluded) {
476 HRegionInfo regionInfo = loc.getRegionInfo();
477 Boolean regionPrevious = regionsIncluded.get(regionInfo);
478
479 if (regionPrevious != null) {
480
481 return regionPrevious;
482 }
483
484 Boolean serverPrevious = serversIncluded.get(loc.getServerName());
485 if (Boolean.FALSE.equals(serverPrevious)) {
486
487 regionsIncluded.put(regionInfo, Boolean.FALSE);
488 return false;
489 }
490
491 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
492 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
493
494 regionsIncluded.put(regionInfo, Boolean.FALSE);
495 return false;
496 }
497
498 if (serverPrevious == null) {
499
500 int newServers = 0;
501 for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
502 if (kv.getValue()) {
503 newServers++;
504 }
505 }
506
507
508 boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
509
510 if (ok) {
511
512 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
513 ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
514 }
515
516 if (!ok) {
517 regionsIncluded.put(regionInfo, Boolean.FALSE);
518 serversIncluded.put(loc.getServerName(), Boolean.FALSE);
519 return false;
520 }
521
522 serversIncluded.put(loc.getServerName(), Boolean.TRUE);
523 } else {
524 assert serverPrevious.equals(Boolean.TRUE);
525 }
526
527 regionsIncluded.put(regionInfo, Boolean.TRUE);
528
529 return true;
530 }
531
532
533
534
535
536 public <CResult> AsyncRequestFuture submitAll(TableName tableName,
537 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
538 return submitAll(null, tableName, rows, callback, results);
539 }
540
541
542
543
544
545
546
547
548
549
550
551 public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
552 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
553 List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
554
555
556 int posInList = -1;
557 NonceGenerator ng = this.connection.getNonceGenerator();
558 for (Row r : rows) {
559 posInList++;
560 if (r instanceof Put) {
561 Put put = (Put) r;
562 if (put.isEmpty()) {
563 throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
564 }
565 }
566 Action<Row> action = new Action<Row>(r, posInList);
567 setNonce(ng, r, action);
568 actions.add(action);
569 }
570 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
571 tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
572 ars.groupAndSendMultiAction(actions, 1);
573 return ars;
574 }
575
576 private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
577 if (!(r instanceof Append) && !(r instanceof Increment)) return;
578 action.setNonce(ng.newNonce());
579 }
580
581
582
583
584
585
586
587
588
589
590 protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
591
592
593
594
595
596
597
598
599 private final class ReplicaCallIssuingRunnable implements Runnable {
600 private final long startTime;
601 private final List<Action<Row>> initialActions;
602
603 public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
604 this.initialActions = initialActions;
605 this.startTime = startTime;
606 }
607
608 @Override
609 public void run() {
610 boolean done = false;
611 if (primaryCallTimeoutMicroseconds > 0) {
612 try {
613 done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
614 } catch (InterruptedException ex) {
615 LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
616 return;
617 }
618 }
619 if (done) return;
620 Map<ServerName, MultiAction<Row>> actionsByServer =
621 new HashMap<ServerName, MultiAction<Row>>();
622 List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
623 if (replicaGetIndices == null) {
624 for (int i = 0; i < results.length; ++i) {
625 addReplicaActions(i, actionsByServer, unknownLocActions);
626 }
627 } else {
628 for (int replicaGetIndice : replicaGetIndices) {
629 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
630 }
631 }
632 if (!actionsByServer.isEmpty()) {
633 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
634 }
635 if (!unknownLocActions.isEmpty()) {
636 actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
637 for (Action<Row> action : unknownLocActions) {
638 addReplicaActionsAgain(action, actionsByServer);
639 }
640
641 if (!actionsByServer.isEmpty()) {
642 sendMultiAction(actionsByServer, 1, null, true);
643 }
644 }
645 }
646
647
648
649
650
651
652 private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
653 List<Action<Row>> unknownReplicaActions) {
654 if (results[index] != null) return;
655 Action<Row> action = initialActions.get(index);
656 RegionLocations loc = findAllLocationsOrFail(action, true);
657 if (loc == null) return;
658 HRegionLocation[] locs = loc.getRegionLocations();
659 if (locs.length == 1) {
660 LOG.warn("No replicas found for " + action.getAction());
661 return;
662 }
663 synchronized (replicaResultLock) {
664
665
666
667 if (results[index] != null) return;
668
669
670 results[index] = new ReplicaResultState(locs.length);
671 }
672 for (int i = 1; i < locs.length; ++i) {
673 Action<Row> replicaAction = new Action<Row>(action, i);
674 if (locs[i] != null) {
675 addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
676 replicaAction, actionsByServer, nonceGroup);
677 } else {
678 unknownReplicaActions.add(replicaAction);
679 }
680 }
681 }
682
683 private void addReplicaActionsAgain(
684 Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
685 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
686 throw new AssertionError("Cannot have default replica here");
687 }
688 HRegionLocation loc = getReplicaLocationOrFail(action);
689 if (loc == null) return;
690 addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
691 action, actionsByServer, nonceGroup);
692 }
693 }
694
695
696
697
698
/**
 * Task that sends one group of actions to one server, hands the outcome to the enclosing
 * future, and always decrements the task counters when it ends.
 */
private final class SingleServerRequestRunnable implements Runnable {
  private final MultiAction<Row> multiAction;
  private final int numAttempt;
  private final ServerName server;
  /** Set the callable is registered in while it runs; {@code null} when not tracked. */
  private final Set<MultiServerCallable<Row>> callsInProgress;

  private SingleServerRequestRunnable(
      MultiAction<Row> multiAction, int numAttempt, ServerName server,
      Set<MultiServerCallable<Row>> callsInProgress) {
    this.multiAction = multiAction;
    this.numAttempt = numAttempt;
    this.server = server;
    this.callsInProgress = callsInProgress;
  }

  @Override
  public void run() {
    MultiResponse res;
    MultiServerCallable<Row> callable = null;
    try {
      callable = createCallable(server, tableName, multiAction);
      try {
        RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
        if (callsInProgress != null) callsInProgress.add(callable);
        res = caller.callWithoutRetries(callable, timeout);

        if (res == null) {
          // NOTE(review): presumably the call was cancelled - confirm where null is produced.
          return;
        }

      } catch (IOException e) {
        // The whole call failed: treat it as a failure of every action in the group.
        receiveGlobalFailure(multiAction, server, numAttempt, e);
        return;
      } catch (Throwable t) {
        // Anything else is unexpected; it is logged and handled like a global failure.
        LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
            " Retrying. Server is " + server + ", tableName=" + tableName, t);
        receiveGlobalFailure(multiAction, server, numAttempt, t);
        return;
      }

      // A response arrived: process the per-action outcomes.
      receiveMultiAction(multiAction, server, res, numAttempt);
    } catch (Throwable t) {
      // Failure in the processing code itself (including the handlers above).
      LOG.error("Internal AsyncProcess #" + id + " error for "
          + tableName + " processing for " + server, t);
      throw new RuntimeException(t);
    } finally {
      // Runs on every path, including the early returns above.
      decTaskCounters(multiAction.getRegions(), server);
      if (callsInProgress != null && callable != null) {
        callsInProgress.remove(callable);
      }
    }
  }
}
758
/** Callback supplied by the submitter; stored as given. */
private final Batch.Callback<CResult> callback;
/** The process-wide error store when one exists, otherwise a store private to this future. */
private final BatchErrors errors;
private final ConnectionManager.ServerErrorTracker errorsByServer;
/** Executor the server requests and the replica task are submitted to. */
private final ExecutorService pool;
/** Callables currently running; {@code null} when the request contains no replica gets. */
private final Set<MultiServerCallable<Row>> callsInProgress;

private final TableName tableName;
/** Starts at -1 and is set to the number of actions by the constructor. */
private final AtomicLong actionsInProgress = new AtomicLong(-1);

/** Guards the check-then-write on a result slot when replica calls are being issued. */
private final Object replicaResultLock = new Object();

/**
 * One slot per action, or {@code null} when results are not needed. A slot may temporarily
 * hold a {@code ReplicaResultState} while replica calls for it are outstanding.
 */
private final Object[] results;

/**
 * Indices of the replica gets when the request mixes them with other operations;
 * {@code null} otherwise, in which case the replica task considers every slot.
 */
private final int[] replicaGetIndices;
/** Whether at least one action was recognised as a replica get. */
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
784
/**
 * Sets up the bookkeeping for one request: the result array, the positions of replica gets
 * and the error stores.
 *
 * @param tableName table the actions belong to
 * @param actions the actions of the request
 * @param nonceGroup nonce group for the request
 * @param pool executor used to run the server calls
 * @param needResults whether results have to be kept
 * @param results optional caller-owned array receiving the results; it must have one slot
 *        per action and is reset to all-{@code null} here
 * @param callback callback supplied by the submitter
 * @throws AssertionError if {@code results} is given with a length different from the number
 *         of actions
 */
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
    ExecutorService pool, boolean needResults, Object[] results,
    Batch.Callback<CResult> callback) {
  this.pool = pool;
  this.callback = callback;
  this.nonceGroup = nonceGroup;
  this.tableName = tableName;
  this.actionsInProgress.set(actions.size());
  if (results != null) {
    assert needResults;
    if (results.length != actions.size()) throw new AssertionError("results.length");
    this.results = results;
    // Clear whatever the caller left in the array.
    for (int i = 0; i != this.results.length; ++i) {
      results[i] = null;
    }
  } else {
    this.results = needResults ? new Object[actions.size()] : null;
  }
  List<Integer> replicaGetIndices = null;
  boolean hasAnyReplicaGets = false;
  if (needResults) {
    // Collect the positions of replica gets, but only when they are mixed with other
    // operations: the list stays null when the request has no replica gets or consists of
    // replica gets only.
    boolean hasAnyNonReplicaReqs = false;
    int posInList = 0;
    for (Action<Row> action : actions) {
      boolean isReplicaGet = isReplicaGet(action.getAction());
      if (isReplicaGet) {
        hasAnyReplicaGets = true;
        if (hasAnyNonReplicaReqs) {
          // Mixed request: record this replica get, creating the list lazily.
          if (replicaGetIndices == null) {
            replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
          }
          replicaGetIndices.add(posInList);
        }
      } else if (!hasAnyNonReplicaReqs) {
        // First operation that is not a replica get.
        hasAnyNonReplicaReqs = true;
        if (posInList > 0) {
          // Everything before it was a replica get and was not recorded yet; back-fill
          // those positions now.
          replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
          for (int i = 0; i < posInList; ++i) {
            replicaGetIndices.add(i);
          }
        }
      }
      ++posInList;
    }
  }
  this.hasAnyReplicaGets = hasAnyReplicaGets;
  if (replicaGetIndices != null) {
    // Copy the boxed list into a primitive array.
    this.replicaGetIndices = new int[replicaGetIndices.size()];
    int i = 0;
    for (Integer el : replicaGetIndices) {
      this.replicaGetIndices[i++] = el;
    }
  } else {
    this.replicaGetIndices = null;
  }
  // Running calls are tracked only when the request contains replica gets.
  this.callsInProgress = !hasAnyReplicaGets ? null :
      Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());

  this.errorsByServer = createServerErrorTracker();
  // Use the process-wide error store when there is one, otherwise a private one.
  this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
}
854
855 public Set<MultiServerCallable<Row>> getCallsInProgress() {
856 return callsInProgress;
857 }
858
859
860
861
862
863
864
/**
 * Looks up the location of every action, groups the actions by server and sends them.
 * Replica actions whose location is unknown are looked up a second time and sent
 * separately.
 *
 * @param currentActions the actions to send
 * @param numAttempt the attempt number, starting at 1
 * @throws AssertionError if a non-replica action follows a replica action in the list
 */
private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
  Map<ServerName, MultiAction<Row>> actionsByServer =
      new HashMap<ServerName, MultiAction<Row>>();

  boolean isReplica = false;
  List<Action<Row>> unknownReplicaActions = null;
  for (Action<Row> action : currentActions) {
    RegionLocations locs = findAllLocationsOrFail(action, true);
    if (locs == null) continue; // The lookup already recorded the failure, if any.
    boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
    if (isReplica && !isReplicaAction) {
      // The previous action was a replica action and this one is not.
      throw new AssertionError("Replica and non-replica actions in the same retry");
    }
    isReplica = isReplicaAction;
    HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
    if (loc == null || loc.getServerName() == null) {
      if (isReplica) {
        // Keep it for the second lookup below.
        if (unknownReplicaActions == null) {
          unknownReplicaActions = new ArrayList<Action<Row>>();
        }
        unknownReplicaActions.add(action);
      } else {
        // No second chance for the default replica: record the failure now.
        manageLocationError(action, null);
      }
    } else {
      byte[] regionName = loc.getRegionInfo().getRegionName();
      addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
    }
  }
  // Replica calls are started only on the first attempt of a non-replica batch that
  // contains replica gets.
  boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
  boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();

  if (!actionsByServer.isEmpty()) {
    // When a second send follows, the replica start and the thread reuse are left to it.
    sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
        ? currentActions : null, numAttempt > 1 && !hasUnknown);
  }

  if (hasUnknown) {
    actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
    for (Action<Row> action : unknownReplicaActions) {
      HRegionLocation loc = getReplicaLocationOrFail(action);
      if (loc == null) continue; // The failure was recorded by the lookup.
      byte[] regionName = loc.getRegionInfo().getRegionName();
      addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
    }
    if (!actionsByServer.isEmpty()) {
      sendMultiAction(
          actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
    }
  }
}
919
920 private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
921
922
923 int replicaId = action.getReplicaId();
924 RegionLocations locs = findAllLocationsOrFail(action, true);
925 if (locs == null) return null;
926 HRegionLocation loc = locs.getRegionLocation(replicaId);
927 if (loc == null || loc.getServerName() == null) {
928 locs = findAllLocationsOrFail(action, false);
929 if (locs == null) return null;
930 loc = locs.getRegionLocation(replicaId);
931 }
932 if (loc == null || loc.getServerName() == null) {
933 manageLocationError(action, null);
934 return null;
935 }
936 return loc;
937 }
938
939 private void manageLocationError(Action<Row> action, Exception ex) {
940 String msg = "Cannot get replica " + action.getReplicaId()
941 + " location for " + action.getAction();
942 LOG.error(msg);
943 if (ex == null) {
944 ex = new IOException(msg);
945 }
946 manageError(action.getOriginalIndex(), action.getAction(),
947 Retry.NO_LOCATION_PROBLEM, ex, null);
948 }
949
950 private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
951 if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
952 ", row cannot be null");
953 RegionLocations loc = null;
954 try {
955 loc = connection.locateRegion(
956 tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
957 } catch (IOException ex) {
958 manageLocationError(action, ex);
959 }
960 return loc;
961 }
962
963
964
965
966
967
968
969
970
/**
 * Turns every per-server group into one or more runnables and runs them, normally on the
 * pool. Optionally starts the task that issues replica calls afterwards.
 *
 * @param actionsByServer the groups to send, keyed by server
 * @param numAttempt the attempt number
 * @param actionsForReplicaThread when not {@code null}, the actions handed to the replica
 *        task once everything was submitted
 * @param reuseThread when {@code true}, the last runnable is executed on the calling thread
 *        instead of being submitted to the pool
 */
private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
    int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
  // Countdown used to recognise the last runnable; starts at one per server.
  int actionsRemaining = actionsByServer.size();

  for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
    ServerName server = e.getKey();
    MultiAction<Row> multiAction = e.getValue();
    // NOTE(review): counters are incremented once per server entry here, while each runnable
    // built for the entry calls decTaskCounters in its own finally block. When an entry is
    // split into several runnables this looks unbalanced - confirm against
    // incTaskCounters/decTaskCounters, which are outside this view.
    incTaskCounters(multiAction.getRegions(), server);
    Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
        numAttempt);
    // One group may have been split into several runnables; raise the countdown so it does
    // not reach zero before the last of them.
    if (runnables.size() > actionsRemaining) {
      actionsRemaining = runnables.size();
    }

    for (Runnable runnable : runnables) {
      if ((--actionsRemaining == 0) && reuseThread) {
        runnable.run();
      } else {
        try {
          pool.submit(runnable);
        } catch (Throwable t) {
          if (t instanceof RejectedExecutionException) {
            LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
                " Server is " + server.getServerName(), t);
          } else {
            LOG.warn("Caught unexpected exception/error: ", t);
          }
          // The runnable never ran, so its finally block will not decrement the counters.
          decTaskCounters(multiAction.getRegions(), server);
          // Treat the failed submission as a failure of the whole group.
          receiveGlobalFailure(multiAction, server, numAttempt, t);
        }
      }
    }
  }

  if (actionsForReplicaThread != null) {
    startWaitingForReplicaCalls(actionsForReplicaThread);
  }
}
1019
1020 private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1021 MultiAction<Row> multiAction,
1022 int numAttempt) {
1023
1024 if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1025 return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1026 new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1027 }
1028
1029
1030 Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1031 .size());
1032
1033
1034 for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1035 Long backoff = getBackoff(server, e.getKey());
1036 DelayingRunner runner = actions.get(backoff);
1037 if (runner == null) {
1038 actions.put(backoff, new DelayingRunner(backoff, e));
1039 } else {
1040 runner.add(e);
1041 }
1042 }
1043
1044 List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1045 for (DelayingRunner runner : actions.values()) {
1046 String traceText = "AsyncProcess.sendMultiAction";
1047 Runnable runnable =
1048 new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1049 callsInProgress);
1050
1051 if (runner.getSleepTime() > 0) {
1052 runner.setRunner(runnable);
1053 traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1054 runnable = runner;
1055 }
1056 runnable = Trace.wrap(traceText, runnable);
1057 toReturn.add(runnable);
1058
1059 }
1060 return toReturn;
1061 }
1062
1063
1064
1065
1066
1067
1068
1069 private Long getBackoff(ServerName server, byte[] regionName) {
1070 ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1071 ServerStatistics stats = tracker.getStats(server);
1072 return AsyncProcess.this.connection.getBackoffPolicy()
1073 .getBackoffTime(server, regionName, stats);
1074 }
1075
1076
1077
1078
1079 private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1080 long startTime = EnvironmentEdgeManager.currentTime();
1081 ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1082 actionsForReplicaThread, startTime);
1083 if (primaryCallTimeoutMicroseconds == 0) {
1084
1085 replicaRunnable.run();
1086 } else {
1087
1088
1089 try {
1090 pool.submit(replicaRunnable);
1091 } catch (RejectedExecutionException ree) {
1092 LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1093 }
1094 }
1095 }
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107 public Retry manageError(int originalIndex, Row row, Retry canRetry,
1108 Throwable throwable, ServerName server) {
1109 if (canRetry == Retry.YES
1110 && throwable != null && throwable instanceof DoNotRetryIOException) {
1111 canRetry = Retry.NO_NOT_RETRIABLE;
1112 }
1113
1114 if (canRetry != Retry.YES) {
1115
1116 setError(originalIndex, row, throwable, server);
1117 } else if (isActionComplete(originalIndex, row)) {
1118 canRetry = Retry.NO_OTHER_SUCCEEDED;
1119 }
1120 return canRetry;
1121 }
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131 private void receiveGlobalFailure(
1132 MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1133 errorsByServer.reportServerError(server);
1134 Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1135 ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1136
1137 if (tableName == null) {
1138
1139 connection.clearCaches(server);
1140 }
1141 int failed = 0, stopped = 0;
1142 List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1143 for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1144 byte[] regionName = e.getKey();
1145 byte[] row = e.getValue().iterator().next().getAction().getRow();
1146
1147
1148
1149 if (tableName != null) {
1150 connection.updateCachedLocations(tableName, regionName, row, null, server);
1151 }
1152 for (Action<Row> action : e.getValue()) {
1153 Retry retry = manageError(
1154 action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1155 if (retry == Retry.YES) {
1156 toReplay.add(action);
1157 } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1158 ++stopped;
1159 } else {
1160 ++failed;
1161 }
1162 }
1163 }
1164
1165 if (toReplay.isEmpty()) {
1166 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1167 } else {
1168 resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1169 }
1170 }
1171
1172
1173
1174
1175
1176 private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1177 int numAttempt, int failureCount, Throwable throwable) {
1178
1179
1180
1181
1182
1183
1184
1185 long backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause);
1186 if (numAttempt > startLogErrorsCnt) {
1187
1188
1189 LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1190 oldServer, throwable, backOffTime, true, null, -1, -1));
1191 }
1192
1193 try {
1194 Thread.sleep(backOffTime);
1195 } catch (InterruptedException e) {
1196 LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1197 Thread.currentThread().interrupt();
1198 return;
1199 }
1200
1201 groupAndSendMultiAction(toReplay, numAttempt + 1);
1202 }
1203
1204 private void logNoResubmit(ServerName oldServer, int numAttempt,
1205 int failureCount, Throwable throwable, int failed, int stopped) {
1206 if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1207 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1208 String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1209 throwable, -1, false, timeStr, failed, stopped);
1210 if (failed != 0) {
1211
1212 LOG.warn(logMessage);
1213 } else {
1214 LOG.info(logMessage);
1215 }
1216 }
1217 }
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227 private void receiveMultiAction(MultiAction<Row> multiAction,
1228 ServerName server, MultiResponse responses, int numAttempt) {
1229 assert responses != null;
1230
1231
1232
1233
1234
1235
1236
1237 List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1238 Throwable throwable = null;
1239 int failureCount = 0;
1240 boolean canRetry = true;
1241
1242
1243 int failed = 0, stopped = 0;
1244 for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
1245 byte[] regionName = regionEntry.getKey();
1246 Map<Integer, Object> regionResults = responses.getResults().get(regionName);
1247 if (regionResults == null) {
1248 if (!responses.getExceptions().containsKey(regionName)) {
1249 LOG.error("Server sent us neither results nor exceptions for "
1250 + Bytes.toStringBinary(regionName));
1251 responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
1252 }
1253 continue;
1254 }
1255 boolean regionFailureRegistered = false;
1256 for (Action<Row> sentAction : regionEntry.getValue()) {
1257 Object result = regionResults.get(sentAction.getOriginalIndex());
1258
1259 if (result == null || result instanceof Throwable) {
1260 Row row = sentAction.getAction();
1261
1262 if (!regionFailureRegistered) {
1263 regionFailureRegistered = true;
1264 connection.updateCachedLocations(
1265 tableName, regionName, row.getRow(), result, server);
1266 }
1267 if (failureCount == 0) {
1268 errorsByServer.reportServerError(server);
1269
1270 canRetry = errorsByServer.canRetryMore(numAttempt);
1271 }
1272 ++failureCount;
1273 Retry retry = manageError(sentAction.getOriginalIndex(), row,
1274 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
1275 if (retry == Retry.YES) {
1276 toReplay.add(sentAction);
1277 } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1278 ++stopped;
1279 } else {
1280 ++failed;
1281 }
1282 } else {
1283
1284
1285 if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
1286 result = ResultStatsUtil.updateStats(result,
1287 AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
1288 }
1289
1290 if (callback != null) {
1291 try {
1292
1293
1294 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
1295 } catch (Throwable t) {
1296 LOG.error("User callback threw an exception for "
1297 + Bytes.toStringBinary(regionName) + ", ignoring", t);
1298 }
1299 }
1300 setResult(sentAction, result);
1301 }
1302 }
1303 }
1304
1305
1306
1307 for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
1308 throwable = throwableEntry.getValue();
1309 byte[] region = throwableEntry.getKey();
1310 List<Action<Row>> actions = multiAction.actions.get(region);
1311 if (actions == null || actions.isEmpty()) {
1312 throw new IllegalStateException("Wrong response for the region: " +
1313 HRegionInfo.encodeRegionName(region));
1314 }
1315
1316 if (failureCount == 0) {
1317 errorsByServer.reportServerError(server);
1318 canRetry = errorsByServer.canRetryMore(numAttempt);
1319 }
1320 connection.updateCachedLocations(
1321 tableName, region, actions.get(0).getAction().getRow(), throwable, server);
1322 failureCount += actions.size();
1323
1324 for (Action<Row> action : actions) {
1325 Row row = action.getAction();
1326 Retry retry = manageError(action.getOriginalIndex(), row,
1327 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
1328 if (retry == Retry.YES) {
1329 toReplay.add(action);
1330 } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1331 ++stopped;
1332 } else {
1333 ++failed;
1334 }
1335 }
1336 }
1337
1338 if (toReplay.isEmpty()) {
1339 logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
1340 } else {
1341 resubmit(server, toReplay, numAttempt, failureCount, throwable);
1342 }
1343 }
1344
1345 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1346 Throwable error, long backOffTime, boolean willRetry, String startTime,
1347 int failed, int stopped) {
1348 StringBuilder sb = new StringBuilder();
1349 sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1350 .append("attempt=").append(numAttempt)
1351 .append("/").append(numTries).append(" ");
1352
1353 if (failureCount > 0 || error != null){
1354 sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1355 append(error == null ? "null" : error);
1356 } else {
1357 sb.append("succeeded");
1358 }
1359
1360 sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1361
1362 if (willRetry) {
1363 sb.append(", retrying after=").append(backOffTime).append("ms").
1364 append(", replay=").append(replaySize).append("ops");
1365 } else if (failureCount > 0) {
1366 if (stopped > 0) {
1367 sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1368 }
1369 if (failed > 0) {
1370 sb.append("; not retrying ").append(failed).append(" - final failure");
1371 }
1372
1373 }
1374
1375 return sb.toString();
1376 }
1377
1378
1379
1380
1381
1382
1383 private void setResult(Action<Row> action, Object result) {
1384 if (result == null) {
1385 throw new RuntimeException("Result cannot be null");
1386 }
1387 ReplicaResultState state = null;
1388 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
1389 int index = action.getOriginalIndex();
1390 if (results == null) {
1391 decActionCounter(index);
1392 return;
1393 } else if ((state = trySetResultSimple(
1394 index, action.getAction(), false, result, null, isStale)) == null) {
1395 return;
1396 }
1397 assert state != null;
1398
1399
1400
1401
1402
1403 synchronized (state) {
1404 if (state.callCount == 0) return;
1405 state.callCount = 0;
1406 }
1407 synchronized (replicaResultLock) {
1408 if (results[index] != state) {
1409 throw new AssertionError("We set the callCount but someone else replaced the result");
1410 }
1411 results[index] = result;
1412 }
1413
1414 decActionCounter(index);
1415 }
1416
1417
1418
1419
1420
1421
1422
1423
/**
 * Records a failure for the action at {@code index}.
 * <p>
 * If no results array is kept, the error is added to {@code errors} and the action counter is
 * decremented. Otherwise the error is first offered to {@code trySetResultSimple}; if that
 * returns a replica state, the state's {@code callCount} decides what happens: with more than
 * one call outstanding the error is only collected in the state's {@code replicaErrors}; when
 * this is the last outstanding call the error (and any collected replica errors) go to
 * {@code errors}, the throwable replaces the state in the results array and the action counter
 * is decremented.
 *
 * @param index index of the action in the results array
 * @param row the row operation that failed
 * @param throwable the failure
 * @param server the server the failure came from
 */
private void setError(int index, Row row, Throwable throwable, ServerName server) {
  ReplicaResultState state = null;
  if (results == null) {
    // No results array is kept: just record the error and count the action as finished.
    errors.add(throwable, row, server);
    decActionCounter(index);
    return;
  } else if ((state = trySetResultSimple(
      index, row, true, throwable, server, false)) == null) {
    // No replica state was returned, so there is nothing further to do here.
    return;
  }
  assert state != null;
  BatchErrors target = null; // Error collection this failure will be added to.
  boolean isActionDone = false;
  synchronized (state) {
    switch (state.callCount) {
      case 0: return; // No calls outstanding any more; the error is dropped.
      case 1: { // Last outstanding call: the action fails for good.
        target = errors;
        isActionDone = true;
        break;
      }
      default: { // Other calls are still outstanding: only collect the error for now.
        assert state.callCount > 1;
        if (state.replicaErrors == null) {
          state.replicaErrors = new BatchErrors();
        }
        target = state.replicaErrors;
        break;
      }
    }
    --state.callCount;
  }
  // Added outside the synchronized block on state.
  target.add(throwable, row, server);
  if (isActionDone) {
    if (state.replicaErrors != null) {
      // Fold the errors collected from earlier failed calls into the main error list.
      errors.merge(state.replicaErrors);
    }

    synchronized (replicaResultLock) {
      if (results[index] != state) {
        throw new AssertionError("We set the callCount but someone else replaced the result");
      }
      results[index] = throwable;
    }
    decActionCounter(index);
  }
}
1474
1475
1476
1477
1478
1479
1480
1481
1482 private boolean isActionComplete(int index, Row row) {
1483 if (!isReplicaGet(row)) return false;
1484 Object resObj = results[index];
1485 return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1486 || ((ReplicaResultState)resObj).callCount == 0);
1487 }
1488
1489
1490
1491
1492
/**
 * Tries to store a result or error for the action at {@code index} without going through a
 * replica state.
 * <p>
 * For a row that is not a replica get, the value is written straight into the results array.
 * For a replica get, the value is written, under {@code replicaResultLock}, only if the slot
 * is still empty. When the slot was empty (or the row is not a replica get) the action counter
 * is decremented and {@code null} is returned. When the slot already held a
 * {@code ReplicaResultState}, that state is returned and nothing is written.
 *
 * @param index index of the action in the results array
 * @param row the row operation of the action
 * @param isError whether {@code result} is a failure (it is then cast to {@code Throwable})
 * @param result the result or the throwable to store
 * @param server the server the outcome came from; used when recording an error
 * @param isFromReplica whether the outcome is flagged as coming from a replica
 * @return the replica state found in the slot, or {@code null} if the slot held no such state
 * @throws AssertionError if {@code isFromReplica} is set for a row that is not a replica get,
 *         or for a replica get whose slot is still empty
 */
private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
    Object result, ServerName server, boolean isFromReplica) {
  Object resObj = null;
  if (!isReplicaGet(row)) {
    if (isFromReplica) {
      throw new AssertionError("Unexpected stale result for " + row);
    }
    // Not a replica get: written directly, without taking replicaResultLock.
    results[index] = result;
  } else {
    synchronized (replicaResultLock) {
      // Fill the slot only while it is still empty; otherwise resObj keeps what is there.
      if ((resObj = results[index]) == null) {
        if (isFromReplica) {
          throw new AssertionError("Unexpected stale result for " + row);
        }
        results[index] = result;
      }
    }
  }

  ReplicaResultState rrs =
      (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
  if (rrs == null && isError) {
    // No replica state is involved, so the error is recorded here.
    errors.add((Throwable)result, row, server);
  }

  if (resObj == null) {
    // The value was written above; the action is finished.
    decActionCounter(index);
    return null;
  }
  return rrs;
}
1526
1527 private void decActionCounter(int index) {
1528 long actionsRemaining = actionsInProgress.decrementAndGet();
1529 if (actionsRemaining < 0) {
1530 String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1531 throw new AssertionError(error);
1532 } else if (actionsRemaining == 0) {
1533 synchronized (actionsInProgress) {
1534 actionsInProgress.notifyAll();
1535 }
1536 }
1537 }
1538
1539 private String buildDetailedErrorMsg(String string, int index) {
1540 String error = string + "; called for " + index +
1541 ", actionsInProgress " + actionsInProgress.get() + "; replica gets: ";
1542 if (replicaGetIndices != null) {
1543 for (int i = 0; i < replicaGetIndices.length; ++i) {
1544 error += replicaGetIndices[i] + ", ";
1545 }
1546 } else {
1547 error += (hasAnyReplicaGets ? "all" : "none");
1548 }
1549 error += "; results ";
1550 if (results != null) {
1551 for (int i = 0; i < results.length; ++i) {
1552 Object o = results[i];
1553 error += ((o == null) ? "null" : o.toString()) + ", ";
1554 }
1555 }
1556 return error;
1557 }
1558
1559 @Override
1560 public void waitUntilDone() throws InterruptedIOException {
1561 try {
1562 waitUntilDone(Long.MAX_VALUE);
1563 } catch (InterruptedException iex) {
1564 throw new InterruptedIOException(iex.getMessage());
1565 } finally {
1566 if (callsInProgress != null) {
1567 for (MultiServerCallable<Row> clb : callsInProgress) {
1568 clb.cancel();
1569 }
1570 }
1571 }
1572 }
1573
1574 private boolean waitUntilDone(long cutoff) throws InterruptedException {
1575 boolean hasWait = cutoff != Long.MAX_VALUE;
1576 long lastLog = EnvironmentEdgeManager.currentTime();
1577 long currentInProgress;
1578 while (0 != (currentInProgress = actionsInProgress.get())) {
1579 long now = EnvironmentEdgeManager.currentTime();
1580 if (hasWait && (now * 1000L) > cutoff) {
1581 return false;
1582 }
1583 if (!hasWait) {
1584 if (now > lastLog + 10000) {
1585 lastLog = now;
1586 LOG.info("#" + id + ", waiting for " + currentInProgress
1587 + " actions to finish on table: " + tableName);
1588 if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1589 logDetailsOfUndoneTasks(currentInProgress);
1590 }
1591 }
1592 }
1593 synchronized (actionsInProgress) {
1594 if (actionsInProgress.get() == 0) break;
1595 if (!hasWait) {
1596 actionsInProgress.wait(10);
1597 } else {
1598 long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1599 TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1600 }
1601 }
1602 }
1603 return true;
1604 }
1605
1606 @Override
1607 public boolean hasError() {
1608 return errors.hasErrors();
1609 }
1610
1611 @Override
1612 public List<? extends Row> getFailedOperations() {
1613 return errors.actions;
1614 }
1615
1616 @Override
1617 public RetriesExhaustedWithDetailsException getErrors() {
1618 return errors.makeException();
1619 }
1620
1621 @Override
1622 public Object[] getResults() throws InterruptedIOException {
1623 waitUntilDone();
1624 return results;
1625 }
1626 }
1627
1628 @VisibleForTesting
1629
1630 protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1631 TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1632 Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1633 return new AsyncRequestFutureImpl<CResult>(
1634 tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1635 }
1636
1637
1638
1639
1640 @VisibleForTesting
1641 protected MultiServerCallable<Row> createCallable(final ServerName server,
1642 TableName tableName, final MultiAction<Row> multi) {
1643 return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1644 }
1645
1646
1647
1648
1649 @VisibleForTesting
1650 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1651 return rpcCallerFactory.<MultiResponse> newCaller();
1652 }
1653
1654 @VisibleForTesting
1655
1656 void waitUntilDone() throws InterruptedIOException {
1657 waitForMaximumCurrentTasks(0, null);
1658 }
1659
1660
1661 private void waitForMaximumCurrentTasks(int max, String tableName)
1662 throws InterruptedIOException {
1663 waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
1664 }
1665
1666
1667 @VisibleForTesting
1668 void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
1669 String tableName) throws InterruptedIOException {
1670 long lastLog = EnvironmentEdgeManager.currentTime();
1671 long currentInProgress, oldInProgress = Long.MAX_VALUE;
1672 while ((currentInProgress = tasksInProgress.get()) > max) {
1673 if (oldInProgress != currentInProgress) {
1674 long now = EnvironmentEdgeManager.currentTime();
1675 if (now > lastLog + 10000) {
1676 lastLog = now;
1677 LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1678 + max + ", tasksInProgress=" + currentInProgress +
1679 " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
1680 if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1681 logDetailsOfUndoneTasks(currentInProgress);
1682 }
1683 }
1684 }
1685 oldInProgress = currentInProgress;
1686 try {
1687 synchronized (tasksInProgress) {
1688 if (tasksInProgress.get() == oldInProgress) {
1689 tasksInProgress.wait(10);
1690 }
1691 }
1692 } catch (InterruptedException e) {
1693 throw new InterruptedIOException("#" + id + ", interrupted." +
1694 " currentNumberOfTask=" + currentInProgress);
1695 }
1696 }
1697 }
1698
1699 private void logDetailsOfUndoneTasks(long taskInProgress) {
1700 ArrayList<ServerName> servers = new ArrayList<ServerName>();
1701 for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
1702 if (entry.getValue().get() > 0) {
1703 servers.add(entry.getKey());
1704 }
1705 }
1706 LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
1707 if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
1708 ArrayList<String> regions = new ArrayList<String>();
1709 for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
1710 if (entry.getValue().get() > 0) {
1711 regions.add(Bytes.toString(entry.getKey()));
1712 }
1713 }
1714 LOG.info("Regions against which left over task(s) are processed: " + regions);
1715 }
1716 }
1717
1718
1719
1720
1721
1722
1723 public boolean hasError() {
1724 return globalErrors.hasErrors();
1725 }
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737 public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1738 List<Row> failedRows, String tableName) throws InterruptedIOException {
1739 waitForMaximumCurrentTasks(0, tableName);
1740 if (!globalErrors.hasErrors()) {
1741 return null;
1742 }
1743 if (failedRows != null) {
1744 failedRows.addAll(globalErrors.actions);
1745 }
1746 RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1747 globalErrors.clear();
1748 return result;
1749 }
1750
1751
1752
1753
1754 protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1755 tasksInProgress.incrementAndGet();
1756
1757 AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1758 if (serverCnt == null) {
1759 taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1760 serverCnt = taskCounterPerServer.get(sn);
1761 }
1762 serverCnt.incrementAndGet();
1763
1764 for (byte[] regBytes : regions) {
1765 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1766 if (regionCnt == null) {
1767 regionCnt = new AtomicInteger();
1768 AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1769 if (oldCnt != null) {
1770 regionCnt = oldCnt;
1771 }
1772 }
1773 regionCnt.incrementAndGet();
1774 }
1775 }
1776
1777
1778
1779
1780 protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1781 for (byte[] regBytes : regions) {
1782 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1783 regionCnt.decrementAndGet();
1784 }
1785
1786 taskCounterPerServer.get(sn).decrementAndGet();
1787 tasksInProgress.decrementAndGet();
1788 synchronized (tasksInProgress) {
1789 tasksInProgress.notifyAll();
1790 }
1791 }
1792
1793
1794
1795
1796
1797
1798
1799
1800 protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1801 return new ConnectionManager.ServerErrorTracker(
1802 this.serverTrackerTimeout, this.numTries);
1803 }
1804
1805 private static boolean isReplicaGet(Row row) {
1806 return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1807 }
1808
1809
1810
1811
/**
 * Verdict on whether a failed action is going to be retried, and if not, why.
 */
private enum Retry {
  /** The action may be retried. */
  YES,
  /** Not retried; presumably because of a location problem (not assigned in this part of the file). */
  NO_LOCATION_PROBLEM,
  /** Not retried: the failure was a {@code DoNotRetryIOException}. */
  NO_NOT_RETRIABLE,
  /** Not retried: the server error tracker allows no further attempts. */
  NO_RETRIES_EXHAUSTED,
  /** Not retried: the action had already been completed. */
  NO_OTHER_SUCCEEDED
}
1819 }