1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.HashSet;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34 import java.util.concurrent.locks.ReentrantLock;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.CopyOnWriteArrayList;
37 import java.util.concurrent.TimeUnit;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.ProcedureInfo;
44 import org.apache.hadoop.hbase.classification.InterfaceAudience;
45 import org.apache.hadoop.hbase.classification.InterfaceStability;
46 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
47 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
48 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
49 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
50 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
51 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
52 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
53 import org.apache.hadoop.hbase.security.User;
54 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
56 import org.apache.hadoop.hbase.util.NonceKey;
57 import org.apache.hadoop.hbase.util.Pair;
58 import org.apache.hadoop.hbase.util.Threads;
59
60 import com.google.common.base.Preconditions;
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 @InterfaceAudience.Private
76 @InterfaceStability.Evolving
77 public class ProcedureExecutor<TEnvironment> {
78 private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
79
80 Testing testing = null;
81 public static class Testing {
82 protected boolean killBeforeStoreUpdate = false;
83 protected boolean toggleKillBeforeStoreUpdate = false;
84
85 protected boolean shouldKillBeforeStoreUpdate() {
86 final boolean kill = this.killBeforeStoreUpdate;
87 if (this.toggleKillBeforeStoreUpdate) {
88 this.killBeforeStoreUpdate = !kill;
89 LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
90 }
91 return kill;
92 }
93 }
94
/**
 * Callback interface for code interested in procedure life-cycle events.
 * The executor catches and logs any {@link Throwable} thrown by an implementation.
 */
public interface ProcedureExecutorListener {

  /**
   * Invoked for a procedure pushed back to the run queue while loading the store.
   *
   * @param procId id of the loaded procedure
   */
  void procedureLoaded(long procId);

  /**
   * Invoked when a new procedure has been submitted to the executor.
   *
   * @param procId id of the submitted procedure
   */
  void procedureAdded(long procId);

  /**
   * Invoked once a procedure has been moved to the completed map.
   *
   * @param procId id of the finished procedure
   */
  void procedureFinished(long procId);
}
100
101
102
103
104 private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
105 @Override
106 public long getTimeout(Procedure proc) {
107 return proc.getTimeRemaining();
108 }
109
110 @Override
111 public TimeUnit getTimeUnit(Procedure proc) {
112 return TimeUnit.MILLISECONDS;
113 }
114 }
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
132 private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
133
134 private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
135 private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000;
136
137 private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
138 private static final int DEFAULT_EVICT_TTL = 15 * 60000;
139
140 private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
141 private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000;
142
143 private final Map<Long, ProcedureInfo> completed;
144 private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
145 private final ProcedureStore store;
146 private final Configuration conf;
147
148 public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
149 final Map<Long, ProcedureInfo> completedMap,
150 final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
151
152 setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
153 this.completed = completedMap;
154 this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
155 this.store = store;
156 this.conf = conf;
157 }
158
159 public void periodicExecute(final TEnvironment env) {
160 if (completed.isEmpty()) {
161 if (LOG.isTraceEnabled()) {
162 LOG.trace("No completed procedures to cleanup.");
163 }
164 return;
165 }
166
167 final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
168 final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
169
170 final long now = EnvironmentEdgeManager.currentTime();
171 final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
172 final boolean isDebugEnabled = LOG.isDebugEnabled();
173 while (it.hasNext() && store.isRunning()) {
174 final Map.Entry<Long, ProcedureInfo> entry = it.next();
175 final ProcedureInfo procInfo = entry.getValue();
176
177
178 if ((procInfo.hasClientAckTime() && (now - procInfo.getClientAckTime()) >= evictAckTtl) ||
179 (now - procInfo.getLastUpdate()) >= evictTtl) {
180 if (isDebugEnabled) {
181 LOG.debug("Evict completed procedure: " + procInfo);
182 }
183 store.delete(entry.getKey());
184 it.remove();
185
186 NonceKey nonceKey = procInfo.getNonceKey();
187 if (nonceKey != null) {
188 nonceKeysToProcIdsMap.remove(nonceKey);
189 }
190 }
191 }
192 }
193
194 @Override
195 protected Procedure[] execute(final TEnvironment env) {
196 throw new UnsupportedOperationException();
197 }
198
199 @Override
200 protected void rollback(final TEnvironment env) {
201 throw new UnsupportedOperationException();
202 }
203
204 @Override
205 protected boolean abort(final TEnvironment env) {
206 throw new UnsupportedOperationException();
207 }
208
209 @Override
210 public void serializeStateData(final OutputStream stream) {
211 throw new UnsupportedOperationException();
212 }
213
214 @Override
215 public void deserializeStateData(final InputStream stream) {
216 throw new UnsupportedOperationException();
217 }
218 }
219
220
221
222
223
224
225 private final ConcurrentHashMap<Long, ProcedureInfo> completed =
226 new ConcurrentHashMap<Long, ProcedureInfo>();
227
228
229
230
231
232
233 private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
234 new ConcurrentHashMap<Long, RootProcedureState>();
235
236
237
238
239
240 private final ConcurrentHashMap<Long, Procedure> procedures =
241 new ConcurrentHashMap<Long, Procedure>();
242
243
244
245
246
247 private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
248 new ConcurrentHashMap<NonceKey, Long>();
249
250
251
252
253
254 private final TimeoutBlockingQueue<Procedure> waitingTimeout =
255 new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
256
257
258
259
260 private final ProcedureRunnableSet runnables;
261
262
263 private final ReentrantLock submitLock = new ReentrantLock();
264 private final AtomicLong lastProcId = new AtomicLong(-1);
265
266 private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
267 new CopyOnWriteArrayList<ProcedureExecutorListener>();
268
269 private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
270 private final AtomicBoolean running = new AtomicBoolean(false);
271 private final TEnvironment environment;
272 private final ProcedureStore store;
273 private final Configuration conf;
274
275 private Thread[] threads;
276
/**
 * Creates an executor backed by a fresh {@code ProcedureSimpleRunQueue}.
 *
 * @param conf configuration handed to the executor
 * @param environment environment object passed to every procedure call
 * @param store persistent store for procedure state
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store) {
  this(conf, environment, store, new ProcedureSimpleRunQueue());
}
281
/**
 * Creates an executor using the supplied run queue. Nothing is started or loaded
 * here; that happens in {@code start()}.
 *
 * @param conf configuration handed to the executor
 * @param environment environment object passed to every procedure call
 * @param store persistent store for procedure state
 * @param runqueue queue from which executor threads take runnable procedures
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store, final ProcedureRunnableSet runqueue) {
  this.conf = conf;
  this.store = store;
  this.environment = environment;
  this.runnables = runqueue;
}
289
290 private void load(final boolean abortOnCorruption) throws IOException {
291 Preconditions.checkArgument(completed.isEmpty());
292 Preconditions.checkArgument(rollbackStack.isEmpty());
293 Preconditions.checkArgument(procedures.isEmpty());
294 Preconditions.checkArgument(waitingTimeout.isEmpty());
295 Preconditions.checkArgument(runnables.size() == 0);
296
297 store.load(new ProcedureStore.ProcedureLoader() {
298 @Override
299 public void setMaxProcId(long maxProcId) {
300 assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
301 LOG.debug("load procedures maxProcId=" + maxProcId);
302 lastProcId.set(maxProcId);
303 }
304
305 @Override
306 public void load(ProcedureIterator procIter) throws IOException {
307 loadProcedures(procIter, abortOnCorruption);
308 }
309
310 @Override
311 public void handleCorrupted(ProcedureIterator procIter) throws IOException {
312 int corruptedCount = 0;
313 while (procIter.hasNext()) {
314 ProcedureInfo proc = procIter.nextAsProcedureInfo();
315 LOG.error("corrupted procedure: " + proc);
316 corruptedCount++;
317 }
318 if (abortOnCorruption && corruptedCount > 0) {
319 throw new IOException("found " + corruptedCount + " procedures on replay");
320 }
321 }
322 });
323 }
324
325 private void loadProcedures(final ProcedureIterator procIter,
326 final boolean abortOnCorruption) throws IOException {
327 final boolean isDebugEnabled = LOG.isDebugEnabled();
328
329
330 int runnablesCount = 0;
331 while (procIter.hasNext()) {
332 final NonceKey nonceKey;
333 final long procId;
334
335 if (procIter.isNextCompleted()) {
336 ProcedureInfo proc = procIter.nextAsProcedureInfo();
337 nonceKey = proc.getNonceKey();
338 procId = proc.getProcId();
339 completed.put(proc.getProcId(), proc);
340 if (isDebugEnabled) {
341 LOG.debug("The procedure is completed: " + proc);
342 }
343 } else {
344 Procedure proc = procIter.nextAsProcedure();
345 nonceKey = proc.getNonceKey();
346 procId = proc.getProcId();
347
348 if (!proc.hasParent()) {
349 assert !proc.isFinished() : "unexpected finished procedure";
350 rollbackStack.put(proc.getProcId(), new RootProcedureState());
351 }
352
353
354 proc.beforeReplay(getEnvironment());
355 procedures.put(proc.getProcId(), proc);
356
357 if (proc.getState() == ProcedureState.RUNNABLE) {
358 runnablesCount++;
359 }
360 }
361
362
363 if (nonceKey != null) {
364 nonceKeysToProcIdsMap.put(nonceKey, procId);
365 }
366 }
367
368
369 ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
370 HashSet<Procedure> waitingSet = null;
371 procIter.reset();
372 while (procIter.hasNext()) {
373 if (procIter.isNextCompleted()) {
374 procIter.skipNext();
375 continue;
376 }
377
378 Procedure proc = procIter.nextAsProcedure();
379 assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
380
381 if (isDebugEnabled) {
382 LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
383 proc.getState(), proc.hasException(), proc));
384 }
385
386 Long rootProcId = getRootProcedureId(proc);
387 if (rootProcId == null) {
388
389 runnables.addBack(proc);
390 continue;
391 }
392
393 if (proc.hasParent() && !proc.isFinished()) {
394 Procedure parent = procedures.get(proc.getParentProcId());
395
396 if (parent != null) {
397 parent.incChildrenLatch();
398 }
399 }
400
401 RootProcedureState procStack = rollbackStack.get(rootProcId);
402 procStack.loadStack(proc);
403
404 switch (proc.getState()) {
405 case RUNNABLE:
406 runnableList.add(proc);
407 break;
408 case WAITING_TIMEOUT:
409 if (waitingSet == null) {
410 waitingSet = new HashSet<Procedure>();
411 }
412 waitingSet.add(proc);
413 break;
414 case FINISHED:
415 if (proc.hasException()) {
416
417 runnables.addBack(proc);
418 break;
419 }
420 case ROLLEDBACK:
421 case INITIALIZING:
422 String msg = "Unexpected " + proc.getState() + " state for " + proc;
423 LOG.error(msg);
424 throw new UnsupportedOperationException(msg);
425 default:
426 break;
427 }
428 }
429
430
431 int corruptedCount = 0;
432 Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
433 while (itStack.hasNext()) {
434 Map.Entry<Long, RootProcedureState> entry = itStack.next();
435 RootProcedureState procStack = entry.getValue();
436 if (procStack.isValid()) continue;
437
438 for (Procedure proc: procStack.getSubprocedures()) {
439 LOG.error("corrupted procedure: " + proc);
440 procedures.remove(proc.getProcId());
441 runnableList.remove(proc);
442 if (waitingSet != null) waitingSet.remove(proc);
443 corruptedCount++;
444 }
445 itStack.remove();
446 }
447
448 if (abortOnCorruption && corruptedCount > 0) {
449 throw new IOException("found " + corruptedCount + " procedures on replay");
450 }
451
452
453 if (!runnableList.isEmpty()) {
454
455
456 for (int i = runnableList.size() - 1; i >= 0; --i) {
457 Procedure proc = runnableList.get(i);
458 if (!proc.hasParent()) {
459 sendProcedureLoadedNotification(proc.getProcId());
460 }
461 if (proc.wasExecuted()) {
462 runnables.addFront(proc);
463 } else {
464
465 runnables.addBack(proc);
466 }
467 }
468 }
469 }
470
471
472
473
474
475
476
477
478
479
480
481 public void start(int numThreads, boolean abortOnCorruption) throws IOException {
482 if (running.getAndSet(true)) {
483 LOG.warn("Already running");
484 return;
485 }
486
487
488
489 threads = new Thread[numThreads + 1];
490 LOG.info("Starting procedure executor threads=" + threads.length);
491
492
493 for (int i = 0; i < numThreads; ++i) {
494 threads[i] = new Thread("ProcedureExecutor-" + i) {
495 @Override
496 public void run() {
497 execLoop();
498 }
499 };
500 }
501
502
503 threads[numThreads] = new Thread("ProcedureExecutorTimeout") {
504 @Override
505 public void run() {
506 timeoutLoop();
507 }
508 };
509
510
511 store.recoverLease();
512
513
514
515
516
517
518 load(abortOnCorruption);
519
520
521 for (int i = 0; i < threads.length; ++i) {
522 threads[i].start();
523 }
524
525
526 waitingTimeout.add(
527 new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
528 }
529
530 public void stop() {
531 if (!running.getAndSet(false)) {
532 return;
533 }
534
535 LOG.info("Stopping the procedure executor");
536 runnables.signalAll();
537 waitingTimeout.signalAll();
538 }
539
540 public void join() {
541 boolean interrupted = false;
542
543 for (int i = 0; i < threads.length; ++i) {
544 try {
545 threads[i].join();
546 } catch (InterruptedException ex) {
547 interrupted = true;
548 }
549 }
550
551 if (interrupted) {
552 Thread.currentThread().interrupt();
553 }
554
555 completed.clear();
556 rollbackStack.clear();
557 procedures.clear();
558 nonceKeysToProcIdsMap.clear();
559 waitingTimeout.clear();
560 runnables.clear();
561 lastProcId.set(-1);
562 }
563
564 public boolean isRunning() {
565 return running.get();
566 }
567
568
569
570
571 public int getNumThreads() {
572 return threads == null ? 0 : (threads.length - 1);
573 }
574
575 public int getActiveExecutorCount() {
576 return activeExecutorCount.get();
577 }
578
579 public TEnvironment getEnvironment() {
580 return this.environment;
581 }
582
583 public ProcedureStore getStore() {
584 return this.store;
585 }
586
587 public void registerListener(ProcedureExecutorListener listener) {
588 this.listeners.add(listener);
589 }
590
591 public boolean unregisterListener(ProcedureExecutorListener listener) {
592 return this.listeners.remove(listener);
593 }
594
595
596
597
598
599 public List<ProcedureInfo> listProcedures() {
600 List<ProcedureInfo> procedureLists =
601 new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
602 for (java.util.Map.Entry<Long, Procedure> p: procedures.entrySet()) {
603 procedureLists.add(Procedure.createProcedureInfo(p.getValue(), null));
604 }
605 for (java.util.Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
606
607
608
609
610 procedureLists.add(e.getValue());
611 }
612 return procedureLists;
613 }
614
615
616
617
618
619
620
621
622
623
624 public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
625 return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
626 }
627
628
629
630
631
632
633
634
635
636
637
638
639 public long registerNonce(final NonceKey nonceKey) {
640 if (nonceKey == null) return -1;
641
642
643 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
644 if (oldProcId == null) {
645
646
647 final long newProcId = nextProcId();
648 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
649 if (oldProcId == null) return -1;
650 }
651
652
653
654 final boolean isTraceEnabled = LOG.isTraceEnabled();
655 while (isRunning() &&
656 !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
657 nonceKeysToProcIdsMap.containsKey(nonceKey)) {
658 if (isTraceEnabled) {
659 LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted");
660 }
661 Threads.sleep(100);
662 }
663 return oldProcId.longValue();
664 }
665
666
667
668
669
670 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
671 if (nonceKey == null) return;
672
673 final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
674 if (procId == null) return;
675
676
677 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
678 nonceKeysToProcIdsMap.remove(nonceKey);
679 }
680 }
681
682
683
684
685
686
687
688
689
690
691 public void setFailureResultForNonce(final NonceKey nonceKey, final String procName,
692 final User procOwner, final IOException exception) {
693 if (nonceKey == null) return;
694
695 final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
696 if (procId == null || completed.containsKey(procId)) return;
697
698 final long currentTime = EnvironmentEdgeManager.currentTime();
699 final ProcedureInfo result = new ProcedureInfo(
700 procId.longValue(),
701 procName,
702 procOwner != null ? procOwner.getShortName() : null,
703 ProcedureState.ROLLEDBACK,
704 -1,
705 nonceKey,
706 ForeignExceptionUtil.toProtoForeignException("ProcedureExecutor", exception),
707 currentTime,
708 currentTime,
709 null);
710 completed.putIfAbsent(procId, result);
711 }
712
713
714
715
716
717
718
719
720
721
722 public long submitProcedure(final Procedure proc) {
723 return submitProcedure(proc, null);
724 }
725
726
727
728
729
730
731
732 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
733 justification = "FindBugs is blind to the check-for-null")
734 public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
735 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
736 Preconditions.checkArgument(isRunning(), "executor not running");
737 Preconditions.checkArgument(lastProcId.get() >= 0);
738 Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
739
740 final Long currentProcId;
741 if (nonceKey != null) {
742 currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
743 Preconditions.checkArgument(currentProcId != null,
744 "expected nonceKey=" + nonceKey + " to be reserved, use registerNonce()");
745 } else {
746 currentProcId = nextProcId();
747 }
748
749
750 proc.setNonceKey(nonceKey);
751 proc.setProcId(currentProcId.longValue());
752
753
754 store.insert(proc, null);
755 if (LOG.isDebugEnabled()) {
756 LOG.debug("Procedure " + proc + " added to the store.");
757 }
758
759
760 RootProcedureState stack = new RootProcedureState();
761 rollbackStack.put(currentProcId, stack);
762
763
764 assert !procedures.containsKey(currentProcId);
765 procedures.put(currentProcId, proc);
766 sendProcedureAddedNotification(currentProcId);
767 runnables.addBack(proc);
768 return currentProcId;
769 }
770
771 public ProcedureInfo getResult(final long procId) {
772 return completed.get(procId);
773 }
774
775
776
777
778
779
780
781
782 public boolean isFinished(final long procId) {
783 return completed.containsKey(procId);
784 }
785
786
787
788
789
790
791 public boolean isStarted(final long procId) {
792 Procedure proc = procedures.get(procId);
793 if (proc == null) {
794 return completed.get(procId) != null;
795 }
796 return proc.wasExecuted();
797 }
798
799
800
801
802
803 public void removeResult(final long procId) {
804 ProcedureInfo result = completed.get(procId);
805 if (result == null) {
806 assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
807 if (LOG.isDebugEnabled()) {
808 LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
809 }
810 return;
811 }
812
813
814 result.setClientAckTime(EnvironmentEdgeManager.currentTime());
815 }
816
817
818
819
820
821
822
823 public boolean abort(final long procId) {
824 return abort(procId, true);
825 }
826
827
828
829
830
831
832
833
834 public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
835 Procedure proc = procedures.get(procId);
836 if (proc != null) {
837 if (!mayInterruptIfRunning && proc.wasExecuted()) {
838 return false;
839 } else {
840 return proc.abort(getEnvironment());
841 }
842 }
843 return false;
844 }
845
846
847
848
849
850
851
852
853 public boolean isProcedureOwner(final long procId, final User user) {
854 if (user == null) {
855 return false;
856 }
857
858 Procedure proc = procedures.get(procId);
859 if (proc != null) {
860 return proc.getOwner().equals(user.getShortName());
861 }
862 ProcedureInfo procInfo = completed.get(procId);
863 if (procInfo == null) {
864
865
866 return false;
867 }
868 return ProcedureInfo.isProcedureOwner(procInfo, user);
869 }
870
871 public Map<Long, ProcedureInfo> getResults() {
872 return Collections.unmodifiableMap(completed);
873 }
874
875 public Procedure getProcedure(final long procId) {
876 return procedures.get(procId);
877 }
878
879 protected ProcedureRunnableSet getRunnableSet() {
880 return runnables;
881 }
882
883
884
885
886
887
888 private void execLoop() {
889 while (isRunning()) {
890 Procedure proc = runnables.poll();
891 if (proc == null) continue;
892
893 try {
894 activeExecutorCount.incrementAndGet();
895 execLoop(proc);
896 } finally {
897 activeExecutorCount.decrementAndGet();
898 }
899 }
900 }
901
902 private void execLoop(Procedure proc) {
903 if (LOG.isTraceEnabled()) {
904 LOG.trace("Trying to start the execution of " + proc);
905 }
906
907 Long rootProcId = getRootProcedureId(proc);
908 if (rootProcId == null) {
909
910 executeRollback(proc);
911 return;
912 }
913
914 RootProcedureState procStack = rollbackStack.get(rootProcId);
915 if (procStack == null) return;
916
917 do {
918
919 if (!procStack.acquire(proc)) {
920 if (procStack.setRollback()) {
921
922 if (!executeRollback(rootProcId, procStack)) {
923 procStack.unsetRollback();
924 runnables.yield(proc);
925 }
926 } else {
927
928
929
930 if (!proc.wasExecuted()) {
931 if (!executeRollback(proc)) {
932 runnables.yield(proc);
933 }
934 }
935 }
936 break;
937 }
938
939
940 assert proc.getState() == ProcedureState.RUNNABLE;
941 if (proc.acquireLock(getEnvironment())) {
942 execProcedure(procStack, proc);
943 proc.releaseLock(getEnvironment());
944 } else {
945 runnables.yield(proc);
946 }
947 procStack.release(proc);
948
949
950
951 if (testing != null && !isRunning()) {
952 break;
953 }
954
955 if (proc.isSuccess()) {
956 if (LOG.isDebugEnabled()) {
957 LOG.debug("Procedure completed in " +
958 StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
959 }
960
961 if (proc.getProcId() == rootProcId) {
962 procedureFinished(proc);
963 }
964 break;
965 }
966 } while (procStack.isFailed());
967 }
968
969 private void timeoutLoop() {
970 while (isRunning()) {
971 Procedure proc = waitingTimeout.poll();
972 if (proc == null) continue;
973
974 if (proc.getTimeRemaining() > 100) {
975
976
977 waitingTimeout.add(proc);
978 continue;
979 }
980
981
982
983
984
985
986
987
988
989
990
991
992 if (proc instanceof CompletedProcedureCleaner) {
993 try {
994 ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
995 } catch (Throwable e) {
996 LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
997 }
998 proc.setStartTime(EnvironmentEdgeManager.currentTime());
999 waitingTimeout.add(proc);
1000 continue;
1001 }
1002
1003
1004
1005 if (proc.setTimeoutFailure()) {
1006 long rootProcId = Procedure.getRootProcedureId(procedures, proc);
1007 RootProcedureState procStack = rollbackStack.get(rootProcId);
1008 procStack.abort();
1009 store.update(proc);
1010 runnables.addFront(proc);
1011 continue;
1012 }
1013 }
1014 }
1015
1016
1017
1018
1019
1020
1021 private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
1022 Procedure rootProc = procedures.get(rootProcId);
1023 RemoteProcedureException exception = rootProc.getException();
1024 if (exception == null) {
1025 exception = procStack.getException();
1026 rootProc.setFailure(exception);
1027 store.update(rootProc);
1028 }
1029
1030 List<Procedure> subprocStack = procStack.getSubprocedures();
1031 assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
1032
1033 int stackTail = subprocStack.size();
1034 boolean reuseLock = false;
1035 while (stackTail --> 0) {
1036 final Procedure proc = subprocStack.get(stackTail);
1037
1038 if (!reuseLock && !proc.acquireLock(getEnvironment())) {
1039
1040
1041 return false;
1042 }
1043
1044 boolean abortRollback = !executeRollback(proc);
1045 abortRollback |= !isRunning() || !store.isRunning();
1046
1047
1048
1049
1050 reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
1051 if (!reuseLock) {
1052 proc.releaseLock(getEnvironment());
1053 }
1054
1055
1056
1057 if (abortRollback) {
1058 return false;
1059 }
1060
1061 subprocStack.remove(stackTail);
1062
1063
1064 if (proc.isYieldAfterExecutionStep(getEnvironment())) {
1065 return false;
1066 }
1067 }
1068
1069
1070 LOG.info("Rolledback procedure " + rootProc +
1071 " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
1072 " exception=" + exception.getMessage());
1073 procedureFinished(rootProc);
1074 return true;
1075 }
1076
1077
1078
1079
1080
1081
1082 private boolean executeRollback(final Procedure proc) {
1083 try {
1084 proc.doRollback(getEnvironment());
1085 } catch (IOException e) {
1086 if (LOG.isDebugEnabled()) {
1087 LOG.debug("rollback attempt failed for " + proc, e);
1088 }
1089 return false;
1090 } catch (InterruptedException e) {
1091 handleInterruptedException(proc, e);
1092 return false;
1093 } catch (Throwable e) {
1094
1095 LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
1096 }
1097
1098
1099
1100 if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1101 LOG.debug("TESTING: Kill before store update");
1102 stop();
1103 return false;
1104 }
1105
1106 if (proc.removeStackIndex()) {
1107 proc.setState(ProcedureState.ROLLEDBACK);
1108 if (proc.hasParent()) {
1109 store.delete(proc.getProcId());
1110 procedures.remove(proc.getProcId());
1111 } else {
1112 store.update(proc);
1113 }
1114 } else {
1115 store.update(proc);
1116 }
1117
1118 return true;
1119 }
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138 private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
1139 Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
1140
1141
1142 boolean reExecute = false;
1143 Procedure[] subprocs = null;
1144 do {
1145 reExecute = false;
1146 try {
1147 subprocs = procedure.doExecute(getEnvironment());
1148 if (subprocs != null && subprocs.length == 0) {
1149 subprocs = null;
1150 }
1151 } catch (ProcedureYieldException e) {
1152 if (LOG.isTraceEnabled()) {
1153 LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
1154 }
1155 runnables.yield(procedure);
1156 return;
1157 } catch (InterruptedException e) {
1158 handleInterruptedException(procedure, e);
1159 runnables.yield(procedure);
1160 return;
1161 } catch (Throwable e) {
1162
1163 String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
1164 LOG.error(msg, e);
1165 procedure.setFailure(new RemoteProcedureException(msg, e));
1166 }
1167
1168 if (!procedure.isFailed()) {
1169 if (subprocs != null) {
1170 if (subprocs.length == 1 && subprocs[0] == procedure) {
1171
1172 subprocs = null;
1173 reExecute = true;
1174 } else {
1175
1176 for (int i = 0; i < subprocs.length; ++i) {
1177 Procedure subproc = subprocs[i];
1178 if (subproc == null) {
1179 String msg = "subproc[" + i + "] is null, aborting the procedure";
1180 procedure.setFailure(new RemoteProcedureException(msg,
1181 new IllegalArgumentIOException(msg)));
1182 subprocs = null;
1183 break;
1184 }
1185
1186 assert subproc.getState() == ProcedureState.INITIALIZING;
1187 subproc.setParentProcId(procedure.getProcId());
1188 subproc.setProcId(nextProcId());
1189 }
1190
1191 if (!procedure.isFailed()) {
1192 procedure.setChildrenLatch(subprocs.length);
1193 switch (procedure.getState()) {
1194 case RUNNABLE:
1195 procedure.setState(ProcedureState.WAITING);
1196 break;
1197 case WAITING_TIMEOUT:
1198 waitingTimeout.add(procedure);
1199 break;
1200 default:
1201 break;
1202 }
1203 }
1204 }
1205 } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1206 waitingTimeout.add(procedure);
1207 } else {
1208
1209 procedure.setState(ProcedureState.FINISHED);
1210 }
1211 }
1212
1213
1214 procStack.addRollbackStep(procedure);
1215
1216
1217
1218 if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1219 LOG.debug("TESTING: Kill before store update");
1220 stop();
1221 return;
1222 }
1223
1224
1225 if (subprocs != null && !procedure.isFailed()) {
1226 if (LOG.isTraceEnabled()) {
1227 LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
1228 }
1229 store.insert(procedure, subprocs);
1230 } else {
1231 if (LOG.isTraceEnabled()) {
1232 LOG.trace("Store update " + procedure);
1233 }
1234 store.update(procedure);
1235 }
1236
1237
1238 if (!store.isRunning()) {
1239 return;
1240 }
1241
1242
1243 if (procedure.getState() == ProcedureState.RUNNABLE &&
1244 procedure.isYieldAfterExecutionStep(getEnvironment())) {
1245 runnables.yield(procedure);
1246 return;
1247 }
1248
1249 assert (reExecute && subprocs == null) || !reExecute;
1250 } while (reExecute);
1251
1252
1253 if (subprocs != null && !procedure.isFailed()) {
1254 for (int i = 0; i < subprocs.length; ++i) {
1255 Procedure subproc = subprocs[i];
1256 assert !procedures.containsKey(subproc.getProcId());
1257 procedures.put(subproc.getProcId(), subproc);
1258 runnables.addFront(subproc);
1259 }
1260 }
1261
1262 if (procedure.isFinished() && procedure.hasParent()) {
1263 Procedure parent = procedures.get(procedure.getParentProcId());
1264 if (parent == null) {
1265 assert procStack.isRollingback();
1266 return;
1267 }
1268
1269
1270 if (LOG.isTraceEnabled()) {
1271 LOG.trace(parent + " child is done: " + procedure);
1272 }
1273 if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
1274 parent.setState(ProcedureState.RUNNABLE);
1275 store.update(parent);
1276 runnables.addFront(parent);
1277 if (LOG.isTraceEnabled()) {
1278 LOG.trace(parent + " all the children finished their work, resume.");
1279 }
1280 return;
1281 }
1282 }
1283 }
1284
1285 private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
1286 if (LOG.isTraceEnabled()) {
1287 LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
1288 }
1289
1290
1291
1292
1293
1294
1295 }
1296
1297 private void sendProcedureLoadedNotification(final long procId) {
1298 if (!this.listeners.isEmpty()) {
1299 for (ProcedureExecutorListener listener: this.listeners) {
1300 try {
1301 listener.procedureLoaded(procId);
1302 } catch (Throwable e) {
1303 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1304 }
1305 }
1306 }
1307 }
1308
1309 private void sendProcedureAddedNotification(final long procId) {
1310 if (!this.listeners.isEmpty()) {
1311 for (ProcedureExecutorListener listener: this.listeners) {
1312 try {
1313 listener.procedureAdded(procId);
1314 } catch (Throwable e) {
1315 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1316 }
1317 }
1318 }
1319 }
1320
1321 private void sendProcedureFinishedNotification(final long procId) {
1322 if (!this.listeners.isEmpty()) {
1323 for (ProcedureExecutorListener listener: this.listeners) {
1324 try {
1325 listener.procedureFinished(procId);
1326 } catch (Throwable e) {
1327 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1328 }
1329 }
1330 }
1331 }
1332
1333 private long nextProcId() {
1334 long procId = lastProcId.incrementAndGet();
1335 if (procId < 0) {
1336 while (!lastProcId.compareAndSet(procId, 0)) {
1337 procId = lastProcId.get();
1338 if (procId >= 0)
1339 break;
1340 }
1341 while (procedures.containsKey(procId)) {
1342 procId = lastProcId.incrementAndGet();
1343 }
1344 }
1345 return procId;
1346 }
1347
1348 private Long getRootProcedureId(Procedure proc) {
1349 return Procedure.getRootProcedureId(procedures, proc);
1350 }
1351
1352 private void procedureFinished(final Procedure proc) {
1353
1354 try {
1355 proc.completionCleanup(getEnvironment());
1356 } catch (Throwable e) {
1357
1358 LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1359 }
1360
1361
1362 ProcedureInfo procInfo = Procedure.createProcedureInfo(proc, proc.getNonceKey());
1363 if (!proc.shouldWaitClientAck(getEnvironment())) {
1364 procInfo.setClientAckTime(0);
1365 }
1366
1367 completed.put(procInfo.getProcId(), procInfo);
1368 rollbackStack.remove(proc.getProcId());
1369 procedures.remove(proc.getProcId());
1370
1371
1372 try {
1373 runnables.completionCleanup(proc);
1374 } catch (Throwable e) {
1375
1376 LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
1377 }
1378
1379
1380 sendProcedureFinishedNotification(proc.getProcId());
1381 }
1382
1383 public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) {
1384 ProcedureInfo result = completed.get(procId);
1385 Procedure proc = null;
1386 if (result == null) {
1387 proc = procedures.get(procId);
1388 if (proc == null) {
1389 result = completed.get(procId);
1390 }
1391 }
1392 return new Pair(result, proc);
1393 }
1394 }