1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.HashSet;
31 import java.util.TreeSet;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35 import java.util.concurrent.locks.ReentrantLock;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.CopyOnWriteArrayList;
38 import java.util.concurrent.TimeUnit;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.ProcedureInfo;
45 import org.apache.hadoop.hbase.classification.InterfaceAudience;
46 import org.apache.hadoop.hbase.classification.InterfaceStability;
47 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
48 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
49 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
50 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
51 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
52 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
53 import org.apache.hadoop.hbase.security.User;
54 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55 import org.apache.hadoop.hbase.util.NonceKey;
56 import org.apache.hadoop.hbase.util.Pair;
57
58 import com.google.common.base.Preconditions;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 @InterfaceAudience.Private
74 @InterfaceStability.Evolving
75 public class ProcedureExecutor<TEnvironment> {
76 private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
77
78 Testing testing = null;
79 public static class Testing {
80 protected boolean killBeforeStoreUpdate = false;
81 protected boolean toggleKillBeforeStoreUpdate = false;
82
83 protected boolean shouldKillBeforeStoreUpdate() {
84 final boolean kill = this.killBeforeStoreUpdate;
85 if (this.toggleKillBeforeStoreUpdate) {
86 this.killBeforeStoreUpdate = !kill;
87 LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
88 }
89 return kill;
90 }
91 }
92
/**
 * Observer of procedure lifecycle events. Implementations are registered through
 * {@code registerListener}. Anything a callback throws is caught and logged by the
 * executor's notification helpers.
 */
public interface ProcedureExecutorListener {
  /** Called by {@code load()} for each runnable root procedure restored from the store. */
  void procedureLoaded(long procId);
  /** Called by {@code submitProcedure} once the new procedure has been registered. */
  void procedureAdded(long procId);
  /** Called as the last step of the executor's completion handling for a procedure. */
  void procedureFinished(long procId);
}
98
99
100
101
/**
 * Adapter that tells the {@code TimeoutBlockingQueue} how long each queued procedure
 * still has to wait.
 */
private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
  /** @return the procedure's remaining time, as reported by {@code getTimeRemaining()} */
  @Override
  public long getTimeout(Procedure proc) {
    return proc.getTimeRemaining();
  }

  /** @return always milliseconds, the unit in which {@link #getTimeout} is expressed */
  @Override
  public TimeUnit getTimeUnit(Procedure proc) {
    return TimeUnit.MILLISECONDS;
  }
}
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
130 private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
131
132 private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
133 private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000;
134
135 private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
136 private static final int DEFAULT_EVICT_TTL = 15 * 60000;
137
138 private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
139 private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000;
140
141 private final Map<Long, ProcedureInfo> completed;
142 private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
143 private final ProcedureStore store;
144 private final Configuration conf;
145
146 public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
147 final Map<Long, ProcedureInfo> completedMap,
148 final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
149
150 setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
151 this.completed = completedMap;
152 this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
153 this.store = store;
154 this.conf = conf;
155 }
156
157 public void periodicExecute(final TEnvironment env) {
158 if (completed.isEmpty()) {
159 if (LOG.isDebugEnabled()) {
160 LOG.debug("No completed procedures to cleanup.");
161 }
162 return;
163 }
164
165 final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
166 final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
167
168 long now = EnvironmentEdgeManager.currentTime();
169 Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
170 while (it.hasNext() && store.isRunning()) {
171 Map.Entry<Long, ProcedureInfo> entry = it.next();
172 ProcedureInfo result = entry.getValue();
173
174
175 if ((result.hasClientAckTime() && (now - result.getClientAckTime()) >= evictAckTtl) ||
176 (now - result.getLastUpdate()) >= evictTtl) {
177 if (LOG.isDebugEnabled()) {
178 LOG.debug("Evict completed procedure " + entry.getKey());
179 }
180 store.delete(entry.getKey());
181 it.remove();
182
183 NonceKey nonceKey = result.getNonceKey();
184 if (nonceKey != null) {
185 nonceKeysToProcIdsMap.remove(nonceKey);
186 }
187 }
188 }
189 }
190
191 @Override
192 protected Procedure[] execute(final TEnvironment env) {
193 throw new UnsupportedOperationException();
194 }
195
196 @Override
197 protected void rollback(final TEnvironment env) {
198 throw new UnsupportedOperationException();
199 }
200
201 @Override
202 protected boolean abort(final TEnvironment env) {
203 throw new UnsupportedOperationException();
204 }
205
206 @Override
207 public void serializeStateData(final OutputStream stream) {
208 throw new UnsupportedOperationException();
209 }
210
211 @Override
212 public void deserializeStateData(final InputStream stream) {
213 throw new UnsupportedOperationException();
214 }
215 }
216
217
218
219
220
221
/**
 * Results of finished root procedures, keyed by procId. Filled by load() and
 * procedureFinished(); pruned by the CompletedProcedureCleaner.
 */
