View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Collections;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.HashSet;
31  import java.util.TreeSet;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.CopyOnWriteArrayList;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.ProcedureInfo;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.classification.InterfaceStability;
47  import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
48  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
49  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
50  import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
51  import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
52  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
53  import org.apache.hadoop.hbase.security.User;
54  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55  import org.apache.hadoop.hbase.util.NonceKey;
56  import org.apache.hadoop.hbase.util.Pair;
57  
58  import com.google.common.base.Preconditions;
59  
60  /**
61   * Thread Pool that executes the submitted procedures.
62   * The executor has a ProcedureStore associated.
63   * Each operation is logged and on restart the pending procedures are resumed.
64   *
65   * Unless the Procedure code throws an error (e.g. invalid user input)
66   * the procedure will complete (at some point in time). On restart the pending
67   * procedures are resumed and the ones that failed will be rolled back.
68   *
69   * The user can add procedures to the executor via submitProcedure(proc)
70   * check for the finished state via isFinished(procId)
71   * and get the result via getResult(procId)
72   */
73  @InterfaceAudience.Private
74  @InterfaceStability.Evolving
75  public class ProcedureExecutor<TEnvironment> {
76    private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
77  
78    Testing testing = null;
79    public static class Testing {
80      protected boolean killBeforeStoreUpdate = false;
81      protected boolean toggleKillBeforeStoreUpdate = false;
82  
83      protected boolean shouldKillBeforeStoreUpdate() {
84        final boolean kill = this.killBeforeStoreUpdate;
85        if (this.toggleKillBeforeStoreUpdate) {
86          this.killBeforeStoreUpdate = !kill;
87          LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
88        }
89        return kill;
90      }
91    }
92  
93    public interface ProcedureExecutorListener {
94      void procedureLoaded(long procId);
95      void procedureAdded(long procId);
96      void procedureFinished(long procId);
97    }
98  
99    /**
100    * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure
101    */
102   private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
103     @Override
104     public long getTimeout(Procedure proc) {
105       return proc.getTimeRemaining();
106     }
107 
108     @Override
109     public TimeUnit getTimeUnit(Procedure proc) {
110       return TimeUnit.MILLISECONDS;
111     }
112   }
113 
114   /**
115    * Internal cleaner that removes the completed procedure results after a TTL.
116    * NOTE: This is a special case handled in timeoutLoop().
117    *
118    * Since the client code looks more or less like:
119    *   procId = master.doOperation()
120    *   while (master.getProcResult(procId) == ProcInProgress);
121    * The master should not throw away the proc result as soon as the procedure is done
122    * but should wait a result request from the client (see executor.removeResult(procId))
123    * The client will call something like master.isProcDone() or master.getProcResult()
124    * which will return the result/state to the client, and it will mark the completed
125    * proc as ready to delete. note that the client may not receive the response from
126    * the master (e.g. master failover) so, if we delay a bit the real deletion of
127    * the proc result the client will be able to get the result the next try.
128    */
129   private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
130     private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
131 
132     private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
133     private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
134 
135     private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
136     private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
137 
138     private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
139     private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
140 
141     private final Map<Long, ProcedureInfo> completed;
142     private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
143     private final ProcedureStore store;
144     private final Configuration conf;
145 
146     public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
147         final Map<Long, ProcedureInfo> completedMap,
148         final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
149       // set the timeout interval that triggers the periodic-procedure
150       setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
151       this.completed = completedMap;
152       this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
153       this.store = store;
154       this.conf = conf;
155     }
156 
157     public void periodicExecute(final TEnvironment env) {
158       if (completed.isEmpty()) {
159         if (LOG.isDebugEnabled()) {
160           LOG.debug("No completed procedures to cleanup.");
161         }
162         return;
163       }
164 
165       final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
166       final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
167 
168       long now = EnvironmentEdgeManager.currentTime();
169       Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
170       while (it.hasNext() && store.isRunning()) {
171         Map.Entry<Long, ProcedureInfo> entry = it.next();
172         ProcedureInfo result = entry.getValue();
173 
174         // TODO: Select TTL based on Procedure type
175         if ((result.hasClientAckTime() && (now - result.getClientAckTime()) >= evictAckTtl) ||
176             (now - result.getLastUpdate()) >= evictTtl) {
177           if (LOG.isDebugEnabled()) {
178             LOG.debug("Evict completed procedure " + entry.getKey());
179           }
180           store.delete(entry.getKey());
181           it.remove();
182 
183           NonceKey nonceKey = result.getNonceKey();
184           if (nonceKey != null) {
185             nonceKeysToProcIdsMap.remove(nonceKey);
186           }
187         }
188       }
189     }
190 
191     @Override
192     protected Procedure[] execute(final TEnvironment env) {
193       throw new UnsupportedOperationException();
194     }
195 
196     @Override
197     protected void rollback(final TEnvironment env) {
198       throw new UnsupportedOperationException();
199     }
200 
201     @Override
202     protected boolean abort(final TEnvironment env) {
203       throw new UnsupportedOperationException();
204     }
205 
206     @Override
207     public void serializeStateData(final OutputStream stream) {
208       throw new UnsupportedOperationException();
209     }
210 
211     @Override
212     public void deserializeStateData(final InputStream stream) {
213       throw new UnsupportedOperationException();
214     }
215   }
216 
217   /**
218    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the ProcedureInfo.
219    * Once a Root-Procedure completes (success or failure), the result will be added to this map.
220    * The user of ProcedureExecutor should call getResult(procId) to get the result.
221    */
222   private final ConcurrentHashMap<Long, ProcedureInfo> completed =
223     new ConcurrentHashMap<Long, ProcedureInfo>();
224 
225   /**
226    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
227    * The RootProcedureState contains the execution stack of the Root-Procedure,
228    * It is added to the map by submitProcedure() and removed on procedure completion.
229    */
230   private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
231     new ConcurrentHashMap<Long, RootProcedureState>();
232 
233   /**
234    * Helper map to lookup the live procedures by ID.
235    * This map contains every procedure. root-procedures and subprocedures.
236    */
237   private final ConcurrentHashMap<Long, Procedure> procedures =
238     new ConcurrentHashMap<Long, Procedure>();
239 
240   /**
241    * Helper map to lookup whether the procedure already issued from the same client.
242    * This map contains every root procedure.
243    */
244   private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
245       new ConcurrentHashMap<NonceKey, Long>();
246 
247   /**
248    * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
249    * or periodic procedures.
250    */
251   private final TimeoutBlockingQueue<Procedure> waitingTimeout =
252     new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
253 
254   /**
255    * Queue that contains runnable procedures.
256    */
257   private final ProcedureRunnableSet runnables;
258 
259   // TODO
260   private final ReentrantLock submitLock = new ReentrantLock();
261   private final AtomicLong lastProcId = new AtomicLong(-1);
262 
263   private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
264     new CopyOnWriteArrayList<ProcedureExecutorListener>();
265 
266   private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
267   private final AtomicBoolean running = new AtomicBoolean(false);
268   private final TEnvironment environment;
269   private final ProcedureStore store;
270   private final Configuration conf;
271 
272   private Thread[] threads;
273 
274   public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
275       final ProcedureStore store) {
276     this(conf, environment, store, new ProcedureSimpleRunQueue());
277   }
278 
279   public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
280       final ProcedureStore store, final ProcedureRunnableSet runqueue) {
281     this.environment = environment;
282     this.runnables = runqueue;
283     this.store = store;
284     this.conf = conf;
285   }
286 
287   private List<Map.Entry<Long, RootProcedureState>> load() throws IOException {
288     Preconditions.checkArgument(completed.isEmpty());
289     Preconditions.checkArgument(rollbackStack.isEmpty());
290     Preconditions.checkArgument(procedures.isEmpty());
291     Preconditions.checkArgument(waitingTimeout.isEmpty());
292     Preconditions.checkArgument(runnables.size() == 0);
293 
294     // 1. Load the procedures
295     Iterator<Procedure> loader = store.load();
296     if (loader == null) {
297       lastProcId.set(0);
298       return null;
299     }
300 
301     long logMaxProcId = 0;
302     int runnablesCount = 0;
303     while (loader.hasNext()) {
304       Procedure proc = loader.next();
305       proc.beforeReplay(getEnvironment());
306       procedures.put(proc.getProcId(), proc);
307       logMaxProcId = Math.max(logMaxProcId, proc.getProcId());
308       if (LOG.isDebugEnabled()) {
309         LOG.debug("Loading procedure state=" + proc.getState() +
310             " isFailed=" + proc.hasException() + ": " + proc);
311       }
312       if (!proc.hasParent() && !proc.isFinished()) {
313         rollbackStack.put(proc.getProcId(), new RootProcedureState());
314       }
315 
316       // add the nonce to the map
317       if (proc.getNonceKey() != null) {
318         nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
319       }
320 
321       if (proc.getState() == ProcedureState.RUNNABLE) {
322         runnablesCount++;
323       }
324     }
325     assert lastProcId.get() < 0;
326     lastProcId.set(logMaxProcId);
327 
328     // 2. Initialize the stacks
329     TreeSet<Procedure> runnableSet = null;
330     HashSet<Procedure> waitingSet = null;
331     for (final Procedure proc: procedures.values()) {
332       Long rootProcId = getRootProcedureId(proc);
333       if (rootProcId == null) {
334         // The 'proc' was ready to run but the root procedure was rolledback?
335         runnables.addBack(proc);
336         continue;
337       }
338 
339       if (!proc.hasParent() && proc.isFinished()) {
340         if (LOG.isDebugEnabled()) {
341           LOG.debug("The procedure is completed state=" + proc.getState() +
342               " isFailed=" + proc.hasException() + ": " + proc);
343         }
344         assert !rollbackStack.containsKey(proc.getProcId());
345 
346         completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
347 
348         continue;
349       }
350 
351       if (proc.hasParent() && !proc.isFinished()) {
352         Procedure parent = procedures.get(proc.getParentProcId());
353         // corrupted procedures are handled later at step 3
354         if (parent != null) {
355           parent.incChildrenLatch();
356         }
357       }
358 
359       RootProcedureState procStack = rollbackStack.get(rootProcId);
360       procStack.loadStack(proc);
361 
362       switch (proc.getState()) {
363         case RUNNABLE:
364           if (runnableSet == null) {
365             runnableSet = new TreeSet<Procedure>();
366           }
367           runnableSet.add(proc);
368           break;
369         case WAITING_TIMEOUT:
370           if (waitingSet == null) {
371             waitingSet = new HashSet<Procedure>();
372           }
373           waitingSet.add(proc);
374           break;
375         case FINISHED:
376           if (proc.hasException()) {
377             // add the proc to the runnables to perform the rollback
378             runnables.addBack(proc);
379             break;
380           }
381         case ROLLEDBACK:
382         case INITIALIZING:
383           String msg = "Unexpected " + proc.getState() + " state for " + proc;
384           LOG.error(msg);
385           throw new UnsupportedOperationException(msg);
386         default:
387           break;
388       }
389     }
390 
391     // 3. Validate the stacks
392     List<Map.Entry<Long, RootProcedureState>> corrupted = null;
393     Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
394     while (itStack.hasNext()) {
395       Map.Entry<Long, RootProcedureState> entry = itStack.next();
396       RootProcedureState procStack = entry.getValue();
397       if (procStack.isValid()) continue;
398 
399       for (Procedure proc: procStack.getSubprocedures()) {
400         procedures.remove(proc.getProcId());
401         if (runnableSet != null) runnableSet.remove(proc);
402         if (waitingSet != null) waitingSet.remove(proc);
403       }
404       itStack.remove();
405       if (corrupted == null) {
406         corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>();
407       }
408       corrupted.add(entry);
409     }
410 
411     // 4. Push the runnables
412     if (runnableSet != null) {
413       // TODO: See ProcedureWALFormatReader.readInitEntry() some procedure
414       // may be started way before this stuff.
415       for (Procedure proc: runnableSet) {
416         if (!proc.hasParent()) {
417           sendProcedureLoadedNotification(proc.getProcId());
418         }
419         runnables.addBack(proc);
420       }
421     }
422     return corrupted;
423   }
424 
425   public void start(int numThreads) throws IOException {
426     if (running.getAndSet(true)) {
427       LOG.warn("Already running");
428       return;
429     }
430 
431     // We have numThreads executor + one timer thread used for timing out
432     // procedures and triggering periodic procedures.
433     threads = new Thread[numThreads + 1];
434     LOG.info("Starting procedure executor threads=" + threads.length);
435 
436     // Initialize procedures executor
437     for (int i = 0; i < numThreads; ++i) {
438       threads[i] = new Thread("ProcedureExecutorThread-" + i) {
439         @Override
440         public void run() {
441           execLoop();
442         }
443       };
444     }
445 
446     // Initialize procedures timeout handler (this is the +1 thread)
447     threads[numThreads] = new Thread("ProcedureExecutorTimeout") {
448       @Override
449       public void run() {
450         timeoutLoop();
451       }
452     };
453 
454     // Acquire the store lease.
455     store.recoverLease();
456 
457     // TODO: Split in two steps.
458     // TODO: Handle corrupted procedure returned (probably just a WARN)
459     // The first one will make sure that we have the latest id,
460     // so we can start the threads and accept new procedures.
461     // The second step will do the actual load of old procedures.
462     load();
463 
464     // Start the executors. Here we must have the lastProcId set.
465     for (int i = 0; i < threads.length; ++i) {
466       threads[i].start();
467     }
468 
469     // Add completed cleaner
470     waitingTimeout.add(
471       new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
472   }
473 
474   public void stop() {
475     if (!running.getAndSet(false)) {
476       return;
477     }
478 
479     LOG.info("Stopping the procedure executor");
480     runnables.signalAll();
481     waitingTimeout.signalAll();
482   }
483 
484   public void join() {
485     boolean interrupted = false;
486 
487     for (int i = 0; i < threads.length; ++i) {
488       try {
489         threads[i].join();
490       } catch (InterruptedException ex) {
491         interrupted = true;
492       }
493     }
494 
495     if (interrupted) {
496       Thread.currentThread().interrupt();
497     }
498 
499     completed.clear();
500     rollbackStack.clear();
501     procedures.clear();
502     nonceKeysToProcIdsMap.clear();
503     waitingTimeout.clear();
504     runnables.clear();
505     lastProcId.set(-1);
506   }
507 
508   public boolean isRunning() {
509     return running.get();
510   }
511 
512   /**
513    * @return the number of execution threads.
514    */
515   public int getNumThreads() {
516     return threads == null ? 0 : (threads.length - 1);
517   }
518 
519   public int getActiveExecutorCount() {
520     return activeExecutorCount.get();
521   }
522 
523   public TEnvironment getEnvironment() {
524     return this.environment;
525   }
526 
527   public ProcedureStore getStore() {
528     return this.store;
529   }
530 
531   public void registerListener(ProcedureExecutorListener listener) {
532     this.listeners.add(listener);
533   }
534 
535   public boolean unregisterListener(ProcedureExecutorListener listener) {
536     return this.listeners.remove(listener);
537   }
538 
539   /**
540    * List procedures.
541    * @return the procedures in a list
542    */
543   public List<ProcedureInfo> listProcedures() {
544     List<ProcedureInfo> procedureLists =
545         new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
546     for (java.util.Map.Entry<Long, Procedure> p: procedures.entrySet()) {
547       procedureLists.add(Procedure.createProcedureInfo(p.getValue(), null));
548     }
549     for (java.util.Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
550       // Note: The procedure could show up twice in the list with different state, as
551       // it could complete after we walk through procedures list and insert into
552       // procedureList - it is ok, as we will use the information in the ProcedureInfo
553       // to figure it out; to prevent this would increase the complexity of the logic.
554       procedureLists.add(e.getValue());
555     }
556     return procedureLists;
557   }
558 
559   /**
560    * Add a new root-procedure to the executor.
561    * @param proc the new procedure to execute.
562    * @return the procedure id, that can be used to monitor the operation
563    */
564   public long submitProcedure(final Procedure proc) {
565     return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
566   }
567 
568   /**
569    * Add a new root-procedure to the executor.
570    * @param proc the new procedure to execute.
571    * @param nonceGroup
572    * @param nonce
573    * @return the procedure id, that can be used to monitor the operation
574    */
575   public long submitProcedure(
576       final Procedure proc,
577       final long nonceGroup,
578       final long nonce) {
579     Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
580     Preconditions.checkArgument(isRunning());
581     Preconditions.checkArgument(lastProcId.get() >= 0);
582     Preconditions.checkArgument(!proc.hasParent());
583 
584     Long currentProcId;
585 
586     // The following part of the code has to be synchronized to prevent multiple request
587     // with the same nonce to execute at the same time.
588     synchronized (this) {
589       // Check whether the proc exists.  If exist, just return the proc id.
590       // This is to prevent the same proc to submit multiple times (it could happen
591       // when client could not talk to server and resubmit the same request).
592       NonceKey noncekey = null;
593       if (nonce != HConstants.NO_NONCE) {
594         noncekey = new NonceKey(nonceGroup, nonce);
595         currentProcId = nonceKeysToProcIdsMap.get(noncekey);
596         if (currentProcId != null) {
597           // Found the proc
598           return currentProcId;
599         }
600       }
601 
602       // Initialize the Procedure ID
603       currentProcId = nextProcId();
604       proc.setProcId(currentProcId);
605 
606       // This is new procedure. Set the noncekey and insert into the map.
607       if (noncekey != null) {
608         proc.setNonceKey(noncekey);
609         nonceKeysToProcIdsMap.put(noncekey, currentProcId);
610       }
611     } // end of synchronized (this)
612 
613     // Commit the transaction
614     store.insert(proc, null);
615     if (LOG.isDebugEnabled()) {
616       LOG.debug("Procedure " + proc + " added to the store.");
617     }
618 
619     // Create the rollback stack for the procedure
620     RootProcedureState stack = new RootProcedureState();
621     rollbackStack.put(currentProcId, stack);
622 
623     // Submit the new subprocedures
624     assert !procedures.containsKey(currentProcId);
625     procedures.put(currentProcId, proc);
626     sendProcedureAddedNotification(currentProcId);
627     runnables.addBack(proc);
628     return currentProcId;
629   }
630 
631   public ProcedureInfo getResult(final long procId) {
632     return completed.get(procId);
633   }
634 
635   /**
636    * Return true if the procedure is finished.
637    * The state may be "completed successfully" or "failed and rolledback".
638    * Use getResult() to check the state or get the result data.
639    * @param procId the ID of the procedure to check
640    * @return true if the procedure execution is finished, otherwise false.
641    */
642   public boolean isFinished(final long procId) {
643     return completed.containsKey(procId);
644   }
645 
646   /**
647    * Return true if the procedure is started.
648    * @param procId the ID of the procedure to check
649    * @return true if the procedure execution is started, otherwise false.
650    */
651   public boolean isStarted(final long procId) {
652     Procedure proc = procedures.get(procId);
653     if (proc == null) {
654       return completed.get(procId) != null;
655     }
656     return proc.wasExecuted();
657   }
658 
659   /**
660    * Mark the specified completed procedure, as ready to remove.
661    * @param procId the ID of the procedure to remove
662    */
663   public void removeResult(final long procId) {
664     ProcedureInfo result = completed.get(procId);
665     if (result == null) {
666       assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
667       if (LOG.isDebugEnabled()) {
668         LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
669       }
670       return;
671     }
672 
673     // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
674     result.setClientAckTime(EnvironmentEdgeManager.currentTime());
675   }
676 
677   /**
678    * Send an abort notification the specified procedure.
679    * Depending on the procedure implementation the abort can be considered or ignored.
680    * @param procId the procedure to abort
681    * @return true if the procedure exist and has received the abort, otherwise false.
682    */
683   public boolean abort(final long procId) {
684     return abort(procId, true);
685   }
686 
687   /**
688    * Send an abort notification the specified procedure.
689    * Depending on the procedure implementation the abort can be considered or ignored.
690    * @param procId the procedure to abort
691    * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
692    * @return true if the procedure exist and has received the abort, otherwise false.
693    */
694   public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
695     Procedure proc = procedures.get(procId);
696     if (proc != null) {
697       if (!mayInterruptIfRunning && proc.wasExecuted()) {
698         return false;
699       } else {
700         return proc.abort(getEnvironment());
701       }
702     }
703     return false;
704   }
705 
706   /**
707    * Check if the user is this procedure's owner
708    * @param procId the target procedure
709    * @param user the user
710    * @return true if the user is the owner of the procedure,
711    *   false otherwise or the owner is unknown.
712    */
713   public boolean isProcedureOwner(final long procId, final User user) {
714     if (user == null) {
715       return false;
716     }
717 
718     Procedure proc = procedures.get(procId);
719     if (proc != null) {
720       return proc.getOwner().equals(user.getShortName());
721     }
722     ProcedureInfo procInfo = completed.get(procId);
723     if (procInfo == null) {
724       // Procedure either does not exist or has already completed and got cleaned up.
725       // At this time, we cannot check the owner of the procedure
726       return false;
727     }
728     return ProcedureInfo.isProcedureOwner(procInfo, user);
729   }
730 
731   public Map<Long, ProcedureInfo> getResults() {
732     return Collections.unmodifiableMap(completed);
733   }
734 
735   public Procedure getProcedure(final long procId) {
736     return procedures.get(procId);
737   }
738 
739   protected ProcedureRunnableSet getRunnableSet() {
740     return runnables;
741   }
742 
743   /**
744    * Execution loop (N threads)
745    * while the executor is in a running state,
746    * fetch a procedure from the runnables queue and start the execution.
747    */
748   private void execLoop() {
749     while (isRunning()) {
750       Long procId = runnables.poll();
751       Procedure proc = procId != null ? procedures.get(procId) : null;
752       if (proc == null) continue;
753 
754       try {
755         activeExecutorCount.incrementAndGet();
756         execLoop(proc);
757       } finally {
758         activeExecutorCount.decrementAndGet();
759       }
760     }
761   }
762 
763   private void execLoop(Procedure proc) {
764     if (LOG.isTraceEnabled()) {
765       LOG.trace("Trying to start the execution of " + proc);
766     }
767 
768     Long rootProcId = getRootProcedureId(proc);
769     if (rootProcId == null) {
770       // The 'proc' was ready to run but the root procedure was rolledback
771       executeRollback(proc);
772       return;
773     }
774 
775     RootProcedureState procStack = rollbackStack.get(rootProcId);
776     if (procStack == null) return;
777 
778     do {
779       // Try to acquire the execution
780       if (!procStack.acquire(proc)) {
781         if (procStack.setRollback()) {
782           // we have the 'rollback-lock' we can start rollingback
783           if (!executeRollback(rootProcId, procStack)) {
784             procStack.unsetRollback();
785             runnables.yield(proc);
786           }
787         } else {
788           // if we can't rollback means that some child is still running.
789           // the rollback will be executed after all the children are done.
790           // If the procedure was never executed, remove and mark it as rolledback.
791           if (!proc.wasExecuted()) {
792             if (!executeRollback(proc)) {
793               runnables.yield(proc);
794             }
795           }
796         }
797         break;
798       }
799 
800       // Execute the procedure
801       assert proc.getState() == ProcedureState.RUNNABLE;
802       if (proc.acquireLock(getEnvironment())) {
803         execProcedure(procStack, proc);
804         proc.releaseLock(getEnvironment());
805       } else {
806         runnables.yield(proc);
807       }
808       procStack.release(proc);
809 
810       // allows to kill the executor before something is stored to the wal.
811       // useful to test the procedure recovery.
812       if (testing != null && !isRunning()) {
813         break;
814       }
815 
816       if (proc.isSuccess()) {
817         if (LOG.isDebugEnabled()) {
818           LOG.debug("Procedure completed in " +
819               StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
820         }
821         // Finalize the procedure state
822         if (proc.getProcId() == rootProcId) {
823           procedureFinished(proc);
824         }
825         break;
826       }
827     } while (procStack.isFailed());
828   }
829 
830   private void timeoutLoop() {
831     while (isRunning()) {
832       Procedure proc = waitingTimeout.poll();
833       if (proc == null) continue;
834 
835       if (proc.getTimeRemaining() > 100) {
836         // got an early wake, maybe a stop?
837         // re-enqueue the task in case was not a stop or just a signal
838         waitingTimeout.add(proc);
839         continue;
840       }
841 
842       // ----------------------------------------------------------------------------
843       // TODO-MAYBE: Should we provide a notification to the store with the
844       // full set of procedures pending and completed to write a compacted
845       // version of the log (in case is a log)?
846       // In theory no, procedures are have a short life, so at some point the store
847       // will have the tracker saying everything is in the last log.
848       // ----------------------------------------------------------------------------
849 
850       // The CompletedProcedureCleaner is a special case, and it acts as a chore.
851       // instead of bringing the Chore class in, we reuse this timeout thread for
852       // this special case.
853       if (proc instanceof CompletedProcedureCleaner) {
854         try {
855           ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
856         } catch (Throwable e) {
857           LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
858         }
859         proc.setStartTime(EnvironmentEdgeManager.currentTime());
860         waitingTimeout.add(proc);
861         continue;
862       }
863 
864       // The procedure received an "abort-timeout", call abort() and
865       // add the procedure back in the queue for rollback.
866       if (proc.setTimeoutFailure()) {
867         long rootProcId = Procedure.getRootProcedureId(procedures, proc);
868         RootProcedureState procStack = rollbackStack.get(rootProcId);
869         procStack.abort();
870         store.update(proc);
871         runnables.addFront(proc);
872         continue;
873       }
874     }
875   }
876 
877   /**
878    * Execute the rollback of the full procedure stack.
879    * Once the procedure is rolledback, the root-procedure will be visible as
880    * finished to user, and the result will be the fatal exception.
881    */
882   private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
883     Procedure rootProc = procedures.get(rootProcId);
884     RemoteProcedureException exception = rootProc.getException();
885     if (exception == null) {
886       exception = procStack.getException();
887       rootProc.setFailure(exception);
888       store.update(rootProc);
889     }
890 
891     List<Procedure> subprocStack = procStack.getSubprocedures();
892     assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
893 
894     int stackTail = subprocStack.size();
895     boolean reuseLock = false;
896     while (stackTail --> 0) {
897       final Procedure proc = subprocStack.get(stackTail);
898 
899       if (!reuseLock && !proc.acquireLock(getEnvironment())) {
900         // can't take a lock on the procedure, add the root-proc back on the
901         // queue waiting for the lock availability
902         return false;
903       }
904 
905       boolean abortRollback = !executeRollback(proc);
906       abortRollback |= !isRunning() || !store.isRunning();
907 
908       // If the next procedure is the same to this one
909       // (e.g. StateMachineProcedure reuse the same instance)
910       // we can avoid to lock/unlock each step
911       reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
912       if (!reuseLock) {
913         proc.releaseLock(getEnvironment());
914       }
915 
916       // allows to kill the executor before something is stored to the wal.
917       // useful to test the procedure recovery.
918       if (abortRollback) {
919         return false;
920       }
921 
922       subprocStack.remove(stackTail);
923     }
924 
925     // Finalize the procedure state
926     LOG.info("Rolledback procedure " + rootProc +
927              " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
928              " exception=" + exception.getMessage());
929     procedureFinished(rootProc);
930     return true;
931   }
932 
933   /**
934    * Execute the rollback of the procedure step.
935    * It updates the store with the new state (stack index)
936    * or will remove completly the procedure in case it is a child.
937    */
938   private boolean executeRollback(final Procedure proc) {
939     try {
940       proc.doRollback(getEnvironment());
941     } catch (IOException e) {
942       if (LOG.isDebugEnabled()) {
943         LOG.debug("rollback attempt failed for " + proc, e);
944       }
945       return false;
946     } catch (Throwable e) {
947       // Catch NullPointerExceptions or similar errors...
948       LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
949     }
950 
951     // allows to kill the executor before something is stored to the wal.
952     // useful to test the procedure recovery.
953     if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
954       if (LOG.isDebugEnabled()) {
955         LOG.debug("TESTING: Kill before store update");
956       }
957       stop();
958       return false;
959     }
960 
961     if (proc.removeStackIndex()) {
962       proc.setState(ProcedureState.ROLLEDBACK);
963       if (proc.hasParent()) {
964         store.delete(proc.getProcId());
965         procedures.remove(proc.getProcId());
966       } else {
967         store.update(proc);
968       }
969     } else {
970       store.update(proc);
971     }
972     return true;
973   }
974 
975   /**
976    * Executes the specified procedure
977    *  - calls the doExecute() of the procedure
978    *  - if the procedure execution didn't fail (e.g. invalid user input)
979    *     - ...and returned subprocedures
980    *        - the subprocedures are initialized.
981    *        - the subprocedures are added to the store
982    *        - the subprocedures are added to the runnable queue
983    *        - the procedure is now in a WAITING state, waiting for the subprocedures to complete
984    *     - ...if there are no subprocedure
985    *        - the procedure completed successfully
986    *        - if there is a parent (WAITING)
987    *            - the parent state will be set to RUNNABLE
988    *  - in case of failure
989    *    - the store is updated with the new state
990    *    - the executor (caller of this method) will start the rollback of the procedure
991    */
992   private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
993     Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
994 
995     // Execute the procedure
996     boolean reExecute = false;
997     Procedure[] subprocs = null;
998     do {
999       reExecute = false;
1000       try {
1001         subprocs = procedure.doExecute(getEnvironment());
1002         if (subprocs != null && subprocs.length == 0) {
1003           subprocs = null;
1004         }
1005       } catch (ProcedureYieldException e) {
1006         if (LOG.isTraceEnabled()) {
1007           LOG.trace("Yield procedure: " + procedure);
1008         }
1009         runnables.yield(procedure);
1010         return;
1011       } catch (Throwable e) {
1012         // Catch NullPointerExceptions or similar errors...
1013         String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
1014         LOG.error(msg, e);
1015         procedure.setFailure(new RemoteProcedureException(msg, e));
1016       }
1017 
1018       if (!procedure.isFailed()) {
1019         if (subprocs != null) {
1020           if (subprocs.length == 1 && subprocs[0] == procedure) {
1021             // quick-shortcut for a state machine like procedure
1022             subprocs = null;
1023             reExecute = true;
1024           } else {
1025             // yield the current procedure, and make the subprocedure runnable
1026             for (int i = 0; i < subprocs.length; ++i) {
1027               Procedure subproc = subprocs[i];
1028               if (subproc == null) {
1029                 String msg = "subproc[" + i + "] is null, aborting the procedure";
1030                 procedure.setFailure(new RemoteProcedureException(msg,
1031                   new IllegalArgumentIOException(msg)));
1032                 subprocs = null;
1033                 break;
1034               }
1035 
1036               assert subproc.getState() == ProcedureState.INITIALIZING;
1037               subproc.setParentProcId(procedure.getProcId());
1038               subproc.setProcId(nextProcId());
1039             }
1040 
1041             if (!procedure.isFailed()) {
1042               procedure.setChildrenLatch(subprocs.length);
1043               switch (procedure.getState()) {
1044                 case RUNNABLE:
1045                   procedure.setState(ProcedureState.WAITING);
1046                   break;
1047                 case WAITING_TIMEOUT:
1048                   waitingTimeout.add(procedure);
1049                   break;
1050                 default:
1051                   break;
1052               }
1053             }
1054           }
1055         } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1056           waitingTimeout.add(procedure);
1057         } else {
1058           // No subtask, so we are done
1059           procedure.setState(ProcedureState.FINISHED);
1060         }
1061       }
1062 
1063       // Add the procedure to the stack
1064       procStack.addRollbackStep(procedure);
1065 
1066       // allows to kill the executor before something is stored to the wal.
1067       // useful to test the procedure recovery.
1068       if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1069         if (LOG.isDebugEnabled()) {
1070           LOG.debug("TESTING: Kill before store update");
1071         }
1072         stop();
1073         return;
1074       }
1075 
1076       // Commit the transaction
1077       if (subprocs != null && !procedure.isFailed()) {
1078         if (LOG.isTraceEnabled()) {
1079           LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
1080         }
1081         store.insert(procedure, subprocs);
1082       } else {
1083         if (LOG.isTraceEnabled()) {
1084           LOG.trace("Store update " + procedure);
1085         }
1086         store.update(procedure);
1087       }
1088 
1089       // if the store is not running we are aborting
1090       if (!store.isRunning()) {
1091         return;
1092       }
1093 
1094       assert (reExecute && subprocs == null) || !reExecute;
1095     } while (reExecute);
1096 
1097     // Submit the new subprocedures
1098     if (subprocs != null && !procedure.isFailed()) {
1099       for (int i = 0; i < subprocs.length; ++i) {
1100         Procedure subproc = subprocs[i];
1101         assert !procedures.containsKey(subproc.getProcId());
1102         procedures.put(subproc.getProcId(), subproc);
1103         runnables.addFront(subproc);
1104       }
1105     }
1106 
1107     if (procedure.isFinished() && procedure.hasParent()) {
1108       Procedure parent = procedures.get(procedure.getParentProcId());
1109       if (parent == null) {
1110         assert procStack.isRollingback();
1111         return;
1112       }
1113 
1114       // If this procedure is the last child awake the parent procedure
1115       if (LOG.isTraceEnabled()) {
1116         LOG.trace(parent + " child is done: " + procedure);
1117       }
1118       if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
1119         parent.setState(ProcedureState.RUNNABLE);
1120         store.update(parent);
1121         runnables.addFront(parent);
1122         if (LOG.isTraceEnabled()) {
1123           LOG.trace(parent + " all the children finished their work, resume.");
1124         }
1125         return;
1126       }
1127     }
1128   }
1129 
1130   private void sendProcedureLoadedNotification(final long procId) {
1131     if (!this.listeners.isEmpty()) {
1132       for (ProcedureExecutorListener listener: this.listeners) {
1133         try {
1134           listener.procedureLoaded(procId);
1135         } catch (Throwable e) {
1136           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1137         }
1138       }
1139     }
1140   }
1141 
1142   private void sendProcedureAddedNotification(final long procId) {
1143     if (!this.listeners.isEmpty()) {
1144       for (ProcedureExecutorListener listener: this.listeners) {
1145         try {
1146           listener.procedureAdded(procId);
1147         } catch (Throwable e) {
1148           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1149         }
1150       }
1151     }
1152   }
1153 
1154   private void sendProcedureFinishedNotification(final long procId) {
1155     if (!this.listeners.isEmpty()) {
1156       for (ProcedureExecutorListener listener: this.listeners) {
1157         try {
1158           listener.procedureFinished(procId);
1159         } catch (Throwable e) {
1160           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1161         }
1162       }
1163     }
1164   }
1165 
1166   private long nextProcId() {
1167     long procId = lastProcId.incrementAndGet();
1168     if (procId < 0) {
1169       while (!lastProcId.compareAndSet(procId, 0)) {
1170         procId = lastProcId.get();
1171         if (procId >= 0)
1172           break;
1173       }
1174       while (procedures.containsKey(procId)) {
1175         procId = lastProcId.incrementAndGet();
1176       }
1177     }
1178     return procId;
1179   }
1180 
1181   private Long getRootProcedureId(Procedure proc) {
1182     return Procedure.getRootProcedureId(procedures, proc);
1183   }
1184 
1185   private void procedureFinished(final Procedure proc) {
1186     // call the procedure completion cleanup handler
1187     try {
1188       proc.completionCleanup(getEnvironment());
1189     } catch (Throwable e) {
1190       // Catch NullPointerExceptions or similar errors...
1191       LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1192     }
1193 
1194     // update the executor internal state maps
1195     completed.put(proc.getProcId(), Procedure.createProcedureInfo(proc, proc.getNonceKey()));
1196     rollbackStack.remove(proc.getProcId());
1197     procedures.remove(proc.getProcId());
1198 
1199     // call the runnableSet completion cleanup handler
1200     try {
1201       runnables.completionCleanup(proc);
1202     } catch (Throwable e) {
1203       // Catch NullPointerExceptions or similar errors...
1204       LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
1205     }
1206 
1207     // Notify the listeners
1208     sendProcedureFinishedNotification(proc.getProcId());
1209   }
1210 
1211   public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) {
1212     ProcedureInfo result = completed.get(procId);
1213     Procedure proc = null;
1214     if (result == null) {
1215       proc = procedures.get(procId);
1216       if (proc == null) {
1217         result = completed.get(procId);
1218       }
1219     }
1220     return new Pair(result, proc);
1221   }
1222 }