View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Collections;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.HashSet;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  import java.util.concurrent.locks.ReentrantLock;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.ProcedureInfo;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.classification.InterfaceStability;
46  import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
47  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
48  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
49  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
50  import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
51  import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
52  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
53  import org.apache.hadoop.hbase.security.User;
54  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55  import org.apache.hadoop.hbase.util.NonceKey;
56  import org.apache.hadoop.hbase.util.Pair;
57  
58  import com.google.common.base.Preconditions;
59  
60  /**
61   * Thread Pool that executes the submitted procedures.
62   * The executor has a ProcedureStore associated.
63   * Each operation is logged and on restart the pending procedures are resumed.
64   *
65   * Unless the Procedure code throws an error (e.g. invalid user input)
66   * the procedure will complete (at some point in time). On restart the pending
67   * procedures are resumed and the ones that failed will be rolled back.
68   *
69   * The user can add procedures to the executor via submitProcedure(proc)
70   * check for the finished state via isFinished(procId)
71   * and get the result via getResult(procId)
72   */
73  @InterfaceAudience.Private
74  @InterfaceStability.Evolving
75  public class ProcedureExecutor<TEnvironment> {
76    private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
77  
78    Testing testing = null;
79    public static class Testing {
80      protected boolean killBeforeStoreUpdate = false;
81      protected boolean toggleKillBeforeStoreUpdate = false;
82  
83      protected boolean shouldKillBeforeStoreUpdate() {
84        final boolean kill = this.killBeforeStoreUpdate;
85        if (this.toggleKillBeforeStoreUpdate) {
86          this.killBeforeStoreUpdate = !kill;
87          LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
88        }
89        return kill;
90      }
91    }
92  
93    public interface ProcedureExecutorListener {
94      void procedureLoaded(long procId);
95      void procedureAdded(long procId);
96      void procedureFinished(long procId);
97    }
98  
99    /**
100    * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure
101    */
102   private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
103     @Override
104     public long getTimeout(Procedure proc) {
105       return proc.getTimeRemaining();
106     }
107 
108     @Override
109     public TimeUnit getTimeUnit(Procedure proc) {
110       return TimeUnit.MILLISECONDS;
111     }
112   }
113 
114   /**
115    * Internal cleaner that removes the completed procedure results after a TTL.
116    * NOTE: This is a special case handled in timeoutLoop().
117    *
118    * Since the client code looks more or less like:
119    *   procId = master.doOperation()
120    *   while (master.getProcResult(procId) == ProcInProgress);
121    * The master should not throw away the proc result as soon as the procedure is done
122    * but should wait a result request from the client (see executor.removeResult(procId))
123    * The client will call something like master.isProcDone() or master.getProcResult()
124    * which will return the result/state to the client, and it will mark the completed
125    * proc as ready to delete. note that the client may not receive the response from
126    * the master (e.g. master failover) so, if we delay a bit the real deletion of
127    * the proc result the client will be able to get the result the next try.
128    */
129   private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
130     private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
131 
132     private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
133     private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
134 
135     private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
136     private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
137 
138     private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
139     private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
140 
141     private final Map<Long, ProcedureInfo> completed;
142     private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
143     private final ProcedureStore store;
144     private final Configuration conf;
145 
146     public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
147         final Map<Long, ProcedureInfo> completedMap,
148         final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
149       // set the timeout interval that triggers the periodic-procedure
150       setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
151       this.completed = completedMap;
152       this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
153       this.store = store;
154       this.conf = conf;
155     }
156 
157     public void periodicExecute(final TEnvironment env) {
158       if (completed.isEmpty()) {
159         if (LOG.isTraceEnabled()) {
160           LOG.trace("No completed procedures to cleanup.");
161         }
162         return;
163       }
164 
165       final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
166       final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
167 
168       final long now = EnvironmentEdgeManager.currentTime();
169       final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
170       final boolean isDebugEnabled = LOG.isDebugEnabled();
171       while (it.hasNext() && store.isRunning()) {
172         final Map.Entry<Long, ProcedureInfo> entry = it.next();
173         final ProcedureInfo procInfo = entry.getValue();
174 
175         // TODO: Select TTL based on Procedure type
176         if ((procInfo.hasClientAckTime() && (now - procInfo.getClientAckTime()) >= evictAckTtl) ||
177             (now - procInfo.getLastUpdate()) >= evictTtl) {
178           if (isDebugEnabled) {
179             LOG.debug("Evict completed procedure: " + procInfo);
180           }
181           store.delete(entry.getKey());
182           it.remove();
183 
184           NonceKey nonceKey = procInfo.getNonceKey();
185           if (nonceKey != null) {
186             nonceKeysToProcIdsMap.remove(nonceKey);
187           }
188         }
189       }
190     }
191 
192     @Override
193     protected Procedure[] execute(final TEnvironment env) {
194       throw new UnsupportedOperationException();
195     }
196 
197     @Override
198     protected void rollback(final TEnvironment env) {
199       throw new UnsupportedOperationException();
200     }
201 
202     @Override
203     protected boolean abort(final TEnvironment env) {
204       throw new UnsupportedOperationException();
205     }
206 
207     @Override
208     public void serializeStateData(final OutputStream stream) {
209       throw new UnsupportedOperationException();
210     }
211 
212     @Override
213     public void deserializeStateData(final InputStream stream) {
214       throw new UnsupportedOperationException();
215     }
216   }
217 
218   /**
219    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the ProcedureInfo.
220    * Once a Root-Procedure completes (success or failure), the result will be added to this map.
221    * The user of ProcedureExecutor should call getResult(procId) to get the result.
222    */
223   private final ConcurrentHashMap<Long, ProcedureInfo> completed =
224     new ConcurrentHashMap<Long, ProcedureInfo>();
225 
226   /**
227    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
228    * The RootProcedureState contains the execution stack of the Root-Procedure,
229    * It is added to the map by submitProcedure() and removed on procedure completion.
230    */
231   private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
232     new ConcurrentHashMap<Long, RootProcedureState>();
233 
234   /**
235    * Helper map to lookup the live procedures by ID.
236    * This map contains every procedure. root-procedures and subprocedures.
237    */
238   private final ConcurrentHashMap<Long, Procedure> procedures =
239     new ConcurrentHashMap<Long, Procedure>();
240 
241   /**
242    * Helper map to lookup whether the procedure already issued from the same client.
243    * This map contains every root procedure.
244    */
245   private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
246       new ConcurrentHashMap<NonceKey, Long>();
247 
248   /**
249    * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
250    * or periodic procedures.
251    */
252   private final TimeoutBlockingQueue<Procedure> waitingTimeout =
253     new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
254 
255   /**
256    * Queue that contains runnable procedures.
257    */
258   private final ProcedureRunnableSet runnables;
259 
260   // TODO
261   private final ReentrantLock submitLock = new ReentrantLock();
262   private final AtomicLong lastProcId = new AtomicLong(-1);
263 
264   private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
265     new CopyOnWriteArrayList<ProcedureExecutorListener>();
266 
267   private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
268   private final AtomicBoolean running = new AtomicBoolean(false);
269   private final TEnvironment environment;
270   private final ProcedureStore store;
271   private final Configuration conf;
272 
273   private Thread[] threads;
274 
275   public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
276       final ProcedureStore store) {
277     this(conf, environment, store, new ProcedureSimpleRunQueue());
278   }
279 
280   public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
281       final ProcedureStore store, final ProcedureRunnableSet runqueue) {
282     this.environment = environment;
283     this.runnables = runqueue;
284     this.store = store;
285     this.conf = conf;
286   }
287 
288   private void load(final boolean abortOnCorruption) throws IOException {
289     Preconditions.checkArgument(completed.isEmpty());
290     Preconditions.checkArgument(rollbackStack.isEmpty());
291     Preconditions.checkArgument(procedures.isEmpty());
292     Preconditions.checkArgument(waitingTimeout.isEmpty());
293     Preconditions.checkArgument(runnables.size() == 0);
294 
295     store.load(new ProcedureStore.ProcedureLoader() {
296       @Override
297       public void setMaxProcId(long maxProcId) {
298         assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
299         LOG.debug("load procedures maxProcId=" + maxProcId);
300         lastProcId.set(maxProcId);
301       }
302 
303       @Override
304       public void load(ProcedureIterator procIter) throws IOException {
305         loadProcedures(procIter, abortOnCorruption);
306       }
307 
308       @Override
309       public void handleCorrupted(ProcedureIterator procIter) throws IOException {
310         int corruptedCount = 0;
311         while (procIter.hasNext()) {
312           ProcedureInfo proc = procIter.nextAsProcedureInfo();
313           LOG.error("corrupted procedure: " + proc);
314           corruptedCount++;
315         }
316         if (abortOnCorruption && corruptedCount > 0) {
317           throw new IOException("found " + corruptedCount + " procedures on replay");
318         }
319       }
320     });
321   }
322 
323   private void loadProcedures(final ProcedureIterator procIter,
324       final boolean abortOnCorruption) throws IOException {
325     final boolean isDebugEnabled = LOG.isDebugEnabled();
326 
327     // 1. Build the rollback stack
328     int runnablesCount = 0;
329     while (procIter.hasNext()) {
330       final NonceKey nonceKey;
331       final long procId;
332 
333       if (procIter.isNextCompleted()) {
334         ProcedureInfo proc = procIter.nextAsProcedureInfo();
335         nonceKey = proc.getNonceKey();
336         procId = proc.getProcId();
337         completed.put(proc.getProcId(), proc);
338         if (isDebugEnabled) {
339           LOG.debug("The procedure is completed: " + proc);
340         }
341       } else {
342         Procedure proc = procIter.nextAsProcedure();
343         nonceKey = proc.getNonceKey();
344         procId = proc.getProcId();
345 
346         if (!proc.hasParent()) {
347           assert !proc.isFinished() : "unexpected finished procedure";
348           rollbackStack.put(proc.getProcId(), new RootProcedureState());
349         }
350 
351         // add the procedure to the map
352         proc.beforeReplay(getEnvironment());
353         procedures.put(proc.getProcId(), proc);
354 
355         if (proc.getState() == ProcedureState.RUNNABLE) {
356           runnablesCount++;
357         }
358       }
359 
360       // add the nonce to the map
361       if (nonceKey != null) {
362         nonceKeysToProcIdsMap.put(nonceKey, procId);
363       }
364     }
365 
366     // 2. Initialize the stacks
367     ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
368     HashSet<Procedure> waitingSet = null;
369     procIter.reset();
370     while (procIter.hasNext()) {
371       if (procIter.isNextCompleted()) {
372         procIter.skipNext();
373         continue;
374       }
375 
376       Procedure proc = procIter.nextAsProcedure();
377       assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
378 
379       if (isDebugEnabled) {
380         LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
381                     proc.getState(), proc.hasException(), proc));
382       }
383 
384       Long rootProcId = getRootProcedureId(proc);
385       if (rootProcId == null) {
386         // The 'proc' was ready to run but the root procedure was rolledback?
387         runnables.addBack(proc);
388         continue;
389       }
390 
391       if (proc.hasParent() && !proc.isFinished()) {
392         Procedure parent = procedures.get(proc.getParentProcId());
393         // corrupted procedures are handled later at step 3
394         if (parent != null) {
395           parent.incChildrenLatch();
396         }
397       }
398 
399       RootProcedureState procStack = rollbackStack.get(rootProcId);
400       procStack.loadStack(proc);
401 
402       switch (proc.getState()) {
403         case RUNNABLE:
404           runnableList.add(proc);
405           break;
406         case WAITING_TIMEOUT:
407           if (waitingSet == null) {
408             waitingSet = new HashSet<Procedure>();
409           }
410           waitingSet.add(proc);
411           break;
412         case FINISHED:
413           if (proc.hasException()) {
414             // add the proc to the runnables to perform the rollback
415             runnables.addBack(proc);
416             break;
417           }
418         case ROLLEDBACK:
419         case INITIALIZING:
420           String msg = "Unexpected " + proc.getState() + " state for " + proc;
421           LOG.error(msg);
422           throw new UnsupportedOperationException(msg);
423         default:
424           break;
425       }
426     }
427 
428     // 3. Validate the stacks
429     int corruptedCount = 0;
430     Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
431     while (itStack.hasNext()) {
432       Map.Entry<Long, RootProcedureState> entry = itStack.next();
433       RootProcedureState procStack = entry.getValue();
434       if (procStack.isValid()) continue;
435 
436       for (Procedure proc: procStack.getSubprocedures()) {
437         LOG.error("corrupted procedure: " + proc);
438         procedures.remove(proc.getProcId());
439         runnableList.remove(proc);
440         if (waitingSet != null) waitingSet.remove(proc);
441         corruptedCount++;
442       }
443       itStack.remove();
444     }
445 
446     if (abortOnCorruption && corruptedCount > 0) {
447       throw new IOException("found " + corruptedCount + " procedures on replay");
448     }
449 
450     // 4. Push the runnables
451     if (!runnableList.isEmpty()) {
452       // TODO: See ProcedureWALFormatReader#hasFastStartSupport
453       // some procedure may be started way before this stuff.
454       for (int i = runnableList.size() - 1; i >= 0; --i) {
455         Procedure proc = runnableList.get(i);
456         if (!proc.hasParent()) {
457           sendProcedureLoadedNotification(proc.getProcId());
458         }
459         if (proc.wasExecuted()) {
460           runnables.addFront(proc);
461         } else {
462           // if it was not in execution, it can wait.
463           runnables.addBack(proc);
464         }
465       }
466     }
467   }
468 
469   /**
470    * Start the procedure executor.
471    * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to
472    * recover the lease, and ensure a single executor, and start the procedure
473    * replay to resume and recover the previous pending and in-progress perocedures.
474    *
475    * @param numThreads number of threads available for procedure execution.
476    * @param abortOnCorruption true if you want to abort your service in case
477    *          a corrupted procedure is found on replay. otherwise false.
478    */
479   public void start(int numThreads, boolean abortOnCorruption) throws IOException {
480     if (running.getAndSet(true)) {
481       LOG.warn("Already running");
482       return;
483     }
484 
485     // We have numThreads executor + one timer thread used for timing out
486     // procedures and triggering periodic procedures.
487     threads = new Thread[numThreads + 1];
488     LOG.info("Starting procedure executor threads=" + threads.length);
489 
490     // Initialize procedures executor
491     for (int i = 0; i < numThreads; ++i) {
492       threads[i] = new Thread("ProcedureExecutor-" + i) {
493         @Override
494         public void run() {
495           execLoop();
496         }
497       };
498     }
499 
500     // Initialize procedures timeout handler (this is the +1 thread)
501     threads[numThreads] = new Thread("ProcedureExecutorTimeout") {
502       @Override
503       public void run() {
504         timeoutLoop();
505       }
506     };
507 
508     // Acquire the store lease.
509     store.recoverLease();
510 
511     // TODO: Split in two steps.
512     // TODO: Handle corrupted procedures (currently just a warn)
513     // The first one will make sure that we have the latest id,
514     // so we can start the threads and accept new procedures.
515     // The second step will do the actual load of old procedures.
516     load(abortOnCorruption);
517 
518     // Start the executors. Here we must have the lastProcId set.
519     for (int i = 0; i < threads.length; ++i) {
520       threads[i].start();
521     }
522 
523     // Add completed cleaner
524     waitingTimeout.add(
525       new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
526   }
527 
528   public void stop() {
529     if (!running.getAndSet(false)) {
530       return;
531     }
532 
533     LOG.info("Stopping the procedure executor");
534     runnables.signalAll();
535     waitingTimeout.signalAll();
536   }
537 
538   public void join() {
539     boolean interrupted = false;
540 
541     for (int i = 0; i < threads.length; ++i) {
542       try {
543         threads[i].join();
544       } catch (InterruptedException ex) {
545         interrupted = true;
546       }
547     }
548 
549     if (interrupted) {
550       Thread.currentThread().interrupt();
551     }
552 
553     completed.clear();
554     rollbackStack.clear();
555     procedures.clear();
556     nonceKeysToProcIdsMap.clear();
557     waitingTimeout.clear();
558     runnables.clear();
559     lastProcId.set(-1);
560   }
561 
562   public boolean isRunning() {
563     return running.get();
564   }
565 
566   /**
567    * @return the number of execution threads.
568    */
569   public int getNumThreads() {
570     return threads == null ? 0 : (threads.length - 1);
571   }
572 
573   public int getActiveExecutorCount() {
574     return activeExecutorCount.get();
575   }
576 
577   public TEnvironment getEnvironment() {
578     return this.environment;
579   }
580 
581   public ProcedureStore getStore() {
582     return this.store;
583   }
584 
585   public void registerListener(ProcedureExecutorListener listener) {
586     this.listeners.add(listener);
587   }
588 
589   public boolean unregisterListener(ProcedureExecutorListener listener) {
590     return this.listeners.remove(listener);
591   }
592 
593   /**
594    * List procedures.
595    * @return the procedures in a list
596    */
597   public List<ProcedureInfo> listProcedures() {
598     List<ProcedureInfo> procedureLists =
599         new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
600     for (java.util.Map.Entry<Long, Procedure> p: procedures.entrySet()) {
601       procedureLists.add(Procedure.createProcedureInfo(p.getValue(), null));
602     }
603     for (java.util.Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
604       // Note: The procedure could show up twice in the list with different state, as
605       // it could complete after we walk through procedures list and insert into
606       // procedureList - it is ok, as we will use the information in the ProcedureInfo
607       // to figure it out; to prevent this would increase the complexity of the logic.
608       procedureLists.add(e.getValue());
609     }
610     return procedureLists;
611   }
612 
613   /**
614    * Add a new root-procedure to the executor.
615    * @param proc the new procedure to execute.
616    * @return the procedure id, that can be used to monitor the operation
617    */
618   public long submitProcedure(final Procedure proc) {
619     return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
620   }
621 
622   /**
623    * Add a new root-procedure to the executor.
624    * @param proc the new procedure to execute.
625    * @param nonceGroup
626    * @param nonce
627    * @return the procedure id, that can be used to monitor the operation
628    */
629   public long submitProcedure(
630       final Procedure proc,
631       final long nonceGroup,
632       final long nonce) {
633     Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
634     Preconditions.checkArgument(isRunning());
635     Preconditions.checkArgument(lastProcId.get() >= 0);
636     Preconditions.checkArgument(!proc.hasParent());
637 
638     Long currentProcId;
639 
640     // The following part of the code has to be synchronized to prevent multiple request
641     // with the same nonce to execute at the same time.
642     synchronized (this) {
643       // Check whether the proc exists.  If exist, just return the proc id.
644       // This is to prevent the same proc to submit multiple times (it could happen
645       // when client could not talk to server and resubmit the same request).
646       NonceKey noncekey = null;
647       if (nonce != HConstants.NO_NONCE) {
648         noncekey = new NonceKey(nonceGroup, nonce);
649         currentProcId = nonceKeysToProcIdsMap.get(noncekey);
650         if (currentProcId != null) {
651           // Found the proc
652           return currentProcId;
653         }
654       }
655 
656       // Initialize the Procedure ID
657       currentProcId = nextProcId();
658       proc.setProcId(currentProcId);
659 
660       // This is new procedure. Set the noncekey and insert into the map.
661       if (noncekey != null) {
662         proc.setNonceKey(noncekey);
663         nonceKeysToProcIdsMap.put(noncekey, currentProcId);
664       }
665     } // end of synchronized (this)
666 
667     // Commit the transaction
668     store.insert(proc, null);
669     if (LOG.isDebugEnabled()) {
670       LOG.debug("Procedure " + proc + " added to the store.");
671     }
672 
673     // Create the rollback stack for the procedure
674     RootProcedureState stack = new RootProcedureState();
675     rollbackStack.put(currentProcId, stack);
676 
677     // Submit the new subprocedures
678     assert !procedures.containsKey(currentProcId);
679     procedures.put(currentProcId, proc);
680     sendProcedureAddedNotification(currentProcId);
681     runnables.addBack(proc);
682     return currentProcId;
683   }
684 
685   public ProcedureInfo getResult(final long procId) {
686     return completed.get(procId);
687   }
688 
689   /**
690    * Return true if the procedure is finished.
691    * The state may be "completed successfully" or "failed and rolledback".
692    * Use getResult() to check the state or get the result data.
693    * @param procId the ID of the procedure to check
694    * @return true if the procedure execution is finished, otherwise false.
695    */
696   public boolean isFinished(final long procId) {
697     return completed.containsKey(procId);
698   }
699 
700   /**
701    * Return true if the procedure is started.
702    * @param procId the ID of the procedure to check
703    * @return true if the procedure execution is started, otherwise false.
704    */
705   public boolean isStarted(final long procId) {
706     Procedure proc = procedures.get(procId);
707     if (proc == null) {
708       return completed.get(procId) != null;
709     }
710     return proc.wasExecuted();
711   }
712 
713   /**
714    * Mark the specified completed procedure, as ready to remove.
715    * @param procId the ID of the procedure to remove
716    */
717   public void removeResult(final long procId) {
718     ProcedureInfo result = completed.get(procId);
719     if (result == null) {
720       assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
721       if (LOG.isDebugEnabled()) {
722         LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
723       }
724       return;
725     }
726 
727     // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
728     result.setClientAckTime(EnvironmentEdgeManager.currentTime());
729   }
730 
731   /**
732    * Send an abort notification the specified procedure.
733    * Depending on the procedure implementation the abort can be considered or ignored.
734    * @param procId the procedure to abort
735    * @return true if the procedure exist and has received the abort, otherwise false.
736    */
737   public boolean abort(final long procId) {
738     return abort(procId, true);
739   }
740 
741   /**
742    * Send an abort notification the specified procedure.
743    * Depending on the procedure implementation the abort can be considered or ignored.
744    * @param procId the procedure to abort
745    * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
746    * @return true if the procedure exist and has received the abort, otherwise false.
747    */
748   public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
749     Procedure proc = procedures.get(procId);
750     if (proc != null) {
751       if (!mayInterruptIfRunning && proc.wasExecuted()) {
752         return false;
753       } else {
754         return proc.abort(getEnvironment());
755       }
756     }
757     return false;
758   }
759 
760   /**
761    * Check if the user is this procedure's owner
762    * @param procId the target procedure
763    * @param user the user
764    * @return true if the user is the owner of the procedure,
765    *   false otherwise or the owner is unknown.
766    */
767   public boolean isProcedureOwner(final long procId, final User user) {
768     if (user == null) {
769       return false;
770     }
771 
772     Procedure proc = procedures.get(procId);
773     if (proc != null) {
774       return proc.getOwner().equals(user.getShortName());
775     }
776     ProcedureInfo procInfo = completed.get(procId);
777     if (procInfo == null) {
778       // Procedure either does not exist or has already completed and got cleaned up.
779       // At this time, we cannot check the owner of the procedure
780       return false;
781     }
782     return ProcedureInfo.isProcedureOwner(procInfo, user);
783   }
784 
785   public Map<Long, ProcedureInfo> getResults() {
786     return Collections.unmodifiableMap(completed);
787   }
788 
789   public Procedure getProcedure(final long procId) {
790     return procedures.get(procId);
791   }
792 
793   protected ProcedureRunnableSet getRunnableSet() {
794     return runnables;
795   }
796 
797   /**
798    * Execution loop (N threads)
799    * while the executor is in a running state,
800    * fetch a procedure from the runnables queue and start the execution.
801    */
802   private void execLoop() {
803     while (isRunning()) {
804       Procedure proc = runnables.poll();
805       if (proc == null) continue;
806 
807       try {
808         activeExecutorCount.incrementAndGet();
809         execLoop(proc);
810       } finally {
811         activeExecutorCount.decrementAndGet();
812       }
813     }
814   }
815 
  /**
   * Executes (or rolls back) one procedure polled from the runnables queue.
   *
   * <p>The root procedure id is resolved first. If it cannot be resolved, the root
   * procedure was already rolled back, so only this procedure is rolled back.
   * Otherwise the root's RootProcedureState gates execution:
   * <ul>
   *   <li>if {@code acquire()} fails, this thread either takes the rollback lock and
   *       rolls back the whole stack, or (when a child is still running) rolls back
   *       this procedure only if it was never executed; unsuccessful attempts are
   *       re-queued with {@code runnables.yield()};</li>
   *   <li>otherwise the procedure lock is taken, one execution round runs via
   *       {@link #execProcedure}, and a successfully completed root procedure is
   *       finalized with {@link #procedureFinished}.</li>
   * </ul>
   * The loop repeats while {@code procStack.isFailed()} is true.
   *
   * @param proc the procedure polled from the runnables queue
   */
  private void execLoop(Procedure proc) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Trying to start the execution of " + proc);
    }

    Long rootProcId = getRootProcedureId(proc);
    if (rootProcId == null) {
      // The 'proc' was ready to run but the root procedure was rolledback
      executeRollback(proc);
      return;
    }

    // No RootProcedureState for this root (e.g. already removed by procedureFinished()):
    // nothing left to run or roll back from here.
    RootProcedureState procStack = rollbackStack.get(rootProcId);
    if (procStack == null) return;

    do {
      // Try to acquire the execution
      if (!procStack.acquire(proc)) {
        if (procStack.setRollback()) {
          // we have the 'rollback-lock' we can start rollingback
          if (!executeRollback(rootProcId, procStack)) {
            // Rollback could not complete now: give up the rollback-lock and retry later.
            procStack.unsetRollback();
            runnables.yield(proc);
          }
        } else {
          // if we can't rollback means that some child is still running.
          // the rollback will be executed after all the children are done.
          // If the procedure was never executed, remove and mark it as rolledback.
          if (!proc.wasExecuted()) {
            if (!executeRollback(proc)) {
              runnables.yield(proc);
            }
          }
        }
        break;
      }

      // Execute the procedure
      assert proc.getState() == ProcedureState.RUNNABLE;
      if (proc.acquireLock(getEnvironment())) {
        execProcedure(procStack, proc);
        proc.releaseLock(getEnvironment());
      } else {
        // Procedure lock not available: re-queue and retry later.
        // NOTE(review): if procStack.isFailed() is true below, the loop runs again and the
        // rollback branch may yield 'proc' a second time; confirm that a double enqueue is
        // harmless for the runnable set.
        runnables.yield(proc);
      }
      procStack.release(proc);

      // allows to kill the executor before something is stored to the wal.
      // useful to test the procedure recovery.
      if (testing != null && !isRunning()) {
        break;
      }

      if (proc.isSuccess()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Procedure completed in " +
              StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
        }
        // Finalize the procedure state
        // (rootProcId is a Long; '==' against a long unboxes it, so this is a numeric compare)
        if (proc.getProcId() == rootProcId) {
          procedureFinished(proc);
        }
        break;
      }
    } while (procStack.isFailed());
  }