private final ConcurrentHashMap<Long, ProcedureInfo> completed =
  new ConcurrentHashMap<Long, ProcedureInfo>();

/**
 * Per-root-procedure execution/rollback state, keyed by the root procId.
 * An entry is created when a root procedure is submitted or loaded unfinished,
 * and removed in procedureFinished().
 */
private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
  new ConcurrentHashMap<Long, RootProcedureState>();

/**
 * Procedures (roots and children) currently known to the executor, keyed by procId.
 */
private final ConcurrentHashMap<Long, Procedure> procedures =
  new ConcurrentHashMap<Long, Procedure>();

/**
 * Maps a client nonce key to the procId it produced, so that a re-submission with
 * the same nonce returns the existing procId instead of creating a new procedure.
 */
private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
  new ConcurrentHashMap<NonceKey, Long>();

/**
 * Queue drained by the timeout thread. Holds procedures waiting on a timeout
 * and the CompletedProcedureCleaner.
 */
private final TimeoutBlockingQueue<Procedure> waitingTimeout =
  new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());

/**
 * Queue of procedures ready to run, polled by the executor threads.
 */
private final ProcedureRunnableSet runnables;

// NOTE(review): submitLock is not referenced anywhere in the visible code.
private final ReentrantLock submitLock = new ReentrantLock();
// Last procId handed out; -1 until load() has run (submitProcedure requires >= 0).
private final AtomicLong lastProcId = new AtomicLong(-1);

// Listeners notified of loaded/added/finished procedures.
private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
  new CopyOnWriteArrayList<ProcedureExecutorListener>();

// Number of executor threads currently inside execLoop(Procedure).
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
// True between start() and stop().
private final AtomicBoolean running = new AtomicBoolean(false);
private final TEnvironment environment;
private final ProcedureStore store;
private final Configuration conf;

// Executor threads followed by one timeout thread (last slot); null until start().
private Thread[] threads;
273
/**
 * Creates an executor that uses a {@code ProcedureSimpleRunQueue} as its run queue.
 *
 * @param conf configuration handed to the completed-procedure cleaner
 * @param environment environment object passed to every procedure callback
 * @param store persistent store for procedure state
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store) {
  this(conf, environment, store, new ProcedureSimpleRunQueue());
}
278
/**
 * Creates an executor with a caller-supplied run queue. Nothing is loaded or started
 * here; call {@code start(int)} to recover state and spin up the threads.
 *
 * @param conf configuration handed to the completed-procedure cleaner
 * @param environment environment object passed to every procedure callback
 * @param store persistent store for procedure state
 * @param runqueue queue from which executor threads pick runnable procedures
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store, final ProcedureRunnableSet runqueue) {
  this.conf = conf;
  this.store = store;
  this.environment = environment;
  this.runnables = runqueue;
}
286
/**
 * Rebuilds the in-memory state from the procedure store. Must be called on an empty
 * executor (all maps and queues empty).
 *
 * @return the root-procedure stacks that failed validation and were dropped, or
 *         {@code null} if there were none (or the store had nothing to load)
 * @throws IOException if the store fails to load
 * @throws UnsupportedOperationException if a procedure is found in a state that is not
 *         expected after a restart (ROLLEDBACK, INITIALIZING, or FINISHED without error
 *         for a non-root procedure)
 */
