001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.UncheckedIOException;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Deque;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.Executor;
033import java.util.concurrent.Executors;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.AtomicLong;
038import java.util.stream.Collectors;
039import java.util.stream.Stream;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
043import org.apache.hadoop.hbase.log.HBaseMarkers;
044import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
045import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
048import org.apache.hadoop.hbase.procedure2.util.StringUtils;
049import org.apache.hadoop.hbase.security.User;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.IdLock;
052import org.apache.hadoop.hbase.util.NonceKey;
053import org.apache.hadoop.hbase.util.Threads;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
061
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
063
064/**
065 * Thread Pool that executes the submitted procedures.
066 * The executor has a ProcedureStore associated.
067 * Each operation is logged and on restart the pending procedures are resumed.
068 *
 * Unless the Procedure code throws an error (e.g. invalid user input)
 * the procedure will complete (at some point in time). On restart the pending
 * procedures are resumed and the ones that failed will be rolled back.
072 *
073 * The user can add procedures to the executor via submitProcedure(proc)
074 * check for the finished state via isFinished(procId)
075 * and get the result via getResult(procId)
076 */
077@InterfaceAudience.Private
078public class ProcedureExecutor<TEnvironment> {
079  private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);
080
081  public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
082  private static final boolean DEFAULT_CHECK_OWNER_SET = false;
083
084  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
085      "hbase.procedure.worker.keep.alive.time.msec";
086  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
087
088  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
089  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
090
091  public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
092  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
093
094  /**
095   * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
096   * break PE having it fail at various junctures. When non-null, testing is set to an instance of
097   * the below internal {@link Testing} class with flags set for the particular test.
098   */
099  volatile Testing testing = null;
100
101  /**
102   * Class with parameters describing how to fail/die when in testing-context.
103   */
104  public static class Testing {
105    protected volatile boolean killIfHasParent = true;
106    protected volatile boolean killIfSuspended = false;
107
108    /**
109     * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
110     * persisting all the state it needs to recover after a crash.
111     */
112    protected volatile boolean killBeforeStoreUpdate = false;
113    protected volatile boolean toggleKillBeforeStoreUpdate = false;
114
115    /**
116     * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
117     * is about a case where memory-state was being set after store to WAL where a crash could
118     * cause us to get stuck. This flag allows killing at what was a vulnerable time.
119     */
120    protected volatile boolean killAfterStoreUpdate = false;
121    protected volatile boolean toggleKillAfterStoreUpdate = false;
122
123    protected boolean shouldKillBeforeStoreUpdate() {
124      final boolean kill = this.killBeforeStoreUpdate;
125      if (this.toggleKillBeforeStoreUpdate) {
126        this.killBeforeStoreUpdate = !kill;
127        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
128      }
129      return kill;
130    }
131
132    protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {
133      if (isSuspended && !killIfSuspended) {
134        return false;
135      }
136      if (hasParent && !killIfHasParent) {
137        return false;
138      }
139      return shouldKillBeforeStoreUpdate();
140    }
141
142    protected boolean shouldKillAfterStoreUpdate() {
143      final boolean kill = this.killAfterStoreUpdate;
144      if (this.toggleKillAfterStoreUpdate) {
145        this.killAfterStoreUpdate = !kill;
146        LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
147      }
148      return kill;
149    }
150
151    protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
152      return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
153    }
154  }
155
156  public interface ProcedureExecutorListener {
157    void procedureLoaded(long procId);
158    void procedureAdded(long procId);
159    void procedureFinished(long procId);
160  }
161
162  /**
   * Map the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
164   * Once a Root-Procedure completes (success or failure), the result will be added to this map.
165   * The user of ProcedureExecutor should call getResult(procId) to get the result.
166   */
167  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
168    new ConcurrentHashMap<>();
169
170  /**
   * Map the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
172   * The RootProcedureState contains the execution stack of the Root-Procedure,
173   * It is added to the map by submitProcedure() and removed on procedure completion.
174   */
175  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
176    new ConcurrentHashMap<>();
177
178  /**
179   * Helper map to lookup the live procedures by ID.
180   * This map contains every procedure. root-procedures and subprocedures.
181   */
182  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
183    new ConcurrentHashMap<>();
184
185  /**
186   * Helper map to lookup whether the procedure already issued from the same client. This map
187   * contains every root procedure.
188   */
189  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();
190
191  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
192    new CopyOnWriteArrayList<>();
193
194  private Configuration conf;
195
196  /**
197   * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
198   * resource handling rather than observing in a #join is unexpected).
199   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
200   * (Should be ok).
201   */
202  private ThreadGroup threadGroup;
203
204  /**
205   * Created in the {@link #init(int, boolean)}  method. Terminated in {@link #join()} (FIX! Doing
206   * resource handling rather than observing in a #join is unexpected).
207   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
208   * (Should be ok).
209   */
210  private CopyOnWriteArrayList<WorkerThread> workerThreads;
211
212  /**
213   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
214   * resource handling rather than observing in a #join is unexpected).
215   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
216   * (Should be ok).
217   */
218  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;
219
220  /**
   * WorkerMonitor checks for stuck workers and creates a new worker thread when necessary. For
   * example, if there is no worker to assign meta, it will create a new worker thread for it, so
   * it is very important. TimeoutExecutor executes many tasks like DeadServerMetricRegionChore,
   * RegionInTransitionChore and so on; some tasks may execute for a long time and would block
   * other tasks like WorkerMonitor, so a dedicated thread is used for executing WorkerMonitor.
226   */
227  private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;
228
229  private int corePoolSize;
230  private int maxPoolSize;
231
232  private volatile long keepAliveTime;
233
234  /**
235   * Scheduler/Queue that contains runnable procedures.
236   */
237  private final ProcedureScheduler scheduler;
238
239  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
240    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
241
242  private final AtomicLong lastProcId = new AtomicLong(-1);
243  private final AtomicLong workerId = new AtomicLong(0);
244  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
245  private final AtomicBoolean running = new AtomicBoolean(false);
246  private final TEnvironment environment;
247  private final ProcedureStore store;
248
249  private final boolean checkOwnerSet;
250
251  // To prevent concurrent execution of the same procedure.
252  // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
253  // procedure is woken up before we finish the suspend which causes the same procedures to be
254  // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
255  // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
256  // execution of the same procedure.
257  private final IdLock procExecutionLock = new IdLock();
258
259  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
260      final ProcedureStore store) {
261    this(conf, environment, store, new SimpleProcedureScheduler());
262  }
263
264  private boolean isRootFinished(Procedure<?> proc) {
265    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
266    return rootProc == null || rootProc.isFinished();
267  }
268
269  private void forceUpdateProcedure(long procId) throws IOException {
270    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
271    try {
272      Procedure<TEnvironment> proc = procedures.get(procId);
273      if (proc != null) {
274        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
275          LOG.debug("Procedure {} has already been finished and parent is succeeded," +
276            " skip force updating", proc);
277          return;
278        }
279      } else {
280        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
281        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
282          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
283          return;
284        }
285        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
286        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
287        if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
288          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
289            procId);
290          return;
291        }
292        proc = retainer.getProcedure();
293      }
294      LOG.debug("Force update procedure {}", proc);
295      store.update(proc);
296    } finally {
297      procExecutionLock.releaseLockEntry(lockEntry);
298    }
299  }
300
301  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
302      final ProcedureStore store, final ProcedureScheduler scheduler) {
303    this.environment = environment;
304    this.scheduler = scheduler;
305    this.store = store;
306    this.conf = conf;
307    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
308    refreshConfiguration(conf);
309    store.registerListener(new ProcedureStoreListener() {
310
311      @Override
312      public void forceUpdate(long[] procIds) {
313        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
314          try {
315            forceUpdateProcedure(procId);
316          } catch (IOException e) {
317            LOG.warn("Failed to force update procedure with pid={}", procId);
318          }
319        }));
320      }
321    });
322  }
323
324  private void load(final boolean abortOnCorruption) throws IOException {
325    Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
326    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
327    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
328    Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");
329
330    store.load(new ProcedureStore.ProcedureLoader() {
331      @Override
332      public void setMaxProcId(long maxProcId) {
333        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
334        lastProcId.set(maxProcId);
335      }
336
337      @Override
338      public void load(ProcedureIterator procIter) throws IOException {
339        loadProcedures(procIter, abortOnCorruption);
340      }
341
342      @Override
343      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
344        int corruptedCount = 0;
345        while (procIter.hasNext()) {
346          Procedure<?> proc = procIter.next();
347          LOG.error("Corrupt " + proc);
348          corruptedCount++;
349        }
350        if (abortOnCorruption && corruptedCount > 0) {
351          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
352        }
353      }
354    });
355  }
356
357  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
358    proc.restoreLock(getEnvironment());
359    restored.add(proc.getProcId());
360  }
361
362  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
363    while (!stack.isEmpty()) {
364      restoreLock(stack.pop(), restored);
365    }
366  }
367
368  // Restore the locks for all the procedures.
369  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
370  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
371  // calling the acquireLock method for the parent procedure.
372  // The algorithm is straight-forward:
373  // 1. Use a set to record the procedures which locks have already been restored.
374  // 2. Use a stack to store the hierarchy of the procedures
375  // 3. For all the procedure, we will first try to find its parent and push it into the stack,
376  // unless
377  // a. We have no parent, i.e, we are the root procedure
378  // b. The lock has already been restored(by checking the set introduced in #1)
379  // then we start to pop the stack and call acquireLock for each procedure.
380  // Notice that this should be done for all procedures, not only the ones in runnableList.
381  private void restoreLocks() {
382    Set<Long> restored = new HashSet<>();
383    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
384    procedures.values().forEach(proc -> {
385      for (;;) {
386        if (restored.contains(proc.getProcId())) {
387          restoreLocks(stack, restored);
388          return;
389        }
390        if (!proc.hasParent()) {
391          restoreLock(proc, restored);
392          restoreLocks(stack, restored);
393          return;
394        }
395        stack.push(proc);
396        proc = procedures.get(proc.getParentProcId());
397      }
398    });
399  }
400
401  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
402      throws IOException {
403    // 1. Build the rollback stack
404    int runnableCount = 0;
405    int failedCount = 0;
406    int waitingCount = 0;
407    int waitingTimeoutCount = 0;
408    while (procIter.hasNext()) {
409      boolean finished = procIter.isNextFinished();
410      @SuppressWarnings("unchecked")
411      Procedure<TEnvironment> proc = procIter.next();
412      NonceKey nonceKey = proc.getNonceKey();
413      long procId = proc.getProcId();
414
415      if (finished) {
416        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
417        LOG.debug("Completed {}", proc);
418      } else {
419        if (!proc.hasParent()) {
420          assert !proc.isFinished() : "unexpected finished procedure";
421          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
422        }
423
424        // add the procedure to the map
425        proc.beforeReplay(getEnvironment());
426        procedures.put(proc.getProcId(), proc);
427        switch (proc.getState()) {
428          case RUNNABLE:
429            runnableCount++;
430            break;
431          case FAILED:
432            failedCount++;
433            break;
434          case WAITING:
435            waitingCount++;
436            break;
437          case WAITING_TIMEOUT:
438            waitingTimeoutCount++;
439            break;
440          default:
441            break;
442        }
443      }
444
445      if (nonceKey != null) {
446        nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
447      }
448    }
449
450    // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
451    // push it into the ProcedureScheduler directly to execute the rollback. But this does not work
452    // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
453    // the queue from runQueue in scheduler, and then when a procedure which has lock access, for
454    // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
455    // we will add the queue back to let the workers poll from it. The assumption here is that, the
456    // procedure which has the xlock should have been polled out already, so when loading we can not
457    // add the procedure to scheduler first and then call acquireLock, since the procedure is still
458    // in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
459    // then there is a dead lock
460    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
461    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
462    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
463    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
464    procIter.reset();
465    while (procIter.hasNext()) {
466      if (procIter.isNextFinished()) {
467        procIter.skipNext();
468        continue;
469      }
470
471      @SuppressWarnings("unchecked")
472      Procedure<TEnvironment> proc = procIter.next();
473      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
474      LOG.debug("Loading {}", proc);
475      Long rootProcId = getRootProcedureId(proc);
476      // The orphan procedures will be passed to handleCorrupted, so add an assert here
477      assert rootProcId != null;
478
479      if (proc.hasParent()) {
480        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
481        if (parent != null && !proc.isFinished()) {
482          parent.incChildrenLatch();
483        }
484      }
485
486      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
487      procStack.loadStack(proc);
488
489      proc.setRootProcId(rootProcId);
490      switch (proc.getState()) {
491        case RUNNABLE:
492          runnableList.add(proc);
493          break;
494        case WAITING:
495          waitingList.add(proc);
496          break;
497        case WAITING_TIMEOUT:
498          waitingTimeoutList.add(proc);
499          break;
500        case FAILED:
501          failedList.add(proc);
502          break;
503        case ROLLEDBACK:
504        case INITIALIZING:
505          String msg = "Unexpected " + proc.getState() + " state for " + proc;
506          LOG.error(msg);
507          throw new UnsupportedOperationException(msg);
508        default:
509          break;
510      }
511    }
512
513    // 3. Check the waiting procedures to see if some of them can be added to runnable.
514    waitingList.forEach(proc -> {
515      if (!proc.hasChildren()) {
516        // Normally, WAITING procedures should be waken by its children. But, there is a case that,
517        // all the children are successful and before they can wake up their parent procedure, the
518        // master was killed. So, during recovering the procedures from ProcedureWal, its children
519        // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
520        // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
521        // exception will throw:
522        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
523        // "NOT RUNNABLE! " + procedure.toString());
524        proc.setState(ProcedureState.RUNNABLE);
525        runnableList.add(proc);
526      } else {
527        proc.afterReplay(getEnvironment());
528      }
529    });
530    // 4. restore locks
531    restoreLocks();
532
533    // 5. Push the procedures to the timeout executor
534    waitingTimeoutList.forEach(proc -> {
535      proc.afterReplay(getEnvironment());
536      timeoutExecutor.add(proc);
537    });
538
539    // 6. Push the procedure to the scheduler
540    failedList.forEach(scheduler::addBack);
541    runnableList.forEach(p -> {
542      p.afterReplay(getEnvironment());
543      if (!p.hasParent()) {
544        sendProcedureLoadedNotification(p.getProcId());
545      }
546      scheduler.addBack(p);
547    });
548    // After all procedures put into the queue, signal the worker threads.
549    // Otherwise, there is a race condition. See HBASE-21364.
550    scheduler.signalAll();
551  }
552
553  /**
554   * Initialize the procedure executor, but do not start workers. We will start them later.
555   * <p/>
556   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
557   * ensure a single executor, and start the procedure replay to resume and recover the previous
558   * pending and in-progress procedures.
559   * @param numThreads number of threads available for procedure execution.
560   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
561   *          is found on replay. otherwise false.
562   */
563  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
564    // We have numThreads executor + one timer thread used for timing out
565    // procedures and triggering periodic procedures.
566    this.corePoolSize = numThreads;
567    this.maxPoolSize = 10 * numThreads;
568    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
569        corePoolSize, maxPoolSize);
570
571    this.threadGroup = new ThreadGroup("PEWorkerGroup");
572    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
573    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");
574
575    // Create the workers
576    workerId.set(0);
577    workerThreads = new CopyOnWriteArrayList<>();
578    for (int i = 0; i < corePoolSize; ++i) {
579      workerThreads.add(new WorkerThread(threadGroup));
580    }
581
582    long st, et;
583
584    // Acquire the store lease.
585    st = System.nanoTime();
586    store.recoverLease();
587    et = System.nanoTime();
588    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
589      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
590
591    // start the procedure scheduler
592    scheduler.start();
593
594    // TODO: Split in two steps.
595    // TODO: Handle corrupted procedures (currently just a warn)
596    // The first one will make sure that we have the latest id,
597    // so we can start the threads and accept new procedures.
598    // The second step will do the actual load of old procedures.
599    st = System.nanoTime();
600    load(abortOnCorruption);
601    et = System.nanoTime();
602    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
603      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
604  }
605
606  /**
607   * Start the workers.
608   */
609  public void startWorkers() throws IOException {
610    if (!running.compareAndSet(false, true)) {
611      LOG.warn("Already running");
612      return;
613    }
614    // Start the executors. Here we must have the lastProcId set.
615    LOG.trace("Start workers {}", workerThreads.size());
616    timeoutExecutor.start();
617    workerMonitorExecutor.start();
618    for (WorkerThread worker: workerThreads) {
619      worker.start();
620    }
621
622    // Internal chores
623    workerMonitorExecutor.add(new WorkerMonitor());
624
625    // Add completed cleaner chore
626    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
627      nonceKeysToProcIdsMap));
628  }
629
630  public void stop() {
631    if (!running.getAndSet(false)) {
632      return;
633    }
634
635    LOG.info("Stopping");
636    scheduler.stop();
637    timeoutExecutor.sendStopSignal();
638    workerMonitorExecutor.sendStopSignal();
639  }
640
641  @VisibleForTesting
642  public void join() {
643    assert !isRunning() : "expected not running";
644
645    // stop the timeout executor
646    timeoutExecutor.awaitTermination();
647    // stop the work monitor executor
648    workerMonitorExecutor.awaitTermination();
649
650    // stop the worker threads
651    for (WorkerThread worker: workerThreads) {
652      worker.awaitTermination();
653    }
654
655    // Destroy the Thread Group for the executors
656    // TODO: Fix. #join is not place to destroy resources.
657    try {
658      threadGroup.destroy();
659    } catch (IllegalThreadStateException e) {
660      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
661          this.threadGroup, e.getMessage());
662      // This dumps list of threads on STDOUT.
663      this.threadGroup.list();
664    }
665
666    // reset the in-memory state for testing
667    completed.clear();
668    rollbackStack.clear();
669    procedures.clear();
670    nonceKeysToProcIdsMap.clear();
671    scheduler.clear();
672    lastProcId.set(-1);
673  }
674
675  public void refreshConfiguration(final Configuration conf) {
676    this.conf = conf;
677    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
678        DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
679  }
680
681  // ==========================================================================
682  //  Accessors
683  // ==========================================================================
684  public boolean isRunning() {
685    return running.get();
686  }
687
688  /**
689   * @return the current number of worker threads.
690   */
691  public int getWorkerThreadCount() {
692    return workerThreads.size();
693  }
694
695  /**
696   * @return the core pool size settings.
697   */
698  public int getCorePoolSize() {
699    return corePoolSize;
700  }
701
702  public int getActiveExecutorCount() {
703    return activeExecutorCount.get();
704  }
705
706  public TEnvironment getEnvironment() {
707    return this.environment;
708  }
709
710  public ProcedureStore getStore() {
711    return this.store;
712  }
713
714  ProcedureScheduler getScheduler() {
715    return scheduler;
716  }
717
718  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
719    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
720    this.scheduler.signalAll();
721  }
722
723  public long getKeepAliveTime(final TimeUnit timeUnit) {
724    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
725  }
726
727  // ==========================================================================
728  //  Submit/Remove Chores
729  // ==========================================================================
730
731  /**
732   * Add a chore procedure to the executor
733   * @param chore the chore to add
734   */
735  public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
736    chore.setState(ProcedureState.WAITING_TIMEOUT);
737    timeoutExecutor.add(chore);
738  }
739
740  /**
741   * Remove a chore procedure from the executor
742   * @param chore the chore to remove
743   * @return whether the chore is removed, or it will be removed later
744   */
745  public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
746    chore.setState(ProcedureState.SUCCESS);
747    return timeoutExecutor.remove(chore);
748  }
749
750  // ==========================================================================
751  //  Nonce Procedure helpers
752  // ==========================================================================
753  /**
754   * Create a NonceKey from the specified nonceGroup and nonce.
755   * @param nonceGroup the group to use for the {@link NonceKey}
756   * @param nonce the nonce to use in the {@link NonceKey}
757   * @return the generated NonceKey
758   */
759  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
760    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
761  }
762
763  /**
764   * Register a nonce for a procedure that is going to be submitted.
765   * A procId will be reserved and on submitProcedure(),
766   * the procedure with the specified nonce will take the reserved ProcId.
767   * If someone already reserved the nonce, this method will return the procId reserved,
   * otherwise an invalid procId will be returned, and the caller should proceed
   * and submit the procedure.
770   *
771   * @param nonceKey A unique identifier for this operation from the client or process.
772   * @return the procId associated with the nonce, if any otherwise an invalid procId.
773   */
774  public long registerNonce(final NonceKey nonceKey) {
775    if (nonceKey == null) {
776      return -1;
777    }
778
779    // check if we have already a Reserved ID for the nonce
780    Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
781    if (oldProcId == null) {
782      // reserve a new Procedure ID, this will be associated with the nonce
783      // and the procedure submitted with the specified nonce will use this ID.
784      final long newProcId = nextProcId();
785      oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
786      if (oldProcId == null) {
787        return -1;
788      }
789    }
790
791    // we found a registered nonce, but the procedure may not have been submitted yet.
792    // since the client expect the procedure to be submitted, spin here until it is.
793    final boolean traceEnabled = LOG.isTraceEnabled();
794    while (isRunning() &&
795           !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
796           nonceKeysToProcIdsMap.containsKey(nonceKey)) {
797      if (traceEnabled) {
798        LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
799      }
800      Threads.sleep(100);
801    }
802    return oldProcId.longValue();
803  }
804
805  /**
806   * Remove the NonceKey if the procedure was not submitted to the executor.
807   * @param nonceKey A unique identifier for this operation from the client or process.
808   */
809  public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
810    if (nonceKey == null) {
811      return;
812    }
813
814    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
815    if (procId == null) {
816      return;
817    }
818
819    // if the procedure was not submitted, remove the nonce
820    if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
821      nonceKeysToProcIdsMap.remove(nonceKey);
822    }
823  }
824
825  /**
   * If the procedure failed before it was submitted, we may want to give back the
   * same error to the requests with the same nonceKey.
828   *
829   * @param nonceKey A unique identifier for this operation from the client or process
830   * @param procName name of the procedure, used to inform the user
831   * @param procOwner name of the owner of the procedure, used to inform the user
832   * @param exception the failure to report to the user
833   */
834  public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
835      IOException exception) {
836    if (nonceKey == null) {
837      return;
838    }
839
840    Long procId = nonceKeysToProcIdsMap.get(nonceKey);
841    if (procId == null || completed.containsKey(procId)) {
842      return;
843    }
844
845    Procedure<TEnvironment> proc =
846      new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
847
848    completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
849  }
850
851  // ==========================================================================
852  //  Submit/Abort Procedure
853  // ==========================================================================
854  /**
855   * Add a new root-procedure to the executor.
856   * @param proc the new procedure to execute.
857   * @return the procedure id, that can be used to monitor the operation
858   */
859  public long submitProcedure(Procedure<TEnvironment> proc) {
860    return submitProcedure(proc, null);
861  }
862
863  /**
864   * Bypass a procedure. If the procedure is set to bypass, all the logic in
865   * execute/rollback will be ignored and it will return success, whatever.
866   * It is used to recover buggy stuck procedures, releasing the lock resources
867   * and letting other procedures run. Bypassing one procedure (and its ancestors will
868   * be bypassed automatically) may leave the cluster in a middle state, e.g. region
869   * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
870   * the operators may have to do some clean up on hdfs or schedule some assign procedures
871   * to let region online. DO AT YOUR OWN RISK.
872   * <p>
873   * A procedure can be bypassed only if
874   * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
875   * or it is a root procedure without any child.
876   * 2. No other worker thread is executing it
877   * 3. No child procedure has been submitted
878   *
879   * <p>
   * If all the requirements are met, the procedure and its ancestors will be
   * bypassed and persisted to WAL.
882   *
883   * <p>
   * If the procedure is in WAITING state, it will be set to RUNNABLE and added to the run queue.
885   * TODO: What about WAITING_TIMEOUT?
886   * @param pids the procedure id
887   * @param lockWait time to wait lock
888   * @param force if force set to true, we will bypass the procedure even if it is executing.
889   *              This is for procedures which can't break out during executing(due to bug, mostly)
890   *              In this case, bypassing the procedure is not enough, since it is already stuck
891   *              there. We need to restart the master after bypassing, and letting the problematic
   *              procedure to execute with bypass=true, so in that condition, the procedure can be
893   *              successfully bypassed.
894   * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
   * @return one entry per requested pid, in the same order; true if that bypass succeeded
896   * @throws IOException IOException
897   */
898  public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
899      boolean recursive)
900      throws IOException {
901    List<Boolean> result = new ArrayList<Boolean>(pids.size());
902    for(long pid: pids) {
903      result.add(bypassProcedure(pid, lockWait, force, recursive));
904    }
905    return result;
906  }
907
908  boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
909      throws IOException {
910    Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
911    final Procedure<TEnvironment> procedure = getProcedure(pid);
912    if (procedure == null) {
913      LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
914      return false;
915    }
916
917    LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}",
918        procedure, lockWait, override, recursive);
919    IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
920    if (lockEntry == null && !override) {
921      LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
922          lockWait, procedure, override);
923      return false;
924    } else if (lockEntry == null) {
925      LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}",
926          lockWait, procedure, override);
927    }
928    try {
929      // check whether the procedure is already finished
930      if (procedure.isFinished()) {
931        LOG.debug("{} is already finished, skipping bypass", procedure);
932        return false;
933      }
934
935      if (procedure.hasChildren()) {
936        if (recursive) {
937          // EXPENSIVE. Checks each live procedure of which there could be many!!!
938          // Is there another way to get children of a procedure?
939          LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
940          this.procedures.forEachValue(1 /*Single-threaded*/,
941            // Transformer
942            v -> v.getParentProcId() == procedure.getProcId()? v: null,
943            // Consumer
944            v -> {
945              try {
946                bypassProcedure(v.getProcId(), lockWait, override, recursive);
947              } catch (IOException e) {
948                LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
949              }
950            });
951        } else {
952          LOG.debug("{} has children, skipping bypass", procedure);
953          return false;
954        }
955      }
956
957      // If the procedure has no parent or no child, we are safe to bypass it in whatever state
958      if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
959          && procedure.getState() != ProcedureState.WAITING
960          && procedure.getState() != ProcedureState.WAITING_TIMEOUT) {
961        LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
962                + "(with no parent), {}",
963            procedure);
964        // Question: how is the bypass done here?
965        return false;
966      }
967
968      // Now, the procedure is not finished, and no one can execute it since we take the lock now
969      // And we can be sure that its ancestor is not running too, since their child has not
970      // finished yet
971      Procedure<TEnvironment> current = procedure;
972      while (current != null) {
973        LOG.debug("Bypassing {}", current);
974        current.bypass(getEnvironment());
975        store.update(procedure);
976        long parentID = current.getParentProcId();
977        current = getProcedure(parentID);
978      }
979
980      //wake up waiting procedure, already checked there is no child
981      if (procedure.getState() == ProcedureState.WAITING) {
982        procedure.setState(ProcedureState.RUNNABLE);
983        store.update(procedure);
984      }
985
986      // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler.
987      // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE
988      if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
989        LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
990        if (timeoutExecutor.remove(procedure)) {
991          LOG.debug("removed procedure {} from timeoutExecutor", procedure);
992          timeoutExecutor.executeTimedoutProcedure(procedure);
993        }
994      } else if (lockEntry != null) {
995        scheduler.addFront(procedure);
996        LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
997      } else {
998        // If we don't have the lock, we can't re-submit the queue,
999        // since it is already executing. To get rid of the stuck situation, we
1000        // need to restart the master. With the procedure set to bypass, the procedureExecutor
1001        // will bypass it and won't get stuck again.
1002        LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
1003            + "skipping add to queue",
1004          procedure);
1005      }
1006      return true;
1007
1008    } finally {
1009      if (lockEntry != null) {
1010        procExecutionLock.releaseLockEntry(lockEntry);
1011      }
1012    }
1013  }
1014
1015  /**
1016   * Add a new root-procedure to the executor.
1017   * @param proc the new procedure to execute.
1018   * @param nonceKey the registered unique identifier for this operation from the client or process.
1019   * @return the procedure id, that can be used to monitor the operation
1020   */
1021  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
1022      justification = "FindBugs is blind to the check-for-null")
1023  public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
1024    Preconditions.checkArgument(lastProcId.get() >= 0);
1025
1026    prepareProcedure(proc);
1027
1028    final Long currentProcId;
1029    if (nonceKey != null) {
1030      currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
1031      Preconditions.checkArgument(currentProcId != null,
1032        "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
1033    } else {
1034      currentProcId = nextProcId();
1035    }
1036
1037    // Initialize the procedure
1038    proc.setNonceKey(nonceKey);
1039    proc.setProcId(currentProcId.longValue());
1040
1041    // Commit the transaction
1042    store.insert(proc, null);
1043    LOG.debug("Stored {}", proc);
1044
1045    // Add the procedure to the executor
1046    return pushProcedure(proc);
1047  }
1048
1049  /**
   * Add a set of new root-procedures to the executor.
1051   * @param procs the new procedures to execute.
1052   */
1053  // TODO: Do we need to take nonces here?
1054  public void submitProcedures(Procedure<TEnvironment>[] procs) {
1055    Preconditions.checkArgument(lastProcId.get() >= 0);
1056    if (procs == null || procs.length <= 0) {
1057      return;
1058    }
1059
1060    // Prepare procedure
1061    for (int i = 0; i < procs.length; ++i) {
1062      prepareProcedure(procs[i]).setProcId(nextProcId());
1063    }
1064
1065    // Commit the transaction
1066    store.insert(procs);
1067    if (LOG.isDebugEnabled()) {
1068      LOG.debug("Stored " + Arrays.toString(procs));
1069    }
1070
1071    // Add the procedure to the executor
1072    for (int i = 0; i < procs.length; ++i) {
1073      pushProcedure(procs[i]);
1074    }
1075  }
1076
1077  private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
1078    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
1079    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
1080    if (this.checkOwnerSet) {
1081      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
1082    }
1083    return proc;
1084  }
1085
1086  private long pushProcedure(Procedure<TEnvironment> proc) {
1087    final long currentProcId = proc.getProcId();
1088
1089    // Update metrics on start of a procedure
1090    proc.updateMetricsOnSubmit(getEnvironment());
1091
1092    // Create the rollback stack for the procedure
1093    RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
1094    rollbackStack.put(currentProcId, stack);
1095
1096    // Submit the new subprocedures
1097    assert !procedures.containsKey(currentProcId);
1098    procedures.put(currentProcId, proc);
1099    sendProcedureAddedNotification(currentProcId);
1100    scheduler.addBack(proc);
1101    return proc.getProcId();
1102  }
1103
1104  /**
   * Send an abort notification to the specified procedure.
1106   * Depending on the procedure implementation the abort can be considered or ignored.
1107   * @param procId the procedure to abort
1108   * @return true if the procedure exists and has received the abort, otherwise false.
1109   */
1110  public boolean abort(long procId) {
1111    return abort(procId, true);
1112  }
1113
1114  /**
1115   * Send an abort notification to the specified procedure.
1116   * Depending on the procedure implementation, the abort can be considered or ignored.
1117   * @param procId the procedure to abort
1118   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
1119   * @return true if the procedure exists and has received the abort, otherwise false.
1120   */
1121  public boolean abort(long procId, boolean mayInterruptIfRunning) {
1122    Procedure<TEnvironment> proc = procedures.get(procId);
1123    if (proc != null) {
1124      if (!mayInterruptIfRunning && proc.wasExecuted()) {
1125        return false;
1126      }
1127      return proc.abort(getEnvironment());
1128    }
1129    return false;
1130  }
1131
1132  // ==========================================================================
1133  //  Executor query helpers
1134  // ==========================================================================
1135  public Procedure<TEnvironment> getProcedure(final long procId) {
1136    return procedures.get(procId);
1137  }
1138
1139  public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
1140    Procedure<TEnvironment> proc = getProcedure(procId);
1141    if (clazz.isInstance(proc)) {
1142      return clazz.cast(proc);
1143    }
1144    return null;
1145  }
1146
1147  public Procedure<TEnvironment> getResult(long procId) {
1148    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1149    if (retainer == null) {
1150      return null;
1151    } else {
1152      return retainer.getProcedure();
1153    }
1154  }
1155
1156  /**
1157   * Return true if the procedure is finished.
1158   * The state may be "completed successfully" or "failed and rolledback".
1159   * Use getResult() to check the state or get the result data.
1160   * @param procId the ID of the procedure to check
1161   * @return true if the procedure execution is finished, otherwise false.
1162   */
1163  public boolean isFinished(final long procId) {
1164    return !procedures.containsKey(procId);
1165  }
1166
1167  /**
1168   * Return true if the procedure is started.
1169   * @param procId the ID of the procedure to check
1170   * @return true if the procedure execution is started, otherwise false.
1171   */
1172  public boolean isStarted(long procId) {
1173    Procedure<?> proc = procedures.get(procId);
1174    if (proc == null) {
1175      return completed.get(procId) != null;
1176    }
1177    return proc.wasExecuted();
1178  }
1179
1180  /**
1181   * Mark the specified completed procedure, as ready to remove.
1182   * @param procId the ID of the procedure to remove
1183   */
1184  public void removeResult(long procId) {
1185    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1186    if (retainer == null) {
1187      assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
1188      LOG.debug("pid={} already removed by the cleaner.", procId);
1189      return;
1190    }
1191
1192    // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
1193    retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
1194  }
1195
1196  public Procedure<TEnvironment> getResultOrProcedure(long procId) {
1197    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1198    if (retainer == null) {
1199      return procedures.get(procId);
1200    } else {
1201      return retainer.getProcedure();
1202    }
1203  }
1204
1205  /**
1206   * Check if the user is this procedure's owner
1207   * @param procId the target procedure
1208   * @param user the user
1209   * @return true if the user is the owner of the procedure,
1210   *   false otherwise or the owner is unknown.
1211   */
1212  public boolean isProcedureOwner(long procId, User user) {
1213    if (user == null) {
1214      return false;
1215    }
1216    final Procedure<TEnvironment> runningProc = procedures.get(procId);
1217    if (runningProc != null) {
1218      return runningProc.getOwner().equals(user.getShortName());
1219    }
1220
1221    final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1222    if (retainer != null) {
1223      return retainer.getProcedure().getOwner().equals(user.getShortName());
1224    }
1225
1226    // Procedure either does not exist or has already completed and got cleaned up.
1227    // At this time, we cannot check the owner of the procedure
1228    return false;
1229  }
1230
1231  /**
1232   * Should only be used when starting up, where the procedure workers have not been started.
1233   * <p/>
   * If the procedure workers have been started, the returned values may change while you are
   * processing them, so usually this is not safe. Use {@link #getProcedures()} below for most
   * cases as it will do a copy, and also include the finished procedures.
1237   */
1238  public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
1239    return procedures.values();
1240  }
1241
1242  /**
1243   * Get procedures.
1244   * @return the procedures in a list
1245   */
1246  public List<Procedure<TEnvironment>> getProcedures() {
1247    List<Procedure<TEnvironment>> procedureList =
1248      new ArrayList<>(procedures.size() + completed.size());
1249    procedureList.addAll(procedures.values());
1250    // Note: The procedure could show up twice in the list with different state, as
1251    // it could complete after we walk through procedures list and insert into
1252    // procedureList - it is ok, as we will use the information in the Procedure
1253    // to figure it out; to prevent this would increase the complexity of the logic.
1254    completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
1255      .forEach(procedureList::add);
1256    return procedureList;
1257  }
1258
1259  // ==========================================================================
1260  //  Listeners helpers
1261  // ==========================================================================
1262  public void registerListener(ProcedureExecutorListener listener) {
1263    this.listeners.add(listener);
1264  }
1265
1266  public boolean unregisterListener(ProcedureExecutorListener listener) {
1267    return this.listeners.remove(listener);
1268  }
1269
1270  private void sendProcedureLoadedNotification(final long procId) {
1271    if (!this.listeners.isEmpty()) {
1272      for (ProcedureExecutorListener listener: this.listeners) {
1273        try {
1274          listener.procedureLoaded(procId);
1275        } catch (Throwable e) {
1276          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1277        }
1278      }
1279    }
1280  }
1281
1282  private void sendProcedureAddedNotification(final long procId) {
1283    if (!this.listeners.isEmpty()) {
1284      for (ProcedureExecutorListener listener: this.listeners) {
1285        try {
1286          listener.procedureAdded(procId);
1287        } catch (Throwable e) {
1288          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1289        }
1290      }
1291    }
1292  }
1293
1294  private void sendProcedureFinishedNotification(final long procId) {
1295    if (!this.listeners.isEmpty()) {
1296      for (ProcedureExecutorListener listener: this.listeners) {
1297        try {
1298          listener.procedureFinished(procId);
1299        } catch (Throwable e) {
1300          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1301        }
1302      }
1303    }
1304  }
1305
1306  // ==========================================================================
1307  //  Procedure IDs helpers
1308  // ==========================================================================
1309  private long nextProcId() {
1310    long procId = lastProcId.incrementAndGet();
1311    if (procId < 0) {
1312      while (!lastProcId.compareAndSet(procId, 0)) {
1313        procId = lastProcId.get();
1314        if (procId >= 0) {
1315          break;
1316        }
1317      }
1318      while (procedures.containsKey(procId)) {
1319        procId = lastProcId.incrementAndGet();
1320      }
1321    }
1322    assert procId >= 0 : "Invalid procId " + procId;
1323    return procId;
1324  }
1325
1326  @VisibleForTesting
1327  protected long getLastProcId() {
1328    return lastProcId.get();
1329  }
1330
1331  @VisibleForTesting
1332  public Set<Long> getActiveProcIds() {
1333    return procedures.keySet();
1334  }
1335
1336  Long getRootProcedureId(Procedure<TEnvironment> proc) {
1337    return Procedure.getRootProcedureId(procedures, proc);
1338  }
1339
1340  // ==========================================================================
1341  //  Executions
1342  // ==========================================================================
  /**
   * Drives one execution attempt of the given procedure.
   * <p>
   * Finished procedures are skipped. When the root procedure cannot be resolved the procedure is
   * rolled back. Otherwise the execution slot of the root's {@link RootProcedureState} is
   * requested: when it is granted the procedure lock is acquired and the procedure executed,
   * when it is refused the rollback path is taken. The loop repeats for as long as the
   * procedure stack reports a failure.
   * @param proc the procedure to execute
   */
  private void executeProcedure(Procedure<TEnvironment> proc) {
    if (proc.isFinished()) {
      LOG.debug("{} is already finished, skipping execution", proc);
      return;
    }
    final Long rootProcId = getRootProcedureId(proc);
    if (rootProcId == null) {
      // The 'proc' was ready to run but the root procedure was rolledback
      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
      executeRollback(proc);
      return;
    }

    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
    if (procStack == null) {
      // No rollback stack is registered for the root, so nothing can be executed here
      LOG.warn("RootProcedureState is null for " + proc.getProcId());
      return;
    }
    do {
      // Try to acquire the execution
      if (!procStack.acquire(proc)) {
        if (procStack.setRollback()) {
          // we have the 'rollback-lock' we can start rollingback
          switch (executeRollback(rootProcId, procStack)) {
            case LOCK_ACQUIRED:
              break;
            case LOCK_YIELD_WAIT:
              // Give the rollback flag back and re-queue the procedure to retry later
              procStack.unsetRollback();
              scheduler.yield(proc);
              break;
            case LOCK_EVENT_WAIT:
              // Give the rollback flag back; the procedure is not re-queued here
              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
              procStack.unsetRollback();
              break;
            default:
              throw new UnsupportedOperationException();
          }
        } else {
          // if we can't rollback means that some child is still running.
          // the rollback will be executed after all the children are done.
          // If the procedure was never executed, remove and mark it as rolledback.
          if (!proc.wasExecuted()) {
            switch (executeRollback(proc)) {
              case LOCK_ACQUIRED:
                break;
              case LOCK_YIELD_WAIT:
                scheduler.yield(proc);
                break;
              case LOCK_EVENT_WAIT:
                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
                break;
              default:
                throw new UnsupportedOperationException();
            }
          }
        }
        // The execution slot was refused: whatever the rollback outcome, leave the loop
        break;
      }

      // Execute the procedure
      assert proc.getState() == ProcedureState.RUNNABLE : proc;
      // Note that lock is NOT about concurrency but rather about ensuring
      // ownership of a procedure of an entity such as a region or table
      LockState lockState = acquireLock(proc);
      switch (lockState) {
        case LOCK_ACQUIRED:
          execProcedure(procStack, proc);
          break;
        case LOCK_YIELD_WAIT:
          LOG.info(lockState + " " + proc);
          scheduler.yield(proc);
          break;
        case LOCK_EVENT_WAIT:
          // Someone will wake us up when the lock is available
          LOG.debug(lockState + " " + proc);
          break;
        default:
          throw new UnsupportedOperationException();
      }
      // Release the execution slot taken by procStack.acquire() above, on every lock outcome
      procStack.release(proc);

      if (proc.isSuccess()) {
        // update metrics on finishing the procedure
        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
        LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
        // Finalize the procedure state
        if (proc.getProcId() == rootProcId) {
          procedureFinished(proc);
        } else {
          execCompletionCleanup(proc);
        }
        break;
      }
      // Go around again only when the stack is flagged as failed; the next pass is then
      // expected to take the rollback path above
    } while (procStack.isFailed());
  }