882 
883   private void timeoutLoop() {
884     while (isRunning()) {
885       Procedure proc = waitingTimeout.poll();
886       if (proc == null) continue;
887 
888       if (proc.getTimeRemaining() > 100) {
889         // got an early wake, maybe a stop?
890         // re-enqueue the task in case was not a stop or just a signal
891         waitingTimeout.add(proc);
892         continue;
893       }
894 
895       // ----------------------------------------------------------------------------
896       // TODO-MAYBE: Should we provide a notification to the store with the
897       // full set of procedures pending and completed to write a compacted
898       // version of the log (in case is a log)?
899       // In theory no, procedures are have a short life, so at some point the store
900       // will have the tracker saying everything is in the last log.
901       // ----------------------------------------------------------------------------
902 
903       // The CompletedProcedureCleaner is a special case, and it acts as a chore.
904       // instead of bringing the Chore class in, we reuse this timeout thread for
905       // this special case.
906       if (proc instanceof CompletedProcedureCleaner) {
907         try {
908           ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
909         } catch (Throwable e) {
910           LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
911         }
912         proc.setStartTime(EnvironmentEdgeManager.currentTime());
913         waitingTimeout.add(proc);
914         continue;
915       }
916 
917       // The procedure received an "abort-timeout", call abort() and
918       // add the procedure back in the queue for rollback.
919       if (proc.setTimeoutFailure()) {
920         long rootProcId = Procedure.getRootProcedureId(procedures, proc);
921         RootProcedureState procStack = rollbackStack.get(rootProcId);
922         procStack.abort();
923         store.update(proc);
924         runnables.addFront(proc);
925         continue;
926       }
927     }
928   }
929 
930   /**
931    * Execute the rollback of the full procedure stack.
932    * Once the procedure is rolledback, the root-procedure will be visible as
933    * finished to user, and the result will be the fatal exception.
934    */
935   private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
936     Procedure rootProc = procedures.get(rootProcId);
937     RemoteProcedureException exception = rootProc.getException();
938     if (exception == null) {
939       exception = procStack.getException();
940       rootProc.setFailure(exception);
941       store.update(rootProc);
942     }
943 
944     List<Procedure> subprocStack = procStack.getSubprocedures();
945     assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
946 
947     int stackTail = subprocStack.size();
948     boolean reuseLock = false;
949     while (stackTail --> 0) {
950       final Procedure proc = subprocStack.get(stackTail);
951 
952       if (!reuseLock && !proc.acquireLock(getEnvironment())) {
953         // can't take a lock on the procedure, add the root-proc back on the
954         // queue waiting for the lock availability
955         return false;
956       }
957 
958       boolean abortRollback = !executeRollback(proc);
959       abortRollback |= !isRunning() || !store.isRunning();
960 
961       // If the next procedure is the same to this one
962       // (e.g. StateMachineProcedure reuse the same instance)
963       // we can avoid to lock/unlock each step
964       reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
965       if (!reuseLock) {
966         proc.releaseLock(getEnvironment());
967       }
968 
969       // allows to kill the executor before something is stored to the wal.
970       // useful to test the procedure recovery.
971       if (abortRollback) {
972         return false;
973       }
974 
975       subprocStack.remove(stackTail);
976 
977       // if the procedure is kind enough to pass the slot to someone else, yield
978       if (proc.isYieldAfterExecutionStep(getEnvironment())) {
979         return false;
980       }
981     }
982 
983     // Finalize the procedure state
984     LOG.info("Rolledback procedure " + rootProc +
985              " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
986              " exception=" + exception.getMessage());
987     procedureFinished(rootProc);
988     return true;
989   }
990 
991   /**
992    * Execute the rollback of the procedure step.
993    * It updates the store with the new state (stack index)
994    * or will remove completly the procedure in case it is a child.
995    */
996   private boolean executeRollback(final Procedure proc) {
997     try {
998       proc.doRollback(getEnvironment());
999     } catch (IOException e) {
1000       if (LOG.isDebugEnabled()) {
1001         LOG.debug("rollback attempt failed for " + proc, e);
1002       }
1003       return false;
1004     } catch (InterruptedException e) {
1005       handleInterruptedException(proc, e);
1006       return false;
1007     } catch (Throwable e) {
1008       // Catch NullPointerExceptions or similar errors...
1009       LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
1010     }
1011 
1012     // allows to kill the executor before something is stored to the wal.
1013     // useful to test the procedure recovery.
1014     if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1015       LOG.debug("TESTING: Kill before store update");
1016       stop();
1017       return false;
1018     }
1019 
1020     if (proc.removeStackIndex()) {
1021       proc.setState(ProcedureState.ROLLEDBACK);
1022       if (proc.hasParent()) {
1023         store.delete(proc.getProcId());
1024         procedures.remove(proc.getProcId());
1025       } else {
1026         store.update(proc);
1027       }
1028     } else {
1029       store.update(proc);
1030     }
1031 
1032     return true;
1033   }
1034 
  /**
   * Executes the specified procedure
   *  - calls the doExecute() of the procedure
   *  - if the procedure execution didn't fail (e.g. invalid user input)
   *     - ...and returned subprocedures
   *        - the subprocedures are initialized.
   *        - the subprocedures are added to the store
   *        - the subprocedures are added to the runnable queue
   *        - the procedure is now in a WAITING state, waiting for the subprocedures to complete
   *     - ...if there are no subprocedure
   *        - the procedure completed successfully
   *        - if there is a parent (WAITING)
   *            - the parent state will be set to RUNNABLE
   *  - in case of failure
   *    - the store is updated with the new state
   *    - the executor (caller of this method) will start the rollback of the procedure
   *
   * If doExecute() returns an array whose only element is the procedure itself, the step
   * is recorded and persisted and doExecute() is called again right away (state-machine
   * shortcut). A ProcedureYieldException or InterruptedException re-queues the procedure
   * with runnables.yield() without recording a rollback step.
   *
   * @param procStack the rollback state of the root procedure; each executed step is
   *   recorded on it with addRollbackStep()
   * @param procedure the procedure to execute; must be in the RUNNABLE state
   */
  private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);

    // Execute the procedure
    boolean reExecute = false;
    Procedure[] subprocs = null;
    do {
      reExecute = false;
      try {
        subprocs = procedure.doExecute(getEnvironment());
        // An empty array is handled exactly like "no subprocedures".
        if (subprocs != null && subprocs.length == 0) {
          subprocs = null;
        }
      } catch (ProcedureYieldException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
        }
        runnables.yield(procedure);
        return;
      } catch (InterruptedException e) {
        handleInterruptedException(procedure, e);
        runnables.yield(procedure);
        return;
      } catch (Throwable e) {
        // Catch NullPointerExceptions or similar errors...
        String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
        LOG.error(msg, e);
        procedure.setFailure(new RemoteProcedureException(msg, e));
      }

      if (!procedure.isFailed()) {
        if (subprocs != null) {
          if (subprocs.length == 1 && subprocs[0] == procedure) {
            // quick-shortcut for a state machine like procedure
            subprocs = null;
            reExecute = true;
          } else {
            // yield the current procedure, and make the subprocedure runnable
            for (int i = 0; i < subprocs.length; ++i) {
              Procedure subproc = subprocs[i];
              if (subproc == null) {
                // A null child is a bug in doExecute(): fail the parent instead of crashing.
                String msg = "subproc[" + i + "] is null, aborting the procedure";
                procedure.setFailure(new RemoteProcedureException(msg,
                  new IllegalArgumentIOException(msg)));
                subprocs = null;
                break;
              }

              assert subproc.getState() == ProcedureState.INITIALIZING;
              subproc.setParentProcId(procedure.getProcId());
              subproc.setProcId(nextProcId());
            }

            if (!procedure.isFailed()) {
              // The parent waits until all children count the latch down.
              procedure.setChildrenLatch(subprocs.length);
              switch (procedure.getState()) {
                case RUNNABLE:
                  procedure.setState(ProcedureState.WAITING);
                  break;
                case WAITING_TIMEOUT:
                  waitingTimeout.add(procedure);
                  break;
                default:
                  break;
              }
            }
          }
        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
          waitingTimeout.add(procedure);
        } else {
          // No subtask, so we are done
          procedure.setState(ProcedureState.FINISHED);
        }
      }

      // Add the procedure to the stack
      // (this also happens for a failed step)
      procStack.addRollbackStep(procedure);

      // allows to kill the executor before something is stored to the wal.
      // useful to test the procedure recovery.
      if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
        LOG.debug("TESTING: Kill before store update");
        stop();
        return;
      }

      // Commit the transaction
      // (new children are persisted together with the parent in a single insert)
      if (subprocs != null && !procedure.isFailed()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
        }
        store.insert(procedure, subprocs);
      } else {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Store update " + procedure);
        }
        store.update(procedure);
      }

      // if the store is not running we are aborting
      if (!store.isRunning()) {
        return;
      }

      // if the procedure is kind enough to pass the slot to someone else, yield
      if (procedure.getState() == ProcedureState.RUNNABLE &&
          procedure.isYieldAfterExecutionStep(getEnvironment())) {
        runnables.yield(procedure);
        return;
      }

      assert (reExecute && subprocs == null) || !reExecute;
    } while (reExecute);

    // Submit the new subprocedures
    // (only after the store insert above, so children are never runnable before they are persisted)
    if (subprocs != null && !procedure.isFailed()) {
      for (int i = 0; i < subprocs.length; ++i) {
        Procedure subproc = subprocs[i];
        assert !procedures.containsKey(subproc.getProcId());
        procedures.put(subproc.getProcId(), subproc);
        runnables.addFront(subproc);
      }
    }

    if (procedure.isFinished() && procedure.hasParent()) {
      Procedure parent = procedures.get(procedure.getParentProcId());
      if (parent == null) {
        // The parent is gone; this is only expected while the stack is rolling back.
        assert procStack.isRollingback();
        return;
      }

      // If this procedure is the last child awake the parent procedure
      if (LOG.isTraceEnabled()) {
        LOG.trace(parent + " child is done: " + procedure);
      }
      if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
        parent.setState(ProcedureState.RUNNABLE);
        store.update(parent);
        runnables.addFront(parent);
        if (LOG.isTraceEnabled()) {
          LOG.trace(parent + " all the children finished their work, resume.");
        }
        return;
      }
    }
  }