private List<Map.Entry<Long, RootProcedureState>> load() throws IOException {
  Preconditions.checkArgument(completed.isEmpty());
  Preconditions.checkArgument(rollbackStack.isEmpty());
  Preconditions.checkArgument(procedures.isEmpty());
  Preconditions.checkArgument(waitingTimeout.isEmpty());
  Preconditions.checkArgument(runnables.size() == 0);

  // 1. Read every procedure from the store and build the procId-indexed maps.
  Iterator<Procedure> loader = store.load();
  if (loader == null) {
    // Nothing stored: procIds start from 0.
    lastProcId.set(0);
    return null;
  }

  long logMaxProcId = 0;
  // NOTE(review): runnablesCount is computed but never read.
  int runnablesCount = 0;
  while (loader.hasNext()) {
    Procedure proc = loader.next();
    proc.beforeReplay(getEnvironment());
    procedures.put(proc.getProcId(), proc);
    logMaxProcId = Math.max(logMaxProcId, proc.getProcId());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Loading procedure state=" + proc.getState() +
                " isFailed=" + proc.hasException() + ": " + proc);
    }
    // Every unfinished root gets a fresh stack; children are attached in step 2.
    if (!proc.hasParent() && !proc.isFinished()) {
      rollbackStack.put(proc.getProcId(), new RootProcedureState());
    }

    // Restore the nonce mapping so re-submissions keep resolving to this procId.
    if (proc.getNonceKey() != null) {
      nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
    }

    if (proc.getState() == ProcedureState.RUNNABLE) {
      runnablesCount++;
    }
  }
  assert lastProcId.get() < 0;
  // New procIds continue after the highest id found in the store.
  lastProcId.set(logMaxProcId);

  // 2. Rebuild the per-root stacks and sort procedures by what has to happen to them.
  TreeSet<Procedure> runnableSet = null;
  // NOTE(review): waitingSet is filled but never pushed to waitingTimeout in this method.
  HashSet<Procedure> waitingSet = null;
  for (final Procedure proc: procedures.values()) {
    Long rootProcId = getRootProcedureId(proc);
    if (rootProcId == null) {
      // No root could be resolved: queue it, execLoop() rolls back such procedures.
      runnables.addBack(proc);
      continue;
    }

    if (!proc.hasParent() && proc.isFinished()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("The procedure is completed state=" + proc.getState() +
                  " isFailed=" + proc.hasException() + ": " + proc);
      }
      assert !rollbackStack.containsKey(proc.getProcId());
      // Finished roots only need their result to be available to clients.
      completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));

      continue;
    }

    if (proc.hasParent() && !proc.isFinished()) {
      Procedure parent = procedures.get(proc.getParentProcId());
      // The parent may be missing (e.g. already rolled back); only count live parents.
      if (parent != null) {
        parent.incChildrenLatch();
      }
    }

    RootProcedureState procStack = rollbackStack.get(rootProcId);
    procStack.loadStack(proc);

    switch (proc.getState()) {
      case RUNNABLE:
        if (runnableSet == null) {
          runnableSet = new TreeSet<Procedure>();
        }
        runnableSet.add(proc);
        break;
      case WAITING_TIMEOUT:
        if (waitingSet == null) {
          waitingSet = new HashSet<Procedure>();
        }
        waitingSet.add(proc);
        break;
      case FINISHED:
        if (proc.hasException()) {
          // Failed procedure: queue it so the executor can roll it back.
          runnables.addBack(proc);
          break;
        }
        // Intentional fall-through: FINISHED without an exception is unexpected here.
      case ROLLEDBACK:
      case INITIALIZING:
        String msg = "Unexpected " + proc.getState() + " state for " + proc;
        LOG.error(msg);
        throw new UnsupportedOperationException(msg);
      default:
        break;
    }
  }

  // 3. Drop every root stack that does not validate, together with its procedures.
  List<Map.Entry<Long, RootProcedureState>> corrupted = null;
  Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
  while (itStack.hasNext()) {
    Map.Entry<Long, RootProcedureState> entry = itStack.next();
    RootProcedureState procStack = entry.getValue();
    if (procStack.isValid()) continue;

    for (Procedure proc: procStack.getSubprocedures()) {
      procedures.remove(proc.getProcId());
      if (runnableSet != null) runnableSet.remove(proc);
      if (waitingSet != null) waitingSet.remove(proc);
    }
    itStack.remove();
    if (corrupted == null) {
      corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>();
    }
    corrupted.add(entry);
  }

  // 4. Queue the surviving runnable procedures, in the TreeSet's iteration order.
  if (runnableSet != null) {
    for (Procedure proc: runnableSet) {
      // Listeners are only told about root procedures.
      if (!proc.hasParent()) {
        sendProcedureLoadedNotification(proc.getProcId());
      }
      runnables.addBack(proc);
    }
  }
  return corrupted;
}
424
425 public void start(int numThreads) throws IOException {
426 if (running.getAndSet(true)) {
427 LOG.warn("Already running");
428 return;
429 }
430
431
432
433 threads = new Thread[numThreads + 1];
434 LOG.info("Starting procedure executor threads=" + threads.length);
435
436
437 for (int i = 0; i < numThreads; ++i) {
438 threads[i] = new Thread("ProcedureExecutorThread-" + i) {
439 @Override
440 public void run() {
441 execLoop();
442 }
443 };
444 }
445
446
447 threads[numThreads] = new Thread("ProcedureExecutorTimeout") {
448 @Override
449 public void run() {
450 timeoutLoop();
451 }
452 };
453
454
455 store.recoverLease();
456
457
458
459
460
461
462 load();
463
464
465 for (int i = 0; i < threads.length; ++i) {
466 threads[i].start();
467 }
468
469
470 waitingTimeout.add(
471 new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
472 }
473
474 public void stop() {
475 if (!running.getAndSet(false)) {
476 return;
477 }
478
479 LOG.info("Stopping the procedure executor");
480 runnables.signalAll();
481 waitingTimeout.signalAll();
482 }
483
484 public void join() {
485 boolean interrupted = false;
486
487 for (int i = 0; i < threads.length; ++i) {
488 try {
489 threads[i].join();
490 } catch (InterruptedException ex) {
491 interrupted = true;
492 }
493 }
494
495 if (interrupted) {
496 Thread.currentThread().interrupt();
497 }
498
499 completed.clear();
500 rollbackStack.clear();
501 procedures.clear();
502 nonceKeysToProcIdsMap.clear();
503 waitingTimeout.clear();
504 runnables.clear();
505 lastProcId.set(-1);
506 }
507
/** @return true between a successful {@code start()} and the following {@code stop()} */
public boolean isRunning() {
  return running.get();
}
511
512
513
514
515 public int getNumThreads() {
516 return threads == null ? 0 : (threads.length - 1);
517 }
518
/** @return the number of executor threads currently executing a procedure */
public int getActiveExecutorCount() {
  return activeExecutorCount.get();
}
522
/** @return the environment object supplied at construction time */
public TEnvironment getEnvironment() {
  return this.environment;
}
526
/** @return the procedure store supplied at construction time */
public ProcedureStore getStore() {
  return this.store;
}
530
/**
 * Adds a listener to be notified of loaded/added/finished procedures.
 * The same listener registered twice is stored (and notified) twice.
 */
public void registerListener(ProcedureExecutorListener listener) {
  this.listeners.add(listener);
}
534
/**
 * Removes a previously registered listener.
 *
 * @return true if the listener was registered and has been removed
 */
public boolean unregisterListener(ProcedureExecutorListener listener) {
  return this.listeners.remove(listener);
}
538
539
540
541
542
543 public List<ProcedureInfo> listProcedures() {
544 List<ProcedureInfo> procedureLists =
545 new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
546 for (java.util.Map.Entry<Long, Procedure> p: procedures.entrySet()) {
547 procedureLists.add(Procedure.createProcedureInfo(p.getValue(), null));
548 }
549 for (java.util.Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
550
551
552
553
554 procedureLists.add(e.getValue());
555 }
556 return procedureLists;
557 }
558
559
560
561
562
563
/**
 * Submits a new root procedure without a nonce.
 *
 * @param proc the procedure to execute; must be in the INITIALIZING state and have no parent
 * @return the procId assigned to the procedure
 */
public long submitProcedure(final Procedure proc) {
  return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
567
568
569
570
571
572
573
574
575 public long submitProcedure(
576 final Procedure proc,
577 final long nonceGroup,
578 final long nonce) {
579 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
580 Preconditions.checkArgument(isRunning());
581 Preconditions.checkArgument(lastProcId.get() >= 0);
582 Preconditions.checkArgument(!proc.hasParent());
583
584 Long currentProcId;
585
586
587
588 synchronized (this) {
589
590
591
592 NonceKey noncekey = null;
593 if (nonce != HConstants.NO_NONCE) {
594 noncekey = new NonceKey(nonceGroup, nonce);
595 currentProcId = nonceKeysToProcIdsMap.get(noncekey);
596 if (currentProcId != null) {
597
598 return currentProcId;
599 }
600 }
601
602
603 currentProcId = nextProcId();
604 proc.setProcId(currentProcId);
605
606
607 if (noncekey != null) {
608 proc.setNonceKey(noncekey);
609 nonceKeysToProcIdsMap.put(noncekey, currentProcId);
610 }
611 }
612
613
614 store.insert(proc, null);
615 if (LOG.isDebugEnabled()) {
616 LOG.debug("Procedure " + proc + " added to the store.");
617 }
618
619
620 RootProcedureState stack = new RootProcedureState();
621 rollbackStack.put(currentProcId, stack);
622
623
624 assert !procedures.containsKey(currentProcId);
625 procedures.put(currentProcId, proc);
626 sendProcedureAddedNotification(currentProcId);
627 runnables.addBack(proc);
628 return currentProcId;
629 }
630
/**
 * @return the result of the completed procedure with the given id, or null if no such
 *         entry exists in the completed map (still running, unknown, or already evicted)
 */
public ProcedureInfo getResult(final long procId) {
  return completed.get(procId);
}
634
635
636
637
638
639
640
641
/**
 * @return true if a result for the given procId is present in the completed map.
 *         False is also returned for unknown ids and for results already evicted.
 */
public boolean isFinished(final long procId) {
  return completed.containsKey(procId);
}
645
646
647
648
649
650
651 public boolean isStarted(final long procId) {
652 Procedure proc = procedures.get(procId);
653 if (proc == null) {
654 return completed.get(procId) != null;
655 }
656 return proc.wasExecuted();
657 }
658
659
660
661
662
663 public void removeResult(final long procId) {
664 ProcedureInfo result = completed.get(procId);
665 if (result == null) {
666 assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
667 if (LOG.isDebugEnabled()) {
668 LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
669 }
670 return;
671 }
672
673
674 result.setClientAckTime(EnvironmentEdgeManager.currentTime());
675 }
676
677
678
679
680
681
682
/**
 * Requests the abort of a procedure, even if it has already started executing.
 *
 * @return true if the procedure exists and accepted the abort request
 */
public boolean abort(final long procId) {
  return abort(procId, true);
}
686
687
688
689
690
691
692
693
694 public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
695 Procedure proc = procedures.get(procId);
696 if (proc != null) {
697 if (!mayInterruptIfRunning && proc.wasExecuted()) {
698 return false;
699 } else {
700 return proc.abort(getEnvironment());
701 }
702 }
703 return false;
704 }
705
706
707
708
709
710
711
712
713 public boolean isProcedureOwner(final long procId, final User user) {
714 if (user == null) {
715 return false;
716 }
717
718 Procedure proc = procedures.get(procId);
719 if (proc != null) {
720 return proc.getOwner().equals(user.getShortName());
721 }
722 ProcedureInfo procInfo = completed.get(procId);
723 if (procInfo == null) {
724
725
726 return false;
727 }
728 return ProcedureInfo.isProcedureOwner(procInfo, user);
729 }
730
/** @return a read-only view (not a copy) of the completed-procedures map */
public Map<Long, ProcedureInfo> getResults() {
  return Collections.unmodifiableMap(completed);
}
734
/** @return the live procedure with the given id, or null if it is not in the live map */
public Procedure getProcedure(final long procId) {
  return procedures.get(procId);
}
738
/** @return the run queue the executor threads poll from */
protected ProcedureRunnableSet getRunnableSet() {
  return runnables;
}
742
743
744
745
746
747
748 private void execLoop() {
749 while (isRunning()) {
750 Long procId = runnables.poll();
751 Procedure proc = procId != null ? procedures.get(procId) : null;
752 if (proc == null) continue;
753
754 try {
755 activeExecutorCount.incrementAndGet();
756 execLoop(proc);
757 } finally {
758 activeExecutorCount.decrementAndGet();
759 }
760 }
761 }
762
/**
 * Runs one scheduling step for {@code proc}: either executes it, or (when its root stack
 * refuses execution) drives the rollback. Loops only while the stack reports a failure.
 */
private void execLoop(Procedure proc) {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Trying to start the execution of " + proc);
  }

  Long rootProcId = getRootProcedureId(proc);
  if (rootProcId == null) {
    // No root can be resolved for this procedure: all that is left is to roll it back.
    executeRollback(proc);
    return;
  }

  RootProcedureState procStack = rollbackStack.get(rootProcId);
  // The root has no stack (any more): nothing to do.
  if (procStack == null) return;

  do {
    // acquire() refusing means the stack does not accept new executions
    // (presumably failed/aborted; confirm in RootProcedureState).
    if (!procStack.acquire(proc)) {
      if (procStack.setRollback()) {
        // This thread got to drive the rollback of the whole stack.
        if (!executeRollback(rootProcId, procStack)) {
          // Rollback did not complete: give the flag back and retry later.
          procStack.unsetRollback();
          runnables.yield(proc);
        }
      } else {
        // Rollback cannot be started by this thread right now. A procedure that never
        // ran can still be rolled back on its own.
        if (!proc.wasExecuted()) {
          if (!executeRollback(proc)) {
            runnables.yield(proc);
          }
        }
      }
      break;
    }

    // Execute the procedure, provided its own lock can be taken.
    assert proc.getState() == ProcedureState.RUNNABLE;
    if (proc.acquireLock(getEnvironment())) {
      execProcedure(procStack, proc);
      proc.releaseLock(getEnvironment());
    } else {
      runnables.yield(proc);
    }
    procStack.release(proc);

    // Test hook: when a test is attached and the executor was stopped meanwhile
    // (e.g. by the kill-before-store-update switch), leave without finishing.
    if (testing != null && !isRunning()) {
      break;
    }

    if (proc.isSuccess()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Procedure completed in " +
            StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
      }
      // Only a root procedure is published as completed.
      if (proc.getProcId() == rootProcId) {
        procedureFinished(proc);
      }
      break;
    }
  } while (procStack.isFailed());
}
829
830 private void timeoutLoop() {
831 while (isRunning()) {
832 Procedure proc = waitingTimeout.poll();
833 if (proc == null) continue;
834
835 if (proc.getTimeRemaining() > 100) {
836
837
838 waitingTimeout.add(proc);
839 continue;
840 }
841
842
843
844
845
846
847
848
849
850
851
852
853 if (proc instanceof CompletedProcedureCleaner) {
854 try {
855 ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
856 } catch (Throwable e) {
857 LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
858 }
859 proc.setStartTime(EnvironmentEdgeManager.currentTime());
860 waitingTimeout.add(proc);
861 continue;
862 }
863
864
865
866 if (proc.setTimeoutFailure()) {
867 long rootProcId = Procedure.getRootProcedureId(procedures, proc);
868 RootProcedureState procStack = rollbackStack.get(rootProcId);
869 procStack.abort();
870 store.update(proc);
871 runnables.addFront(proc);
872 continue;
873 }
874 }
875 }
876
877
878
879
880
881
/**
 * Rolls back a whole root-procedure stack, from the last executed step to the first.
 * Once every step is rolled back the root procedure is published as finished.
 *
 * @param rootProcId id of the root procedure
 * @param procStack the stack of executed steps of that root
 * @return true if the full stack was rolled back; false if the rollback had to stop
 *         (lock not available, a step failed to roll back, executor or store stopped),
 *         in which case the caller is expected to retry later
 */