1438
1439  private LockState acquireLock(Procedure<TEnvironment> proc) {
1440    TEnvironment env = getEnvironment();
1441    // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
1442    // hasLock is true.
1443    if (proc.hasLock()) {
1444      return LockState.LOCK_ACQUIRED;
1445    }
1446    return proc.doAcquireLock(env, store);
1447  }
1448
1449  private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
1450    TEnvironment env = getEnvironment();
1451    // For how the framework works, we know that we will always have the lock
1452    // when we call releaseLock(), so we can avoid calling proc.hasLock()
1453    if (force || !proc.holdLock(env) || proc.isFinished()) {
1454      proc.doReleaseLock(env, store);
1455    }
1456  }
1457
1458  /**
1459   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
1460   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
1461   */
  // Walks the sub-procedure stack of the root from its last entry to its first, rolling back one
  // procedure per iteration. Returns LOCK_ACQUIRED once the whole stack is rolled back and the
  // root is finalized; any other return value, or an early return caused by a stopped
  // executor/store, leaves the remaining entries on the stack for a later attempt.
  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
    RemoteProcedureException exception = rootProc.getException();
    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
    // rolling back because the subprocedure does. Clarify.
    if (exception == null) {
      // Copy the failure recorded on the stack onto the root and persist it
      exception = procStack.getException();
      rootProc.setFailure(exception);
      store.update(rootProc);
    }

    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;

    // Iterate from the tail so that the most recently stacked procedure is handled first
    int stackTail = subprocStack.size();
    while (stackTail-- > 0) {
      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
      IdLock.Entry lockEntry = null;
      // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
      // this check, as the worker will hold the lock before executing a procedure. This is the only
      // place where we may hold two procedure execution locks, and there is a fence in the
      // RootProcedureState where we can make sure that only one worker can execute the rollback of
      // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
      // prevent race between us and the force update thread.
      if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
        try {
          lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
        } catch (IOException e) {
          // can only happen if interrupted, so not a big deal to propagate it
          throw new UncheckedIOException(e);
        }
      }
      try {
        // For the sub procedures which are successfully finished, we do not rollback them.
        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
        // recursively rollback its ancestors. The state changes which are done by sub procedures
        // should be handled by parent procedures when rolling back. For example, when rolling back
        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
        // online, instead of rolling back the original procedures which offlined the regions(in
        // fact these procedures can not be rolled back...).
        if (proc.isSuccess()) {
          // Just do the cleanup work, without actually executing the rollback
          subprocStack.remove(stackTail);
          cleanupAfterRollbackOneStep(proc);
          continue;
        }
        LockState lockState = acquireLock(proc);
        if (lockState != LockState.LOCK_ACQUIRED) {
          // can't take a lock on the procedure, add the root-proc back on the
          // queue waiting for the lock availability
          return lockState;
        }

        lockState = executeRollback(proc);
        releaseLock(proc, false);
        // Stop here when the single-step rollback did not complete, or when the executor or the
        // store is no longer running. The entry stays on the stack in that case.
        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
        abortRollback |= !isRunning() || !store.isRunning();

        // allows to kill the executor before something is stored to the wal.
        // useful to test the procedure recovery.
        if (abortRollback) {
          return lockState;
        }

        // The step is rolled back: drop it from the stack
        subprocStack.remove(stackTail);

        // if the procedure is kind enough to pass the slot to someone else, yield
        // if the proc is already finished, do not yield
        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
          return LockState.LOCK_YIELD_WAIT;
        }

        // The root itself is finalized after the loop, by procedureFinished() below
        if (proc != rootProc) {
          execCompletionCleanup(proc);
        }
      } finally {
        // Runs on every exit of the try block (continue, return or fall-through); releases the
        // execution lock only when it was taken in this iteration
        if (lockEntry != null) {
          procExecutionLock.releaseLockEntry(lockEntry);
        }
      }
    }

    // Finalize the procedure state
    LOG.info("Rolled back {} exec-time={}", rootProc,
      StringUtils.humanTimeDiff(rootProc.elapsedTime()));
    procedureFinished(rootProc);
    return LockState.LOCK_ACQUIRED;
  }