1198 
1199   private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
1200     if (LOG.isTraceEnabled()) {
1201       LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
1202     }
1203 
1204     // NOTE: We don't call Thread.currentThread().interrupt()
1205     // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
1206     // the InterruptedException. If the master is going down, we will be notified
1207     // and the executor/store will be stopped.
1208     // (The interrupted procedure will be retried on the next run)
1209   }
1210 
1211   private void sendProcedureLoadedNotification(final long procId) {
1212     if (!this.listeners.isEmpty()) {
1213       for (ProcedureExecutorListener listener: this.listeners) {
1214         try {
1215           listener.procedureLoaded(procId);
1216         } catch (Throwable e) {
1217           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1218         }
1219       }
1220     }
1221   }
1222 
1223   private void sendProcedureAddedNotification(final long procId) {
1224     if (!this.listeners.isEmpty()) {
1225       for (ProcedureExecutorListener listener: this.listeners) {
1226         try {
1227           listener.procedureAdded(procId);
1228         } catch (Throwable e) {
1229           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1230         }
1231       }
1232     }
1233   }
1234 
1235   private void sendProcedureFinishedNotification(final long procId) {
1236     if (!this.listeners.isEmpty()) {
1237       for (ProcedureExecutorListener listener: this.listeners) {
1238         try {
1239           listener.procedureFinished(procId);
1240         } catch (Throwable e) {
1241           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1242         }
1243       }
1244     }
1245   }
1246 
1247   private long nextProcId() {
1248     long procId = lastProcId.incrementAndGet();
1249     if (procId < 0) {
1250       while (!lastProcId.compareAndSet(procId, 0)) {
1251         procId = lastProcId.get();
1252         if (procId >= 0)
1253           break;
1254       }
1255       while (procedures.containsKey(procId)) {
1256         procId = lastProcId.incrementAndGet();
1257       }
1258     }
1259     return procId;
1260   }
1261 
1262   private Long getRootProcedureId(Procedure proc) {
1263     return Procedure.getRootProcedureId(procedures, proc);
1264   }
1265 
  /**
   * Finalizes a root procedure that completed (successfully or after rollback).
   * Runs the procedure's completion cleanup, publishes its ProcedureInfo in the
   * completed map, drops its rollback state and active-map entry, runs the runnable
   * set's completion cleanup and finally notifies the listeners. Errors thrown by the
   * two cleanup hooks are logged and do not interrupt the finalization.
   *
   * @param proc the finished procedure
   */
  private void procedureFinished(final Procedure proc) {
    // call the procedure completion cleanup handler
    try {
      proc.completionCleanup(getEnvironment());
    } catch (Throwable e) {
      // Catch NullPointerExceptions or similar errors...
      LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
    }

    // update the executor internal state maps
    ProcedureInfo procInfo = Procedure.createProcedureInfo(proc, proc.getNonceKey());
    // Procedures that do not wait for a client ack get an ack time of 0 right away.
    if (!proc.shouldWaitClientAck(getEnvironment())) {
      procInfo.setClientAckTime(0);
    }

    // Order matters: the result is published in 'completed' before the procedure is
    // removed from 'procedures', so a reader that misses it in 'procedures' can find it
    // in 'completed' (getResultOrProcedure() relies on this).
    completed.put(procInfo.getProcId(), procInfo);
    rollbackStack.remove(proc.getProcId());
    procedures.remove(proc.getProcId());

    // call the runnableSet completion cleanup handler
    try {
      runnables.completionCleanup(proc);
    } catch (Throwable e) {
      // Catch NullPointerExceptions or similar errors...
      LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
    }

    // Notify the listeners
    sendProcedureFinishedNotification(proc.getProcId());
  }
1296 
1297   public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) {
1298     ProcedureInfo result = completed.get(procId);
1299     Procedure proc = null;
1300     if (result == null) {
1301       proc = procedures.get(procId);
1302       if (proc == null) {
1303         result = completed.get(procId);
1304       }
1305     }
1306     return new Pair(result, proc);
1307   }
1308 }