private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
  Procedure rootProc = procedures.get(rootProcId);
  RemoteProcedureException exception = rootProc.getException();
  if (exception == null) {
    // The failure came from somewhere in the stack: record it on the root and persist it.
    exception = procStack.getException();
    rootProc.setFailure(exception);
    store.update(rootProc);
  }

  List<Procedure> subprocStack = procStack.getSubprocedures();
  assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;

  int stackTail = subprocStack.size();
  boolean reuseLock = false;
  // Walk the stack backwards (stackTail is decremented after the comparison).
  while (stackTail --> 0) {
    final Procedure proc = subprocStack.get(stackTail);

    if (!reuseLock && !proc.acquireLock(getEnvironment())) {
      // Lock not available: give up for now, the caller retries.
      return false;
    }

    boolean abortRollback = !executeRollback(proc);
    abortRollback |= !isRunning() || !store.isRunning();

    // If the next step to roll back belongs to the same procedure, keep holding its
    // lock instead of releasing and re-acquiring it; never when we are about to stop.
    reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
    if (!reuseLock) {
      proc.releaseLock(getEnvironment());
    }

    // Stop here; the current step stays on the stack so it is retried next time.
    if (abortRollback) {
      return false;
    }

    subprocStack.remove(stackTail);
  }

  // Everything rolled back: the root is done.
  // NOTE(review): exception is dereferenced below; assumes procStack.getException()
  // is non-null whenever the root itself carries no exception. Confirm.
  LOG.info("Rolledback procedure " + rootProc +
           " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
           " exception=" + exception.getMessage());
  procedureFinished(rootProc);
  return true;
}
932
933
934
935
936
937
938 private boolean executeRollback(final Procedure proc) {
939 try {
940 proc.doRollback(getEnvironment());
941 } catch (IOException e) {
942 if (LOG.isDebugEnabled()) {
943 LOG.debug("rollback attempt failed for " + proc, e);
944 }
945 return false;
946 } catch (Throwable e) {
947
948 LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
949 }
950
951
952
953 if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
954 if (LOG.isDebugEnabled()) {
955 LOG.debug("TESTING: Kill before store update");
956 }
957 stop();
958 return false;
959 }
960
961 if (proc.removeStackIndex()) {
962 proc.setState(ProcedureState.ROLLEDBACK);
963 if (proc.hasParent()) {
964 store.delete(proc.getProcId());
965 procedures.remove(proc.getProcId());
966 } else {
967 store.update(proc);
968 }
969 } else {
970 store.update(proc);
971 }
972 return true;
973 }
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
/**
 * Executes one procedure: runs its step(s), registers the sub-procedures it returns,
 * records the step on the root's rollback stack and persists the new state. When a
 * finished child was the last pending one, the parent is made runnable again.
 *
 * @param procStack execution state of the root this procedure belongs to
 * @param procedure the procedure to execute; must be RUNNABLE
 */