1550
1551  private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
1552    if (proc.removeStackIndex()) {
1553      if (!proc.isSuccess()) {
1554        proc.setState(ProcedureState.ROLLEDBACK);
1555      }
1556
1557      // update metrics on finishing the procedure (fail)
1558      proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);
1559
1560      if (proc.hasParent()) {
1561        store.delete(proc.getProcId());
1562        procedures.remove(proc.getProcId());
1563      } else {
1564        final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
1565        if (childProcIds != null) {
1566          store.delete(proc, childProcIds);
1567        } else {
1568          store.update(proc);
1569        }
1570      }
1571    } else {
1572      store.update(proc);
1573    }
1574  }
1575
1576  /**
1577   * Execute the rollback of the procedure step.
1578   * It updates the store with the new state (stack index)
1579   * or will remove completly the procedure in case it is a child.
1580   */
1581  private LockState executeRollback(Procedure<TEnvironment> proc) {
1582    try {
1583      proc.doRollback(getEnvironment());
1584    } catch (IOException e) {
1585      LOG.debug("Roll back attempt failed for {}", proc, e);
1586      return LockState.LOCK_YIELD_WAIT;
1587    } catch (InterruptedException e) {
1588      handleInterruptedException(proc, e);
1589      return LockState.LOCK_YIELD_WAIT;
1590    } catch (Throwable e) {
1591      // Catch NullPointerExceptions or similar errors...
1592      LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
1593    }
1594
1595    // allows to kill the executor before something is stored to the wal.
1596    // useful to test the procedure recovery.
1597    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1598      String msg = "TESTING: Kill before store update";
1599      LOG.debug(msg);
1600      stop();
1601      throw new RuntimeException(msg);
1602    }
1603
1604    cleanupAfterRollbackOneStep(proc);
1605
1606    return LockState.LOCK_ACQUIRED;
1607  }
1608
1609  private void yieldProcedure(Procedure<TEnvironment> proc) {
1610    releaseLock(proc, false);
1611    scheduler.yield(proc);
1612  }
1613
  /**
   * Executes <code>procedure</code>
   * <ul>
   *  <li>Calls the doExecute() of the procedure</li>
   *  <li>If the procedure execution didn't fail (i.e. valid user input)
   *  <ul>
   *    <li>...and returned subprocedures
   *    <ul>
   *      <li>The subprocedures are initialized.</li>
   *      <li>The subprocedures are added to the store</li>
   *      <li>The subprocedures are added to the runnable queue</li>
   *      <li>The procedure is now in a WAITING state, waiting for the subprocedures to
   *        complete</li>
   *    </ul>
   *    </li>
   *    <li>...and returned only itself
   *    <ul>
   *      <li>the procedure is executed again right away, without going back to the scheduler</li>
   *    </ul>
   *    </li>
   *    <li>...if there are no subprocedure
   *    <ul>
   *      <li>the procedure completed successfully</li>
   *      <li>if there is a parent (WAITING)</li>
   *      <li>the parent state will be set to RUNNABLE</li>
   *    </ul>
   *    </li>
   *  </ul>
   *  </li>
   *  <li>In case of failure
   *  <ul>
   *    <li>The store is updated with the new state</li>
   *    <li>The executor (caller of this method) will start the rollback of the procedure</li>
   *  </ul>
   *  </li>
   * </ul>
   * A procedure which throws ProcedureYieldException or InterruptedException is yielded back to
   * the scheduler and nothing is stored for that attempt. A procedure which throws
   * ProcedureSuspendedException keeps its state, but the step is still recorded and persisted.
   * @param procStack the rollback state of the root procedure which <code>procedure</code>
   *          belongs to
   * @param procedure the procedure to execute, must be in RUNNABLE state
   */
  private void execProcedure(RootProcedureState<TEnvironment> procStack,
      Procedure<TEnvironment> procedure) {
    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
        "NOT RUNNABLE! " + procedure.toString());

    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
    // The exception is caught below and then we hurry to the exit without disturbing state. The
    // idea is that the processing of this procedure will be unsuspended later by an external event
    // such the report of a region open.
    boolean suspended = false;

    // Whether to 're-' -execute; run through the loop again.
    boolean reExecute = false;

    Procedure<TEnvironment>[] subprocs = null;
    do {
      reExecute = false;
      procedure.resetPersistence();
      try {
        subprocs = procedure.doExecute(getEnvironment());
        // Treat an empty array the same way as no subprocedures at all
        if (subprocs != null && subprocs.length == 0) {
          subprocs = null;
        }
      } catch (ProcedureSuspendedException e) {
        LOG.trace("Suspend {}", procedure);
        suspended = true;
      } catch (ProcedureYieldException e) {
        // Hand the procedure back to the scheduler, nothing is recorded for this attempt
        LOG.trace("Yield {}", procedure, e);
        yieldProcedure(procedure);
        return;
      } catch (InterruptedException e) {
        // Same as a yield, the procedure goes back to the scheduler
        LOG.trace("Yield interrupt {}", procedure, e);
        handleInterruptedException(procedure, e);
        yieldProcedure(procedure);
        return;
      } catch (Throwable e) {
        // Catch NullPointerExceptions or similar errors...
        // The procedure is marked as failed, so the checks on isFailed() below will skip the
        // success path.
        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
        LOG.error(msg, e);
        procedure.setFailure(new RemoteProcedureException(msg, e));
      }

      if (!procedure.isFailed()) {
        if (subprocs != null) {
          if (subprocs.length == 1 && subprocs[0] == procedure) {
            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
            // i.e. we go around this loop again rather than go back out on the scheduler queue.
            subprocs = null;
            reExecute = true;
            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
          } else {
            // Yield the current procedure, and make the subprocedure runnable
            // subprocs may come back 'null'.
            subprocs = initializeChildren(procStack, procedure, subprocs);
            LOG.info("Initialized subprocedures=" +
              (subprocs == null? null:
                Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
                collect(Collectors.toList()).toString()));
          }
        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
          LOG.trace("Added to timeoutExecutor {}", procedure);
          timeoutExecutor.add(procedure);
        } else if (!suspended) {
          // No subtask, so we are done
          procedure.setState(ProcedureState.SUCCESS);
        }
      }

      // Add the procedure to the stack
      procStack.addRollbackStep(procedure);

      // allows to kill the executor before something is stored to the wal.
      // useful to test the procedure recovery.
      if (testing != null &&
        testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())) {
        kill("TESTING: Kill BEFORE store update: " + procedure);
      }

      // TODO: The code here doesn't check if store is running before persisting to the store as
      // it relies on the method call below to throw RuntimeException to wind up the stack and
      // executor thread to stop. The statement following the method call below seems to check if
      // store is not running, to prevent scheduling children procedures, re-execution or yield
      // of this procedure. This may need more scrutiny and subsequent cleanup in future
      //
      // Commit the transaction even if a suspend (state may have changed). Note this append
      // can take a bunch of time to complete.
      if (procedure.needPersistence()) {
        updateStoreOnExec(procStack, procedure, subprocs);
      }

      // if the store is not running we are aborting
      if (!store.isRunning()) {
        return;
      }
      // if the procedure is kind enough to pass the slot to someone else, yield
      if (procedure.isRunnable() && !suspended &&
          procedure.isYieldAfterExecutionStep(getEnvironment())) {
        yieldProcedure(procedure);
        return;
      }

      // A re-execution is only requested on the path which also clears subprocs
      assert (reExecute && subprocs == null) || !reExecute;
    } while (reExecute);

    // Allows to kill the executor after something is stored to the WAL but before the below
    // state settings are done -- in particular the one on the end where we make parent
    // RUNNABLE again when its children are done; see countDownChildren.
    if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
      kill("TESTING: Kill AFTER store update: " + procedure);
    }

    // Submit the new subprocedures
    if (subprocs != null && !procedure.isFailed()) {
      submitChildrenProcedures(subprocs);
    }

    // we need to log the release lock operation before waking up the parent procedure, as there
    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
    // the sub procedures from store and cause problems...
    releaseLock(procedure, false);

    // if the procedure is complete and has a parent, count down the children latch.
    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
      countDownChildren(procStack, procedure);
    }
  }
