View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Collections;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.HashSet;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  import java.util.concurrent.locks.ReentrantLock;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.ProcedureInfo;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.classification.InterfaceStability;
46  import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
47  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
48  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
49  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
50  import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
51  import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
52  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
53  import org.apache.hadoop.hbase.security.User;
54  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55  import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
56  import org.apache.hadoop.hbase.util.NonceKey;
57  import org.apache.hadoop.hbase.util.Pair;
58  import org.apache.hadoop.hbase.util.Threads;
59  
60  import com.google.common.base.Preconditions;
61  
62  /**
63   * Thread Pool that executes the submitted procedures.
64   * The executor has a ProcedureStore associated.
65   * Each operation is logged and on restart the pending procedures are resumed.
66   *
67   * Unless the Procedure code throws an error (e.g. invalid user input)
68   * the procedure will complete (at some point in time), On restart the pending
69   * procedures are resumed and the once failed will be rolledback.
70   *
71   * The user can add procedures to the executor via submitProcedure(proc)
72   * check for the finished state via isFinished(procId)
73   * and get the result via getResult(procId)
74   */
75  @InterfaceAudience.Private
76  @InterfaceStability.Evolving
77  public class ProcedureExecutor<TEnvironment> {
78    private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
79  
80    Testing testing = null;
81    public static class Testing {
82      protected boolean killBeforeStoreUpdate = false;
83      protected boolean toggleKillBeforeStoreUpdate = false;
84  
85      protected boolean shouldKillBeforeStoreUpdate() {
86        final boolean kill = this.killBeforeStoreUpdate;
87        if (this.toggleKillBeforeStoreUpdate) {
88          this.killBeforeStoreUpdate = !kill;
89          LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
90        }
91        return kill;
92      }
93    }
94  
/**
 * Callback interface for parties interested in procedure lifecycle events.
 * Instances are attached with registerListener() and detached with unregisterListener().
 */
public interface ProcedureExecutorListener {
  /**
   * Presumably invoked for a procedure that was restored from the store during replay
   * (the dispatching code is not shown here; confirm against the notification helpers).
   * @param procId the ID of the procedure
   */
  void procedureLoaded(long procId);

  /**
   * Presumably invoked when a new procedure is submitted to the executor
   * (confirm against the notification helpers).
   * @param procId the ID of the procedure
   */
  void procedureAdded(long procId);

  /**
   * Presumably invoked when a procedure reaches its final state
   * (confirm against the notification helpers).
   * @param procId the ID of the procedure
   */
  void procedureFinished(long procId);
}
100 
101   /**
102    * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure
103    */
104   private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
105     @Override
106     public long getTimeout(Procedure proc) {
107       return proc.getTimeRemaining();
108     }
109 
110     @Override
111     public TimeUnit getTimeUnit(Procedure proc) {
112       return TimeUnit.MILLISECONDS;
113     }
114   }
115 
116   /**
117    * Internal cleaner that removes the completed procedure results after a TTL.
118    * NOTE: This is a special case handled in timeoutLoop().
119    *
120    * Since the client code looks more or less like:
121    *   procId = master.doOperation()
122    *   while (master.getProcResult(procId) == ProcInProgress);
123    * The master should not throw away the proc result as soon as the procedure is done
124    * but should wait a result request from the client (see executor.removeResult(procId))
125    * The client will call something like master.isProcDone() or master.getProcResult()
126    * which will return the result/state to the client, and it will mark the completed
127    * proc as ready to delete. note that the client may not receive the response from
128    * the master (e.g. master failover) so, if we delay a bit the real deletion of
129    * the proc result the client will be able to get the result the next try.
130    */
131   private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
132     private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
133 
134     private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
135     private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
136 
137     private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
138     private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
139 
140     private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
141     private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
142 
143     private final Map<Long, ProcedureInfo> completed;
144     private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
145     private final ProcedureStore store;
146     private final Configuration conf;
147 
148     public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
149         final Map<Long, ProcedureInfo> completedMap,
150         final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
151       // set the timeout interval that triggers the periodic-procedure
152       setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
153       this.completed = completedMap;
154       this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
155       this.store = store;
156       this.conf = conf;
157     }
158 
159     public void periodicExecute(final TEnvironment env) {
160       if (completed.isEmpty()) {
161         if (LOG.isTraceEnabled()) {
162           LOG.trace("No completed procedures to cleanup.");
163         }
164         return;
165       }
166 
167       final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
168       final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
169 
170       final long now = EnvironmentEdgeManager.currentTime();
171       final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
172       final boolean isDebugEnabled = LOG.isDebugEnabled();
173       while (it.hasNext() && store.isRunning()) {
174         final Map.Entry<Long, ProcedureInfo> entry = it.next();
175         final ProcedureInfo procInfo = entry.getValue();
176 
177         // TODO: Select TTL based on Procedure type
178         if ((procInfo.hasClientAckTime() && (now - procInfo.getClientAckTime()) >= evictAckTtl) ||
179             (now - procInfo.getLastUpdate()) >= evictTtl) {
180           if (isDebugEnabled) {
181             LOG.debug("Evict completed procedure: " + procInfo);
182           }
183           store.delete(entry.getKey());
184           it.remove();
185 
186           NonceKey nonceKey = procInfo.getNonceKey();
187           if (nonceKey != null) {
188             nonceKeysToProcIdsMap.remove(nonceKey);
189           }
190         }
191       }
192     }
193 
194     @Override
195     protected Procedure[] execute(final TEnvironment env) {
196       throw new UnsupportedOperationException();
197     }
198 
199     @Override
200     protected void rollback(final TEnvironment env) {
201       throw new UnsupportedOperationException();
202     }
203 
204     @Override
205     protected boolean abort(final TEnvironment env) {
206       throw new UnsupportedOperationException();
207     }
208 
209     @Override
210     public void serializeStateData(final OutputStream stream) {
211       throw new UnsupportedOperationException();
212     }
213 
214     @Override
215     public void deserializeStateData(final InputStream stream) {
216       throw new UnsupportedOperationException();
217     }
218   }
219 
220   /**
221    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the ProcedureInfo.
222    * Once a Root-Procedure completes (success or failure), the result will be added to this map.
223    * The user of ProcedureExecutor should call getResult(procId) to get the result.
224    */
225   private final ConcurrentHashMap<Long, ProcedureInfo> completed =
226     new ConcurrentHashMap<Long, ProcedureInfo>();
227 
228   /**
229    * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
230    * The RootProcedureState contains the execution stack of the Root-Procedure,
231    * It is added to the map by submitProcedure() and removed on procedure completion.
232    */
233   private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
234     new ConcurrentHashMap<Long, RootProcedureState>();
235 
236   /**
237    * Helper map to lookup the live procedures by ID.
238    * This map contains every procedure. root-procedures and subprocedures.
239    */
240   private final ConcurrentHashMap<Long, Procedure> procedures =
241     new ConcurrentHashMap<Long, Procedure>();
242 
243   /**
244    * Helper map to lookup whether the procedure already issued from the same client.
245    * This map contains every root procedure.
246    */
247   private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
248       new ConcurrentHashMap<NonceKey, Long>();
249 
250   /**
251    * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
252    * or periodic procedures.
253    */
254   private final TimeoutBlockingQueue<Procedure> waitingTimeout =
255     new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
256 
257   /**
258    * Queue that contains runnable procedures.
259    */
260   private final ProcedureRunnableSet runnables;
261 
262   // TODO
263   private final ReentrantLock submitLock = new ReentrantLock();
264   private final AtomicLong lastProcId = new AtomicLong(-1);
265 
266   private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
267     new CopyOnWriteArrayList<ProcedureExecutorListener>();
268 
269   private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
270   private final AtomicBoolean running = new AtomicBoolean(false);
271   private final TEnvironment environment;
272   private final ProcedureStore store;
273   private final Configuration conf;
274 
275   private Thread[] threads;
276 
/**
 * Creates an executor backed by a ProcedureSimpleRunQueue.
 *
 * @param conf the configuration to use
 * @param environment the environment object made available through getEnvironment()
 * @param store the store associated with this executor
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store) {
  this(conf, environment, store, new ProcedureSimpleRunQueue());
}
281 
/**
 * Creates an executor that uses the given run queue.
 * Nothing is started here; see start().
 *
 * @param conf the configuration to use
 * @param environment the environment object made available through getEnvironment()
 * @param store the store associated with this executor
 * @param runqueue the queue that will hold the runnable procedures
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store, final ProcedureRunnableSet runqueue) {
  this.conf = conf;
  this.store = store;
  this.environment = environment;
  this.runnables = runqueue;
}
289 
290   private void load(final boolean abortOnCorruption) throws IOException {
291     Preconditions.checkArgument(completed.isEmpty());
292     Preconditions.checkArgument(rollbackStack.isEmpty());
293     Preconditions.checkArgument(procedures.isEmpty());
294     Preconditions.checkArgument(waitingTimeout.isEmpty());
295     Preconditions.checkArgument(runnables.size() == 0);
296 
297     store.load(new ProcedureStore.ProcedureLoader() {
298       @Override
299       public void setMaxProcId(long maxProcId) {
300         assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
301         LOG.debug("load procedures maxProcId=" + maxProcId);
302         lastProcId.set(maxProcId);
303       }
304 
305       @Override
306       public void load(ProcedureIterator procIter) throws IOException {
307         loadProcedures(procIter, abortOnCorruption);
308       }
309 
310       @Override
311       public void handleCorrupted(ProcedureIterator procIter) throws IOException {
312         int corruptedCount = 0;
313         while (procIter.hasNext()) {
314           ProcedureInfo proc = procIter.nextAsProcedureInfo();
315           LOG.error("corrupted procedure: " + proc);
316           corruptedCount++;
317         }
318         if (abortOnCorruption && corruptedCount > 0) {
319           throw new IOException("found " + corruptedCount + " procedures on replay");
320         }
321       }
322     });
323   }
324 
325   private void loadProcedures(final ProcedureIterator procIter,
326       final boolean abortOnCorruption) throws IOException {
327     final boolean isDebugEnabled = LOG.isDebugEnabled();
328 
329     // 1. Build the rollback stack
330     int runnablesCount = 0;
331     while (procIter.hasNext()) {
332       final NonceKey nonceKey;
333       final long procId;
334 
335       if (procIter.isNextCompleted()) {
336         ProcedureInfo proc = procIter.nextAsProcedureInfo();
337         nonceKey = proc.getNonceKey();
338         procId = proc.getProcId();
339         completed.put(proc.getProcId(), proc);
340         if (isDebugEnabled) {
341           LOG.debug("The procedure is completed: " + proc);
342         }
343       } else {
344         Procedure proc = procIter.nextAsProcedure();
345         nonceKey = proc.getNonceKey();
346         procId = proc.getProcId();
347 
348         if (!proc.hasParent()) {
349           assert !proc.isFinished() : "unexpected finished procedure";
350           rollbackStack.put(proc.getProcId(), new RootProcedureState());
351         }
352 
353         // add the procedure to the map
354         proc.beforeReplay(getEnvironment());
355         procedures.put(proc.getProcId(), proc);
356 
357         if (proc.getState() == ProcedureState.RUNNABLE) {
358           runnablesCount++;
359         }
360       }
361 
362       // add the nonce to the map
363       if (nonceKey != null) {
364         nonceKeysToProcIdsMap.put(nonceKey, procId);
365       }
366     }
367 
368     // 2. Initialize the stacks
369     ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
370     HashSet<Procedure> waitingSet = null;
371     procIter.reset();
372     while (procIter.hasNext()) {
373       if (procIter.isNextCompleted()) {
374         procIter.skipNext();
375         continue;
376       }
377 
378       Procedure proc = procIter.nextAsProcedure();
379       assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
380 
381       if (isDebugEnabled) {
382         LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
383                     proc.getState(), proc.hasException(), proc));
384       }
385 
386       Long rootProcId = getRootProcedureId(proc);
387       if (rootProcId == null) {
388         // The 'proc' was ready to run but the root procedure was rolledback?
389         runnables.addBack(proc);
390         continue;
391       }
392 
393       if (proc.hasParent() && !proc.isFinished()) {
394         Procedure parent = procedures.get(proc.getParentProcId());
395         // corrupted procedures are handled later at step 3
396         if (parent != null) {
397           parent.incChildrenLatch();
398         }
399       }
400 
401       RootProcedureState procStack = rollbackStack.get(rootProcId);
402       procStack.loadStack(proc);
403 
404       switch (proc.getState()) {
405         case RUNNABLE:
406           runnableList.add(proc);
407           break;
408         case WAITING_TIMEOUT:
409           if (waitingSet == null) {
410             waitingSet = new HashSet<Procedure>();
411           }
412           waitingSet.add(proc);
413           break;
414         case FINISHED:
415           if (proc.hasException()) {
416             // add the proc to the runnables to perform the rollback
417             runnables.addBack(proc);
418             break;
419           }
420         case ROLLEDBACK:
421         case INITIALIZING:
422           String msg = "Unexpected " + proc.getState() + " state for " + proc;
423           LOG.error(msg);
424           throw new UnsupportedOperationException(msg);
425         default:
426           break;
427       }
428     }
429 
430     // 3. Validate the stacks
431     int corruptedCount = 0;
432     Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
433     while (itStack.hasNext()) {
434       Map.Entry<Long, RootProcedureState> entry = itStack.next();
435       RootProcedureState procStack = entry.getValue();
436       if (procStack.isValid()) continue;
437 
438       for (Procedure proc: procStack.getSubprocedures()) {
439         LOG.error("corrupted procedure: " + proc);
440         procedures.remove(proc.getProcId());
441         runnableList.remove(proc);
442         if (waitingSet != null) waitingSet.remove(proc);
443         corruptedCount++;
444       }
445       itStack.remove();
446     }
447 
448     if (abortOnCorruption && corruptedCount > 0) {
449       throw new IOException("found " + corruptedCount + " procedures on replay");
450     }
451 
452     // 4. Push the runnables
453     if (!runnableList.isEmpty()) {
454       // TODO: See ProcedureWALFormatReader#hasFastStartSupport
455       // some procedure may be started way before this stuff.
456       for (int i = runnableList.size() - 1; i >= 0; --i) {
457         Procedure proc = runnableList.get(i);
458         if (!proc.hasParent()) {
459           sendProcedureLoadedNotification(proc.getProcId());
460         }
461         if (proc.wasExecuted()) {
462           runnables.addFront(proc);
463         } else {
464           // if it was not in execution, it can wait.
465           runnables.addBack(proc);
466         }
467       }
468     }
469   }
470 
471   /**
472    * Start the procedure executor.
473    * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to
474    * recover the lease, and ensure a single executor, and start the procedure
475    * replay to resume and recover the previous pending and in-progress perocedures.
476    *
477    * @param numThreads number of threads available for procedure execution.
478    * @param abortOnCorruption true if you want to abort your service in case
479    *          a corrupted procedure is found on replay. otherwise false.
480    */
481   public void start(int numThreads, boolean abortOnCorruption) throws IOException {
482     if (running.getAndSet(true)) {
483       LOG.warn("Already running");
484       return;
485     }
486 
487     // We have numThreads executor + one timer thread used for timing out
488     // procedures and triggering periodic procedures.
489     threads = new Thread[numThreads + 1];
490     LOG.info("Starting procedure executor threads=" + threads.length);
491 
492     // Initialize procedures executor
493     for (int i = 0; i < numThreads; ++i) {
494       threads[i] = new Thread("ProcedureExecutor-" + i) {
495         @Override
496         public void run() {
497           execLoop();
498         }
499       };
500     }
501 
502     // Initialize procedures timeout handler (this is the +1 thread)
503     threads[numThreads] = new Thread("ProcedureExecutorTimeout") {
504       @Override
505       public void run() {
506         timeoutLoop();
507       }
508     };
509 
510     // Acquire the store lease.
511     store.recoverLease();
512 
513     // TODO: Split in two steps.
514     // TODO: Handle corrupted procedures (currently just a warn)
515     // The first one will make sure that we have the latest id,
516     // so we can start the threads and accept new procedures.
517     // The second step will do the actual load of old procedures.
518     load(abortOnCorruption);
519 
520     // Start the executors. Here we must have the lastProcId set.
521     for (int i = 0; i < threads.length; ++i) {
522       threads[i].start();
523     }
524 
525     // Add completed cleaner
526     waitingTimeout.add(
527       new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
528   }
529 
530   public void stop() {
531     if (!running.getAndSet(false)) {
532       return;
533     }
534 
535     LOG.info("Stopping the procedure executor");
536     runnables.signalAll();
537     waitingTimeout.signalAll();
538   }
539 
540   public void join() {
541     boolean interrupted = false;
542 
543     for (int i = 0; i < threads.length; ++i) {
544       try {
545         threads[i].join();
546       } catch (InterruptedException ex) {
547         interrupted = true;
548       }
549     }
550 
551     if (interrupted) {
552       Thread.currentThread().interrupt();
553     }
554 
555     completed.clear();
556     rollbackStack.clear();
557     procedures.clear();
558     nonceKeysToProcIdsMap.clear();
559     waitingTimeout.clear();
560     runnables.clear();
561     lastProcId.set(-1);
562   }
563 
564   public boolean isRunning() {
565     return running.get();
566   }
567 
568   /**
569    * @return the number of execution threads.
570    */
571   public int getNumThreads() {
572     return threads == null ? 0 : (threads.length - 1);
573   }
574 
575   public int getActiveExecutorCount() {
576     return activeExecutorCount.get();
577   }
578 
579   public TEnvironment getEnvironment() {
580     return this.environment;
581   }
582 
583   public ProcedureStore getStore() {
584     return this.store;
585   }
586 
587   public void registerListener(ProcedureExecutorListener listener) {
588     this.listeners.add(listener);
589   }
590 
591   public boolean unregisterListener(ProcedureExecutorListener listener) {
592     return this.listeners.remove(listener);
593   }
594 
595   /**
596    * List procedures.
597    * @return the procedures in a list
598    */
599   public List<ProcedureInfo> listProcedures() {
600     List<ProcedureInfo> procedureLists =
601         new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
602     for (java.util.Map.Entry<Long, Procedure> p: procedures.entrySet()) {
603       procedureLists.add(Procedure.createProcedureInfo(p.getValue(), null));
604     }
605     for (java.util.Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
606       // Note: The procedure could show up twice in the list with different state, as
607       // it could complete after we walk through procedures list and insert into
608       // procedureList - it is ok, as we will use the information in the ProcedureInfo
609       // to figure it out; to prevent this would increase the complexity of the logic.
610       procedureLists.add(e.getValue());
611     }
612     return procedureLists;
613   }
614 
615   // ==========================================================================
616   //  Nonce Procedure helpers
617   // ==========================================================================
618   /**
619    * Create a NoneKey from the specified nonceGroup and nonce.
620    * @param nonceGroup
621    * @param nonce
622    * @return the generated NonceKey
623    */
624   public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
625     return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
626   }
627 
628   /**
629    * Register a nonce for a procedure that is going to be submitted.
630    * A procId will be reserved and on submitProcedure(),
631    * the procedure with the specified nonce will take the reserved ProcId.
632    * If someone already reserved the nonce, this method will return the procId reserved,
633    * otherwise an invalid procId will be returned. and the caller should procede
634    * and submit the procedure.
635    *
636    * @param nonceKey A unique identifier for this operation from the client or process.
637    * @return the procId associated with the nonce, if any otherwise an invalid procId.
638    */
639   public long registerNonce(final NonceKey nonceKey) {
640     if (nonceKey == null) return -1;
641 
642     // check if we have already a Reserved ID for the nonce
643     Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
644     if (oldProcId == null) {
645       // reserve a new Procedure ID, this will be associated with the nonce
646       // and the procedure submitted with the specified nonce will use this ID.
647       final long newProcId = nextProcId();
648       oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
649       if (oldProcId == null) return -1;
650     }
651 
652     // we found a registered nonce, but the procedure may not have been submitted yet.
653     // since the client expect the procedure to be submitted, spin here until it is.
654     final boolean isTraceEnabled = LOG.isTraceEnabled();
655     while (isRunning() &&
656            !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
657            nonceKeysToProcIdsMap.containsKey(nonceKey)) {
658       if (isTraceEnabled) {
659         LOG.trace("waiting for procId=" + oldProcId.longValue() + " to be submitted");
660       }
661       Threads.sleep(100);
662     }
663     return oldProcId.longValue();
664   }
665 
666   /**
667    * Remove the NonceKey if the procedure was not submitted to the executor.
668    * @param nonceKey A unique identifier for this operation from the client or process.
669    */
670   public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
671     if (nonceKey == null) return;
672 
673     final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
674     if (procId == null) return;
675 
676     // if the procedure was not submitted, remove the nonce
677     if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
678       nonceKeysToProcIdsMap.remove(nonceKey);
679     }
680   }
681 
682   /**
683    * If the failure failed before submitting it, we may want to give back the
684    * same error to the requests with the same nonceKey.
685    *
686    * @param nonceKey A unique identifier for this operation from the client or process
687    * @param procName name of the procedure, used to inform the user
688    * @param procOwner name of the owner of the procedure, used to inform the user
689    * @param exception the failure to report to the user
690    */
691   public void setFailureResultForNonce(final NonceKey nonceKey, final String procName,
692       final User procOwner, final IOException exception) {
693     if (nonceKey == null) return;
694 
695     final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
696     if (procId == null || completed.containsKey(procId)) return;
697 
698     final long currentTime = EnvironmentEdgeManager.currentTime();
699     final ProcedureInfo result = new ProcedureInfo(
700       procId.longValue(),
701       procName,
702       procOwner != null ? procOwner.getShortName() : null,
703       ProcedureState.ROLLEDBACK,
704       -1,
705       nonceKey,
706       ForeignExceptionUtil.toProtoForeignException("ProcedureExecutor", exception),
707       currentTime,
708       currentTime,
709       null);
710     completed.putIfAbsent(procId, result);
711   }
712 
713   // ==========================================================================
714   //  Submit/Abort Procedure
715   // ==========================================================================
716   /**
717 >>>>>>> ce33cf2... HBASE-17149 Procedure V2 - Fix nonce submission to avoid unnecessary calling coprocessor multiple times (Matteo Bertozzi)
718    * Add a new root-procedure to the executor.
719    * @param proc the new procedure to execute.
720    * @return the procedure id, that can be used to monitor the operation
721    */
722   public long submitProcedure(final Procedure proc) {
723     return submitProcedure(proc, null);
724   }
725 
726   /**
727    * Add a new root-procedure to the executor.
728    * @param proc the new procedure to execute.
729    * @param nonceKey the registered unique identifier for this operation from the client or process.
730    * @return the procedure id, that can be used to monitor the operation
731    */
732   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
733       justification = "FindBugs is blind to the check-for-null")
734   public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
735     Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
736     Preconditions.checkArgument(isRunning(), "executor not running");
737     Preconditions.checkArgument(lastProcId.get() >= 0);
738     Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
739 
740     final Long currentProcId;
741     if (nonceKey != null) {
742       currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
743       Preconditions.checkArgument(currentProcId != null,
744         "expected nonceKey=" + nonceKey + " to be reserved, use registerNonce()");
745     } else {
746       currentProcId = nextProcId();
747     }
748 
749     // Initialize the procedure
750     proc.setNonceKey(nonceKey);
751     proc.setProcId(currentProcId.longValue());
752 
753     // Commit the transaction
754     store.insert(proc, null);
755     if (LOG.isDebugEnabled()) {
756       LOG.debug("Procedure " + proc + " added to the store.");
757     }
758 
759     // Create the rollback stack for the procedure
760     RootProcedureState stack = new RootProcedureState();
761     rollbackStack.put(currentProcId, stack);
762 
763     // Submit the new subprocedures
764     assert !procedures.containsKey(currentProcId);
765     procedures.put(currentProcId, proc);
766     sendProcedureAddedNotification(currentProcId);
767     runnables.addBack(proc);
768     return currentProcId;
769   }
770 
771   public ProcedureInfo getResult(final long procId) {
772     return completed.get(procId);
773   }
774 
775   /**
776    * Return true if the procedure is finished.
777    * The state may be "completed successfully" or "failed and rolledback".
778    * Use getResult() to check the state or get the result data.
779    * @param procId the ID of the procedure to check
780    * @return true if the procedure execution is finished, otherwise false.
781    */
782   public boolean isFinished(final long procId) {
783     return completed.containsKey(procId);
784   }
785 
786   /**
787    * Return true if the procedure is started.
788    * @param procId the ID of the procedure to check
789    * @return true if the procedure execution is started, otherwise false.
790    */
791   public boolean isStarted(final long procId) {
792     Procedure proc = procedures.get(procId);
793     if (proc == null) {
794       return completed.get(procId) != null;
795     }
796     return proc.wasExecuted();
797   }
798 
799   /**
800    * Mark the specified completed procedure, as ready to remove.
801    * @param procId the ID of the procedure to remove
802    */
803   public void removeResult(final long procId) {
804     ProcedureInfo result = completed.get(procId);
805     if (result == null) {
806       assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
807       if (LOG.isDebugEnabled()) {
808         LOG.debug("Procedure procId=" + procId + " already removed by the cleaner.");
809       }
810       return;
811     }
812 
813     // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
814     result.setClientAckTime(EnvironmentEdgeManager.currentTime());
815   }
816 
817   /**
818    * Send an abort notification the specified procedure.
819    * Depending on the procedure implementation the abort can be considered or ignored.
820    * @param procId the procedure to abort
821    * @return true if the procedure exist and has received the abort, otherwise false.
822    */
823   public boolean abort(final long procId) {
824     return abort(procId, true);
825   }
826 
827   /**
828    * Send an abort notification the specified procedure.
829    * Depending on the procedure implementation the abort can be considered or ignored.
830    * @param procId the procedure to abort
831    * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
832    * @return true if the procedure exist and has received the abort, otherwise false.
833    */
834   public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
835     Procedure proc = procedures.get(procId);
836     if (proc != null) {
837       if (!mayInterruptIfRunning && proc.wasExecuted()) {
838         return false;
839       } else {
840         return proc.abort(getEnvironment());
841       }
842     }
843     return false;
844   }
845 
846   /**
847    * Check if the user is this procedure's owner
848    * @param procId the target procedure
849    * @param user the user
850    * @return true if the user is the owner of the procedure,
851    *   false otherwise or the owner is unknown.
852    */
853   public boolean isProcedureOwner(final long procId, final User user) {
854     if (user == null) {
855       return false;
856     }
857 
858     Procedure proc = procedures.get(procId);
859     if (proc != null) {
860       return proc.getOwner().equals(user.getShortName());
861     }
862     ProcedureInfo procInfo = completed.get(procId);
863     if (procInfo == null) {
864       // Procedure either does not exist or has already completed and got cleaned up.
865       // At this time, we cannot check the owner of the procedure
866       return false;
867     }
868     return ProcedureInfo.isProcedureOwner(procInfo, user);
869   }
870 
871   public Map<Long, ProcedureInfo> getResults() {
872     return Collections.unmodifiableMap(completed);
873   }
874 
875   public Procedure getProcedure(final long procId) {
876     return procedures.get(procId);
877   }
878 
879   protected ProcedureRunnableSet getRunnableSet() {
880     return runnables;
881   }
882 
883   /**
884    * Execution loop (N threads)
885    * while the executor is in a running state,
886    * fetch a procedure from the runnables queue and start the execution.
887    */
888   private void execLoop() {
889     while (isRunning()) {
890       Procedure proc = runnables.poll();
891       if (proc == null) continue;
892 
893       try {
894         activeExecutorCount.incrementAndGet();
895         execLoop(proc);
896       } finally {
897         activeExecutorCount.decrementAndGet();
898       }
899     }
900   }
901 
902   private void execLoop(Procedure proc) {
903     if (LOG.isTraceEnabled()) {
904       LOG.trace("Trying to start the execution of " + proc);
905     }
906 
907     Long rootProcId = getRootProcedureId(proc);
908     if (rootProcId == null) {
909       // The 'proc' was ready to run but the root procedure was rolledback
910       executeRollback(proc);
911       return;
912     }
913 
914     RootProcedureState procStack = rollbackStack.get(rootProcId);
915     if (procStack == null) return;
916 
917     do {
918       // Try to acquire the execution
919       if (!procStack.acquire(proc)) {
920         if (procStack.setRollback()) {
921           // we have the 'rollback-lock' we can start rollingback
922           if (!executeRollback(rootProcId, procStack)) {
923             procStack.unsetRollback();
924             runnables.yield(proc);
925           }
926         } else {
927           // if we can't rollback means that some child is still running.
928           // the rollback will be executed after all the children are done.
929           // If the procedure was never executed, remove and mark it as rolledback.
930           if (!proc.wasExecuted()) {
931             if (!executeRollback(proc)) {
932               runnables.yield(proc);
933             }
934           }
935         }
936         break;
937       }
938 
939       // Execute the procedure
940       assert proc.getState() == ProcedureState.RUNNABLE;
941       if (proc.acquireLock(getEnvironment())) {
942         execProcedure(procStack, proc);
943         proc.releaseLock(getEnvironment());
944       } else {
945         runnables.yield(proc);
946       }
947       procStack.release(proc);
948 
949       // allows to kill the executor before something is stored to the wal.
950       // useful to test the procedure recovery.
951       if (testing != null && !isRunning()) {
952         break;
953       }
954 
955       if (proc.isSuccess()) {
956         if (LOG.isDebugEnabled()) {
957           LOG.debug("Procedure completed in " +
958               StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
959         }
960         // Finalize the procedure state
961         if (proc.getProcId() == rootProcId) {
962           procedureFinished(proc);
963         }
964         break;
965       }
966     } while (procStack.isFailed());
967   }
968 
969   private void timeoutLoop() {
970     while (isRunning()) {
971       Procedure proc = waitingTimeout.poll();
972       if (proc == null) continue;
973 
974       if (proc.getTimeRemaining() > 100) {
975         // got an early wake, maybe a stop?
976         // re-enqueue the task in case was not a stop or just a signal
977         waitingTimeout.add(proc);
978         continue;
979       }
980 
981       // ----------------------------------------------------------------------------
982       // TODO-MAYBE: Should we provide a notification to the store with the
983       // full set of procedures pending and completed to write a compacted
984       // version of the log (in case is a log)?
985       // In theory no, procedures are have a short life, so at some point the store
986       // will have the tracker saying everything is in the last log.
987       // ----------------------------------------------------------------------------
988 
989       // The CompletedProcedureCleaner is a special case, and it acts as a chore.
990       // instead of bringing the Chore class in, we reuse this timeout thread for
991       // this special case.
992       if (proc instanceof CompletedProcedureCleaner) {
993         try {
994           ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
995         } catch (Throwable e) {
996           LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
997         }
998         proc.setStartTime(EnvironmentEdgeManager.currentTime());
999         waitingTimeout.add(proc);
1000         continue;
1001       }
1002 
1003       // The procedure received an "abort-timeout", call abort() and
1004       // add the procedure back in the queue for rollback.
1005       if (proc.setTimeoutFailure()) {
1006         long rootProcId = Procedure.getRootProcedureId(procedures, proc);
1007         RootProcedureState procStack = rollbackStack.get(rootProcId);
1008         procStack.abort();
1009         store.update(proc);
1010         runnables.addFront(proc);
1011         continue;
1012       }
1013     }
1014   }
1015 
1016   /**
1017    * Execute the rollback of the full procedure stack.
1018    * Once the procedure is rolledback, the root-procedure will be visible as
1019    * finished to user, and the result will be the fatal exception.
1020    */
1021   private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
1022     Procedure rootProc = procedures.get(rootProcId);
1023     RemoteProcedureException exception = rootProc.getException();
1024     if (exception == null) {
1025       exception = procStack.getException();
1026       rootProc.setFailure(exception);
1027       store.update(rootProc);
1028     }
1029 
1030     List<Procedure> subprocStack = procStack.getSubprocedures();
1031     assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
1032 
1033     int stackTail = subprocStack.size();
1034     boolean reuseLock = false;
1035     while (stackTail --> 0) {
1036       final Procedure proc = subprocStack.get(stackTail);
1037 
1038       if (!reuseLock && !proc.acquireLock(getEnvironment())) {
1039         // can't take a lock on the procedure, add the root-proc back on the
1040         // queue waiting for the lock availability
1041         return false;
1042       }
1043 
1044       boolean abortRollback = !executeRollback(proc);
1045       abortRollback |= !isRunning() || !store.isRunning();
1046 
1047       // If the next procedure is the same to this one
1048       // (e.g. StateMachineProcedure reuse the same instance)
1049       // we can avoid to lock/unlock each step
1050       reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
1051       if (!reuseLock) {
1052         proc.releaseLock(getEnvironment());
1053       }
1054 
1055       // allows to kill the executor before something is stored to the wal.
1056       // useful to test the procedure recovery.
1057       if (abortRollback) {
1058         return false;
1059       }
1060 
1061       subprocStack.remove(stackTail);
1062 
1063       // if the procedure is kind enough to pass the slot to someone else, yield
1064       if (proc.isYieldAfterExecutionStep(getEnvironment())) {
1065         return false;
1066       }
1067     }
1068 
1069     // Finalize the procedure state
1070     LOG.info("Rolledback procedure " + rootProc +
1071              " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
1072              " exception=" + exception.getMessage());
1073     procedureFinished(rootProc);
1074     return true;
1075   }
1076 
1077   /**
1078    * Execute the rollback of the procedure step.
1079    * It updates the store with the new state (stack index)
1080    * or will remove completly the procedure in case it is a child.
1081    */
1082   private boolean executeRollback(final Procedure proc) {
1083     try {
1084       proc.doRollback(getEnvironment());
1085     } catch (IOException e) {
1086       if (LOG.isDebugEnabled()) {
1087         LOG.debug("rollback attempt failed for " + proc, e);
1088       }
1089       return false;
1090     } catch (InterruptedException e) {
1091       handleInterruptedException(proc, e);
1092       return false;
1093     } catch (Throwable e) {
1094       // Catch NullPointerExceptions or similar errors...
1095       LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
1096     }
1097 
1098     // allows to kill the executor before something is stored to the wal.
1099     // useful to test the procedure recovery.
1100     if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1101       LOG.debug("TESTING: Kill before store update");
1102       stop();
1103       return false;
1104     }
1105 
1106     if (proc.removeStackIndex()) {
1107       proc.setState(ProcedureState.ROLLEDBACK);
1108       if (proc.hasParent()) {
1109         store.delete(proc.getProcId());
1110         procedures.remove(proc.getProcId());
1111       } else {
1112         store.update(proc);
1113       }
1114     } else {
1115       store.update(proc);
1116     }
1117 
1118     return true;
1119   }
1120 
1121   /**
1122    * Executes the specified procedure
1123    *  - calls the doExecute() of the procedure
1124    *  - if the procedure execution didn't fail (e.g. invalid user input)
1125    *     - ...and returned subprocedures
1126    *        - the subprocedures are initialized.
1127    *        - the subprocedures are added to the store
1128    *        - the subprocedures are added to the runnable queue
1129    *        - the procedure is now in a WAITING state, waiting for the subprocedures to complete
1130    *     - ...if there are no subprocedure
1131    *        - the procedure completed successfully
1132    *        - if there is a parent (WAITING)
1133    *            - the parent state will be set to RUNNABLE
1134    *  - in case of failure
1135    *    - the store is updated with the new state
1136    *    - the executor (caller of this method) will start the rollback of the procedure
1137    */
1138   private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
1139     Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
1140 
1141     // Execute the procedure
1142     boolean reExecute = false;
1143     Procedure[] subprocs = null;
1144     do {
1145       reExecute = false;
1146       try {
1147         subprocs = procedure.doExecute(getEnvironment());
1148         if (subprocs != null && subprocs.length == 0) {
1149           subprocs = null;
1150         }
1151       } catch (ProcedureYieldException e) {
1152         if (LOG.isTraceEnabled()) {
1153           LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
1154         }
1155         runnables.yield(procedure);
1156         return;
1157       } catch (InterruptedException e) {
1158         handleInterruptedException(procedure, e);
1159         runnables.yield(procedure);
1160         return;
1161       } catch (Throwable e) {
1162         // Catch NullPointerExceptions or similar errors...
1163         String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
1164         LOG.error(msg, e);
1165         procedure.setFailure(new RemoteProcedureException(msg, e));
1166       }
1167 
1168       if (!procedure.isFailed()) {
1169         if (subprocs != null) {
1170           if (subprocs.length == 1 && subprocs[0] == procedure) {
1171             // quick-shortcut for a state machine like procedure
1172             subprocs = null;
1173             reExecute = true;
1174           } else {
1175             // yield the current procedure, and make the subprocedure runnable
1176             for (int i = 0; i < subprocs.length; ++i) {
1177               Procedure subproc = subprocs[i];
1178               if (subproc == null) {
1179                 String msg = "subproc[" + i + "] is null, aborting the procedure";
1180                 procedure.setFailure(new RemoteProcedureException(msg,
1181                   new IllegalArgumentIOException(msg)));
1182                 subprocs = null;
1183                 break;
1184               }
1185 
1186               assert subproc.getState() == ProcedureState.INITIALIZING;
1187               subproc.setParentProcId(procedure.getProcId());
1188               subproc.setProcId(nextProcId());
1189             }
1190 
1191             if (!procedure.isFailed()) {
1192               procedure.setChildrenLatch(subprocs.length);
1193               switch (procedure.getState()) {
1194                 case RUNNABLE:
1195                   procedure.setState(ProcedureState.WAITING);
1196                   break;
1197                 case WAITING_TIMEOUT:
1198                   waitingTimeout.add(procedure);
1199                   break;
1200                 default:
1201                   break;
1202               }
1203             }
1204           }
1205         } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1206           waitingTimeout.add(procedure);
1207         } else {
1208           // No subtask, so we are done
1209           procedure.setState(ProcedureState.FINISHED);
1210         }
1211       }
1212 
1213       // Add the procedure to the stack
1214       procStack.addRollbackStep(procedure);
1215 
1216       // allows to kill the executor before something is stored to the wal.
1217       // useful to test the procedure recovery.
1218       if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1219         LOG.debug("TESTING: Kill before store update");
1220         stop();
1221         return;
1222       }
1223 
1224       // Commit the transaction
1225       if (subprocs != null && !procedure.isFailed()) {
1226         if (LOG.isTraceEnabled()) {
1227           LOG.trace("Store add " + procedure + " children " + Arrays.toString(subprocs));
1228         }
1229         store.insert(procedure, subprocs);
1230       } else {
1231         if (LOG.isTraceEnabled()) {
1232           LOG.trace("Store update " + procedure);
1233         }
1234         store.update(procedure);
1235       }
1236 
1237       // if the store is not running we are aborting
1238       if (!store.isRunning()) {
1239         return;
1240       }
1241 
1242       // if the procedure is kind enough to pass the slot to someone else, yield
1243       if (procedure.getState() == ProcedureState.RUNNABLE &&
1244           procedure.isYieldAfterExecutionStep(getEnvironment())) {
1245         runnables.yield(procedure);
1246         return;
1247       }
1248 
1249       assert (reExecute && subprocs == null) || !reExecute;
1250     } while (reExecute);
1251 
1252     // Submit the new subprocedures
1253     if (subprocs != null && !procedure.isFailed()) {
1254       for (int i = 0; i < subprocs.length; ++i) {
1255         Procedure subproc = subprocs[i];
1256         assert !procedures.containsKey(subproc.getProcId());
1257         procedures.put(subproc.getProcId(), subproc);
1258         runnables.addFront(subproc);
1259       }
1260     }
1261 
1262     if (procedure.isFinished() && procedure.hasParent()) {
1263       Procedure parent = procedures.get(procedure.getParentProcId());
1264       if (parent == null) {
1265         assert procStack.isRollingback();
1266         return;
1267       }
1268 
1269       // If this procedure is the last child awake the parent procedure
1270       if (LOG.isTraceEnabled()) {
1271         LOG.trace(parent + " child is done: " + procedure);
1272       }
1273       if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
1274         parent.setState(ProcedureState.RUNNABLE);
1275         store.update(parent);
1276         runnables.addFront(parent);
1277         if (LOG.isTraceEnabled()) {
1278           LOG.trace(parent + " all the children finished their work, resume.");
1279         }
1280         return;
1281       }
1282     }
1283   }
1284 
1285   private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
1286     if (LOG.isTraceEnabled()) {
1287       LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
1288     }
1289 
1290     // NOTE: We don't call Thread.currentThread().interrupt()
1291     // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
1292     // the InterruptedException. If the master is going down, we will be notified
1293     // and the executor/store will be stopped.
1294     // (The interrupted procedure will be retried on the next run)
1295   }
1296 
1297   private void sendProcedureLoadedNotification(final long procId) {
1298     if (!this.listeners.isEmpty()) {
1299       for (ProcedureExecutorListener listener: this.listeners) {
1300         try {
1301           listener.procedureLoaded(procId);
1302         } catch (Throwable e) {
1303           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1304         }
1305       }
1306     }
1307   }
1308 
1309   private void sendProcedureAddedNotification(final long procId) {
1310     if (!this.listeners.isEmpty()) {
1311       for (ProcedureExecutorListener listener: this.listeners) {
1312         try {
1313           listener.procedureAdded(procId);
1314         } catch (Throwable e) {
1315           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1316         }
1317       }
1318     }
1319   }
1320 
1321   private void sendProcedureFinishedNotification(final long procId) {
1322     if (!this.listeners.isEmpty()) {
1323       for (ProcedureExecutorListener listener: this.listeners) {
1324         try {
1325           listener.procedureFinished(procId);
1326         } catch (Throwable e) {
1327           LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
1328         }
1329       }
1330     }
1331   }
1332 
1333   private long nextProcId() {
1334     long procId = lastProcId.incrementAndGet();
1335     if (procId < 0) {
1336       while (!lastProcId.compareAndSet(procId, 0)) {
1337         procId = lastProcId.get();
1338         if (procId >= 0)
1339           break;
1340       }
1341       while (procedures.containsKey(procId)) {
1342         procId = lastProcId.incrementAndGet();
1343       }
1344     }
1345     return procId;
1346   }
1347 
1348   private Long getRootProcedureId(Procedure proc) {
1349     return Procedure.getRootProcedureId(procedures, proc);
1350   }
1351 
1352   private void procedureFinished(final Procedure proc) {
1353     // call the procedure completion cleanup handler
1354     try {
1355       proc.completionCleanup(getEnvironment());
1356     } catch (Throwable e) {
1357       // Catch NullPointerExceptions or similar errors...
1358       LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1359     }
1360 
1361     // update the executor internal state maps
1362     ProcedureInfo procInfo = Procedure.createProcedureInfo(proc, proc.getNonceKey());
1363     if (!proc.shouldWaitClientAck(getEnvironment())) {
1364       procInfo.setClientAckTime(0);
1365     }
1366 
1367     completed.put(procInfo.getProcId(), procInfo);
1368     rollbackStack.remove(proc.getProcId());
1369     procedures.remove(proc.getProcId());
1370 
1371     // call the runnableSet completion cleanup handler
1372     try {
1373       runnables.completionCleanup(proc);
1374     } catch (Throwable e) {
1375       // Catch NullPointerExceptions or similar errors...
1376       LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
1377     }
1378 
1379     // Notify the listeners
1380     sendProcedureFinishedNotification(proc.getProcId());
1381   }
1382 
1383   public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) {
1384     ProcedureInfo result = completed.get(procId);
1385     Procedure proc = null;
1386     if (result == null) {
1387       proc = procedures.get(procId);
1388       if (proc == null) {
1389         result = completed.get(procId);
1390       }
1391     }
1392     return new Pair(result, proc);
1393   }
1394 }