private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
  Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);

  // Execute the procedure; it is re-executed in place for as long as it returns itself.
  boolean reExecute = false;
  Procedure[] subprocs = null;
  do {
    reExecute = false;
    try {
      subprocs = procedure.doExecute(getEnvironment());
      // An empty array means the same as null: no sub-procedures.
      if (subprocs != null && subprocs.length == 0) {
        subprocs = null;
      }
    } catch (ProcedureYieldException e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Yield procedure: " + procedure);
      }
      runnables.yield(procedure);
      return;
    } catch (Throwable e) {
      // Any other exception is treated as a bug in the procedure and turned into a failure.
      String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
      LOG.error(msg, e);
      procedure.setFailure(new RemoteProcedureException(msg, e));
    }

    if (!procedure.isFailed()) {
      if (subprocs != null) {
        if (subprocs.length == 1 && subprocs[0] == procedure) {
          // The procedure returned itself: it has another step to run right away.
          subprocs = null;
          reExecute = true;
        } else {
          // New children: link each one to the parent and give it an id.
          for (int i = 0; i < subprocs.length; ++i) {
            Procedure subproc = subprocs[i];
            if (subproc == null) {
              String msg = "subproc[" + i + "] is null, aborting the procedure";
              procedure.setFailure(new RemoteProcedureException(msg,
                new IllegalArgumentIOException(msg)));
              subprocs = null;
              break;
            }

            assert subproc.getState() == ProcedureState.INITIALIZING;
            subproc.setParentProcId(procedure.getProcId());
            subproc.setProcId(nextProcId());
          }

          if (!procedure.isFailed()) {
            // The parent waits until every child has counted the latch down.
            procedure.setChildrenLatch(subprocs.length);
            switch (procedure.getState()) {
              case RUNNABLE:
                procedure.setState(ProcedureState.WAITING);
                break;
              case WAITING_TIMEOUT:
                waitingTimeout.add(procedure);
                break;
              default:
                break;
            }
          }
        }
      } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
        waitingTimeout.add(procedure);
      } else {
        // No children and nothing to wait for: the procedure is done.
        procedure.setState(ProcedureState.FINISHED);
      }
    }

    // Record the step so it can be undone on failure.
    procStack.addRollbackStep(procedure);

    // Test hook: simulate a crash between the execution and the store update.
    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("TESTING: Kill before store update");
      }
      stop();
      return;
    }

    // Persist: parent together with its new children, or just the updated procedure.
    if (subprocs != null && !procedure.isFailed()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
      }
      store.insert(procedure, subprocs);
    } else {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Store update " + procedure);
      }
      store.update(procedure);
    }

    // If the store stopped, do not continue with state that may not have been persisted.
    if (!store.isRunning()) {
      return;
    }

    assert (reExecute && subprocs == null) || !reExecute;
  } while (reExecute);

  // Make the (already persisted) children visible and schedule them first.
  if (subprocs != null && !procedure.isFailed()) {
    for (int i = 0; i < subprocs.length; ++i) {
      Procedure subproc = subprocs[i];
      assert !procedures.containsKey(subproc.getProcId());
      procedures.put(subproc.getProcId(), subproc);
      runnables.addFront(subproc);
    }
  }

  if (procedure.isFinished() && procedure.hasParent()) {
    Procedure parent = procedures.get(procedure.getParentProcId());
    if (parent == null) {
      // The parent is gone; per the assertion this is only expected during a rollback.
      assert procStack.isRollingback();
      return;
    }

    // Tell the parent one more child is done; the last child wakes the parent up.
    if (LOG.isTraceEnabled()) {
      LOG.trace(parent + " child is done: " + procedure);
    }
    if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
      parent.setState(ProcedureState.RUNNABLE);
      store.update(parent);
      runnables.addFront(parent);
      if (LOG.isTraceEnabled()) {
        LOG.trace(parent + " all the children finished their work, resume.");
      }
      return;
    }
  }
}
1129
1130 private void sendProcedureLoadedNotification(final long procId) {
1131 if (!this.listeners.isEmpty()) {
1132 for (ProcedureExecutorListener listener: this.listeners) {
1133 try {
1134 listener.procedureLoaded(procId);
1135 } catch (Throwable e) {
1136 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1137 }
1138 }
1139 }
1140 }
1141
1142 private void sendProcedureAddedNotification(final long procId) {
1143 if (!this.listeners.isEmpty()) {
1144 for (ProcedureExecutorListener listener: this.listeners) {
1145 try {
1146 listener.procedureAdded(procId);
1147 } catch (Throwable e) {
1148 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1149 }
1150 }
1151 }
1152 }
1153
1154 private void sendProcedureFinishedNotification(final long procId) {
1155 if (!this.listeners.isEmpty()) {
1156 for (ProcedureExecutorListener listener: this.listeners) {
1157 try {
1158 listener.procedureFinished(procId);
1159 } catch (Throwable e) {
1160 LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1161 }
1162 }
1163 }
1164 }
1165
1166 private long nextProcId() {
1167 long procId = lastProcId.incrementAndGet();
1168 if (procId < 0) {
1169 while (!lastProcId.compareAndSet(procId, 0)) {
1170 procId = lastProcId.get();
1171 if (procId >= 0)
1172 break;
1173 }
1174 while (procedures.containsKey(procId)) {
1175 procId = lastProcId.incrementAndGet();
1176 }
1177 }
1178 return procId;
1179 }
1180
/**
 * Resolves the id of the root procedure of {@code proc} using the live procedure map.
 *
 * @return the root procId, or null if it cannot be resolved (callers check for null)
 */