1770
1771  private void kill(String msg) {
1772    LOG.debug(msg);
1773    stop();
1774    throw new RuntimeException(msg);
1775  }
1776
1777  private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
1778      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1779    assert subprocs != null : "expected subprocedures";
1780    final long rootProcId = getRootProcedureId(procedure);
1781    for (int i = 0; i < subprocs.length; ++i) {
1782      Procedure<TEnvironment> subproc = subprocs[i];
1783      if (subproc == null) {
1784        String msg = "subproc[" + i + "] is null, aborting the procedure";
1785        procedure.setFailure(new RemoteProcedureException(msg,
1786          new IllegalArgumentIOException(msg)));
1787        return null;
1788      }
1789
1790      assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
1791      subproc.setParentProcId(procedure.getProcId());
1792      subproc.setRootProcId(rootProcId);
1793      subproc.setProcId(nextProcId());
1794      procStack.addSubProcedure(subproc);
1795    }
1796
1797    if (!procedure.isFailed()) {
1798      procedure.setChildrenLatch(subprocs.length);
1799      switch (procedure.getState()) {
1800        case RUNNABLE:
1801          procedure.setState(ProcedureState.WAITING);
1802          break;
1803        case WAITING_TIMEOUT:
1804          timeoutExecutor.add(procedure);
1805          break;
1806        default:
1807          break;
1808      }
1809    }
1810    return subprocs;
1811  }
1812
1813  private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
1814    for (int i = 0; i < subprocs.length; ++i) {
1815      Procedure<TEnvironment> subproc = subprocs[i];
1816      subproc.updateMetricsOnSubmit(getEnvironment());
1817      assert !procedures.containsKey(subproc.getProcId());
1818      procedures.put(subproc.getProcId(), subproc);
1819      scheduler.addFront(subproc);
1820    }
1821  }
1822
1823  private void countDownChildren(RootProcedureState<TEnvironment> procStack,
1824      Procedure<TEnvironment> procedure) {
1825    Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
1826    if (parent == null) {
1827      assert procStack.isRollingback();
1828      return;
1829    }
1830
1831    // If this procedure is the last child awake the parent procedure
1832    if (parent.tryRunnable()) {
1833      // If we succeeded in making the parent runnable -- i.e. all of its
1834      // children have completed, move parent to front of the queue.
1835      store.update(parent);
1836      scheduler.addFront(parent);
1837      LOG.info("Finished subprocedure pid={}, resume processing parent {}",
1838          procedure.getProcId(), parent);
1839      return;
1840    }
1841  }
1842
1843  private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
1844      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1845    if (subprocs != null && !procedure.isFailed()) {
1846      if (LOG.isTraceEnabled()) {
1847        LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
1848      }
1849      store.insert(procedure, subprocs);
1850    } else {
1851      LOG.trace("Store update {}", procedure);
1852      if (procedure.isFinished() && !procedure.hasParent()) {
1853        // remove child procedures
1854        final long[] childProcIds = procStack.getSubprocedureIds();
1855        if (childProcIds != null) {
1856          store.delete(procedure, childProcIds);
1857          for (int i = 0; i < childProcIds.length; ++i) {
1858            procedures.remove(childProcIds[i]);
1859          }
1860        } else {
1861          store.update(procedure);
1862        }
1863      } else {
1864        store.update(procedure);
1865      }
1866    }
1867  }
1868
1869  private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
1870    LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
1871    // NOTE: We don't call Thread.currentThread().interrupt()
1872    // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
1873    // the InterruptedException. If the master is going down, we will be notified
1874    // and the executor/store will be stopped.
1875    // (The interrupted procedure will be retried on the next run)
1876  }
1877
1878  private void execCompletionCleanup(Procedure<TEnvironment> proc) {
1879    final TEnvironment env = getEnvironment();
1880    if (proc.hasLock()) {
1881      LOG.warn("Usually this should not happen, we will release the lock before if the procedure" +
1882        " is finished, even if the holdLock is true, arrive here means we have some holes where" +
1883        " we do not release the lock. And the releaseLock below may fail since the procedure may" +
1884        " have already been deleted from the procedure store.");
1885      releaseLock(proc, true);
1886    }
1887    try {
1888      proc.completionCleanup(env);
1889    } catch (Throwable e) {
1890      // Catch NullPointerExceptions or similar errors...
1891      LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1892    }
1893  }
1894
1895  private void procedureFinished(Procedure<TEnvironment> proc) {
1896    // call the procedure completion cleanup handler
1897    execCompletionCleanup(proc);
1898
1899    CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
1900
1901    // update the executor internal state maps
1902    if (!proc.shouldWaitClientAck(getEnvironment())) {
1903      retainer.setClientAckTime(0);
1904    }
1905
1906    completed.put(proc.getProcId(), retainer);
1907    rollbackStack.remove(proc.getProcId());
1908    procedures.remove(proc.getProcId());
1909
1910    // call the runnableSet completion cleanup handler
1911    try {
1912      scheduler.completionCleanup(proc);
1913    } catch (Throwable e) {
1914      // Catch NullPointerExceptions or similar errors...
1915      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
1916    }
1917
1918    // Notify the listeners
1919    sendProcedureFinishedNotification(proc.getProcId());
1920  }
1921
1922  RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
1923    return rollbackStack.get(rootProcId);
1924  }
1925
1926  @VisibleForTesting
1927  ProcedureScheduler getProcedureScheduler() {
1928    return scheduler;
1929  }
1930
1931  @VisibleForTesting
1932  int getCompletedSize() {
1933    return completed.size();
1934  }
1935
1936  @VisibleForTesting
1937  public IdLock getProcExecutionLock() {
1938    return procExecutionLock;
1939  }
1940
1941  // ==========================================================================
1942  //  Worker Thread
1943  // ==========================================================================
1944  private class WorkerThread extends StoppableThread {
1945    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
1946    private volatile Procedure<TEnvironment> activeProcedure;
1947
1948    public WorkerThread(ThreadGroup group) {
1949      this(group, "PEWorker-");
1950    }
1951
1952    protected WorkerThread(ThreadGroup group, String prefix) {
1953      super(group, prefix + workerId.incrementAndGet());
1954      setDaemon(true);
1955    }
1956
1957    @Override
1958    public void sendStopSignal() {
1959      scheduler.signalAll();
1960    }
1961    @Override
1962    public void run() {
1963      long lastUpdate = EnvironmentEdgeManager.currentTime();
1964      try {
1965        while (isRunning() && keepAlive(lastUpdate)) {
1966          @SuppressWarnings("unchecked")
1967          Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
1968          if (proc == null) {
1969            continue;
1970          }
1971          this.activeProcedure = proc;
1972          int activeCount = activeExecutorCount.incrementAndGet();
1973          int runningCount = store.setRunningProcedureCount(activeCount);
1974          LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
1975            runningCount, activeCount);
1976          executionStartTime.set(EnvironmentEdgeManager.currentTime());
1977          IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
1978          try {
1979            executeProcedure(proc);
1980          } catch (AssertionError e) {
1981            LOG.info("ASSERT pid=" + proc.getProcId(), e);
1982            throw e;
1983          } finally {
1984            procExecutionLock.releaseLockEntry(lockEntry);
1985            activeCount = activeExecutorCount.decrementAndGet();
1986            runningCount = store.setRunningProcedureCount(activeCount);
1987            LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
1988              runningCount, activeCount);
1989            this.activeProcedure = null;
1990            lastUpdate = EnvironmentEdgeManager.currentTime();
1991            executionStartTime.set(Long.MAX_VALUE);
1992          }
1993        }
1994      } catch (Throwable t) {
1995        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
1996      } finally {
1997        LOG.trace("Worker terminated.");
1998      }
1999      workerThreads.remove(this);
2000    }
2001
2002    @Override
2003    public String toString() {
2004      Procedure<?> p = this.activeProcedure;
2005      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");
2006    }
2007
2008    /**
2009     * @return the time since the current procedure is running
2010     */
2011    public long getCurrentRunTime() {
2012      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
2013    }
2014
2015    // core worker never timeout
2016    protected boolean keepAlive(long lastUpdate) {
2017      return true;
2018    }
2019  }
2020
2021  // A worker thread which can be added when core workers are stuck. Will timeout after
2022  // keepAliveTime if there is no procedure to run.
2023  private final class KeepAliveWorkerThread extends WorkerThread {
2024    public KeepAliveWorkerThread(ThreadGroup group) {
2025      super(group, "KeepAlivePEWorker-");
2026    }
2027
2028    @Override
2029    protected boolean keepAlive(long lastUpdate) {
2030      return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
2031    }
2032  }
2033
2034  // ----------------------------------------------------------------------------
  // TODO-MAYBE: Should we provide an InlineChore to notify the store with the
  // full set of procedures pending and completed to write a compacted
  // version of the log (in case it is a log)?
  // In theory no, procedures have a short life, so at some point the store
  // will have the tracker saying everything is in the last log.