private Long getRootProcedureId(Procedure proc) {
  return Procedure.getRootProcedureId(procedures, proc);
}
1184
/**
 * Final bookkeeping for a root procedure that completed (successfully or after a
 * rollback): publishes its result, drops it from the live structures, lets the run queue
 * clean up and notifies the listeners.
 */
private void procedureFinished(final Procedure proc) {
  // Let the procedure clean up after itself; a failure here must not block completion.
  try {
    proc.completionCleanup(getEnvironment());
  } catch (Throwable e) {
    // Treated as a bug in the procedure: log and carry on.
    LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
  }

  // Publish the result before removing the live entries, so a concurrent reader that
  // misses the procedure in the live map can find it in the completed map.
  completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
  rollbackStack.remove(proc.getProcId());
  procedures.remove(proc.getProcId());

  // Give the run queue a chance to release whatever it tracks for this procedure.
  try {
    runnables.completionCleanup(proc);
  } catch (Throwable e) {
    // Treated as a bug in the run queue: log and carry on.
    LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
  }

  // Listeners are told last, once the result is already visible.
  sendProcedureFinishedNotification(proc.getProcId());
}
1210
1211 public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) {
1212 ProcedureInfo result = completed.get(procId);
1213 Procedure proc = null;
1214 if (result == null) {
1215 proc = procedures.get(procId);
1216 if (proc == null) {
1217 result = completed.get(procId);
1218 }
1219 }
1220 return new Pair(result, proc);
1221 }
1222 }