2040  // ----------------------------------------------------------------------------
2041
2042  private final class WorkerMonitor extends InlineChore {
2043    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
2044        "hbase.procedure.worker.monitor.interval.msec";
2045    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
2046
2047    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
2048        "hbase.procedure.worker.stuck.threshold.msec";
2049    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
2050
2051    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
2052        "hbase.procedure.worker.add.stuck.percentage";
2053    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
2054
2055    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
2056    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
2057    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
2058
2059    public WorkerMonitor() {
2060      refreshConfig();
2061    }
2062
2063    @Override
2064    public void run() {
2065      final int stuckCount = checkForStuckWorkers();
2066      checkThreadCount(stuckCount);
2067
2068      // refresh interval (poor man dynamic conf update)
2069      refreshConfig();
2070    }
2071
2072    private int checkForStuckWorkers() {
2073      // check if any of the worker is stuck
2074      int stuckCount = 0;
2075      for (WorkerThread worker : workerThreads) {
2076        if (worker.getCurrentRunTime() < stuckThreshold) {
2077          continue;
2078        }
2079
2080        // WARN the worker is stuck
2081        stuckCount++;
2082        LOG.warn("Worker stuck {}, run time {}", worker,
2083          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
2084      }
2085      return stuckCount;
2086    }
2087
2088    private void checkThreadCount(final int stuckCount) {
2089      // nothing to do if there are no runnable tasks
2090      if (stuckCount < 1 || !scheduler.hasRunnables()) {
2091        return;
2092      }
2093
2094      // add a new thread if the worker stuck percentage exceed the threshold limit
2095      // and every handler is active.
2096      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
2097      // let's add new worker thread more aggressively, as they will timeout finally if there is no
2098      // work to do.
2099      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
2100        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
2101        workerThreads.add(worker);
2102        worker.start();
2103        LOG.debug("Added new worker thread {}", worker);
2104      }
2105    }
2106
2107    private void refreshConfig() {
2108      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,
2109          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
2110      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,
2111        DEFAULT_WORKER_MONITOR_INTERVAL);
2112      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,
2113        DEFAULT_WORKER_STUCK_THRESHOLD);
2114    }
2115
2116    @Override
2117    public int getTimeoutInterval() {
2118      return timeoutInterval;
2119    }
2120  }
2121}