001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import edu.umd.cs.findbugs.annotations.Nullable;
021import java.io.IOException;
022import java.io.UncheckedIOException;
023import java.util.ArrayDeque;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Deque;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.CopyOnWriteArrayList;
033import java.util.concurrent.Executor;
034import java.util.concurrent.Executors;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.AtomicInteger;
038import java.util.concurrent.atomic.AtomicLong;
039import java.util.stream.Collectors;
040import java.util.stream.Stream;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
044import org.apache.hadoop.hbase.log.HBaseMarkers;
045import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
049import org.apache.hadoop.hbase.procedure2.trace.ProcedureSpanBuilder;
050import org.apache.hadoop.hbase.procedure2.util.StringUtils;
051import org.apache.hadoop.hbase.security.User;
052import org.apache.hadoop.hbase.trace.TraceUtil;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.hbase.util.IdLock;
055import org.apache.hadoop.hbase.util.NonceKey;
056import org.apache.hadoop.hbase.util.Threads;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
062import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
063
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
065
/**
 * Thread pool that executes submitted procedures. The executor has an associated ProcedureStore:
 * each operation is logged, and on restart the pending procedures are resumed. Unless the
 * Procedure code throws an error (e.g. invalid user input), the procedure will complete at some
 * point in time. On restart, pending procedures are resumed and failed ones are rolled back. The
 * user can add procedures to the executor via submitProcedure(proc), check for the finished state
 * via isFinished(procId), and get the result via getResult(procId).
 */
074@InterfaceAudience.Private
075public class ProcedureExecutor<TEnvironment> {
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);

  /** Configuration key read once by the constructor into {@code checkOwnerSet}. */
  public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
  private static final boolean DEFAULT_CHECK_OWNER_SET = false;

  /**
   * Configuration key for the worker keep-alive time in milliseconds; applied by
   * {@link #refreshConfiguration(Configuration)} (and therefore also at construction time).
   */
  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
    "hbase.procedure.worker.keep.alive.time.msec";
  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);

  /**
   * Configuration key for the eviction TTL (milliseconds) of completed procedures, passed to the
   * retainer's expiry check when deciding whether a force update is still needed.
   */
  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min

  /**
   * Configuration key for the eviction TTL (milliseconds) of completed procedures whose result has
   * been acknowledged.
   */
  public static final String EVICT_ACKED_TTL_CONF_KEY = "hbase.procedure.cleaner.acked.evict.ttl";
  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min

  /**
   * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to break PE
   * having it fail at various junctures. When non-null, testing is set to an instance of the below
   * internal {@link Testing} class with flags set for the particular test.
   */
  volatile Testing testing = null;
097
098  /**
099   * Class with parameters describing how to fail/die when in testing-context.
100   */
101  public static class Testing {
102    protected volatile boolean killIfHasParent = true;
103    protected volatile boolean killIfSuspended = false;
104
105    /**
106     * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
107     * persisting all the state it needs to recover after a crash.
108     */
109    protected volatile boolean killBeforeStoreUpdate = false;
110    protected volatile boolean toggleKillBeforeStoreUpdate = false;
111
112    /**
113     * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
114     * is about a case where memory-state was being set after store to WAL where a crash could cause
115     * us to get stuck. This flag allows killing at what was a vulnerable time.
116     */
117    protected volatile boolean killAfterStoreUpdate = false;
118    protected volatile boolean toggleKillAfterStoreUpdate = false;
119
120    protected boolean shouldKillBeforeStoreUpdate() {
121      final boolean kill = this.killBeforeStoreUpdate;
122      if (this.toggleKillBeforeStoreUpdate) {
123        this.killBeforeStoreUpdate = !kill;
124        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
125      }
126      return kill;
127    }
128
129    protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {
130      if (isSuspended && !killIfSuspended) {
131        return false;
132      }
133      if (hasParent && !killIfHasParent) {
134        return false;
135      }
136      return shouldKillBeforeStoreUpdate();
137    }
138
139    protected boolean shouldKillAfterStoreUpdate() {
140      final boolean kill = this.killAfterStoreUpdate;
141      if (this.toggleKillAfterStoreUpdate) {
142        this.killAfterStoreUpdate = !kill;
143        LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
144      }
145      return kill;
146    }
147
148    protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
149      return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
150    }
151  }
152
153  public interface ProcedureExecutorListener {
154    void procedureLoaded(long procId);
155
156    void procedureAdded(long procId);
157
158    void procedureFinished(long procId);
159  }
160
  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. Once a
   * Root-Procedure completes (success or failure), the result will be added to this map. The user
   * of ProcedureExecutor should call getResult(procId) to get the result.
   */
  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
    new ConcurrentHashMap<>();

  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
   * The RootProcedureState contains the execution stack of the Root-Procedure. It is added to the
   * map by submitProcedure() and removed on procedure completion.
   */
  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up live procedures by ID. This map contains every live procedure: root
   * procedures and subprocedures.
   */
  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up whether a procedure was already issued by the same client. Holds one
   * entry per nonce: registerNonce() reserves a procId here before submission, and loading from the
   * store re-adds the nonces of replayed procedures.
   */
  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();

  /** Registered lifecycle listeners; copy-on-write so they can be iterated without locking. */
  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
    new CopyOnWriteArrayList<>();

  /**
   * Current configuration, replaced by refreshConfiguration(Configuration). NOTE(review): not
   * volatile, yet forceUpdateProcedure reads it on the force-update executor thread — confirm that
   * a stale read is acceptable there.
   */
  private Configuration conf;

  /**
   * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private ThreadGroup threadGroup;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private CopyOnWriteArrayList<WorkerThread> workerThreads;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

  /**
   * WorkerMonitor checks for stuck workers and creates new worker threads when necessary. For
   * example, if there is no worker available to assign meta, it creates a new worker thread for it,
   * so it is very important. TimeoutExecutor runs many tasks, such as DeadServerMetricRegionChore,
   * RegionInTransitionChore and so on. Some of these tasks can run for a long time and would block
   * other tasks like WorkerMonitor, so WorkerMonitor gets a dedicated thread.
   */
  private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

  // Set by init(int, boolean): corePoolSize = numThreads, maxPoolSize = 10 * numThreads.
  private int corePoolSize;
  private int maxPoolSize;

  // Worker keep-alive time in milliseconds (see setKeepAliveTime / getKeepAliveTime).
  private volatile long keepAliveTime;

  /**
   * Scheduler/Queue that contains runnable procedures.
   */
  private final ProcedureScheduler scheduler;

  // Single daemon thread that runs force-update requests from the store listener registered in the
  // constructor. NOTE(review): nothing in the visible code ever shuts this executor down.
  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());

  // -1 until the store reports the max procId during load; reset to -1 by join().
  private final AtomicLong lastProcId = new AtomicLong(-1);
  // Reset to 0 by init() before the core workers are created.
  private final AtomicLong workerId = new AtomicLong(0);
  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
  // Flipped to true by startWorkers() and back to false by stop().
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final TEnvironment environment;
  private final ProcedureStore store;

  // Read once in the constructor from CHECK_OWNER_SET_CONF_KEY.
  private final boolean checkOwnerSet;

  // To prevent concurrent execution of the same procedure.
  // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
  // procedure is woken up before we finish the suspend which causes the same procedures to be
  // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
  // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
  // execution of the same procedure.
  private final IdLock procExecutionLock = new IdLock();
254
255  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
256    final ProcedureStore store) {
257    this(conf, environment, store, new SimpleProcedureScheduler());
258  }
259
260  private boolean isRootFinished(Procedure<?> proc) {
261    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
262    return rootProc == null || rootProc.isFinished();
263  }
264
265  private void forceUpdateProcedure(long procId) throws IOException {
266    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
267    try {
268      Procedure<TEnvironment> proc = procedures.get(procId);
269      if (proc != null) {
270        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
271          LOG.debug("Procedure {} has already been finished and parent is succeeded,"
272            + " skip force updating", proc);
273          return;
274        }
275      } else {
276        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
277        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
278          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
279          return;
280        }
281        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
282        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
283        if (retainer.isExpired(EnvironmentEdgeManager.currentTime(), evictTtl, evictAckTtl)) {
284          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
285            procId);
286          return;
287        }
288        proc = retainer.getProcedure();
289      }
290      LOG.debug("Force update procedure {}", proc);
291      store.update(proc);
292    } finally {
293      procExecutionLock.releaseLockEntry(lockEntry);
294    }
295  }
296
297  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
298    final ProcedureStore store, final ProcedureScheduler scheduler) {
299    this.environment = environment;
300    this.scheduler = scheduler;
301    this.store = store;
302    this.conf = conf;
303    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
304    refreshConfiguration(conf);
305    store.registerListener(new ProcedureStoreListener() {
306
307      @Override
308      public void forceUpdate(long[] procIds) {
309        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
310          try {
311            forceUpdateProcedure(procId);
312          } catch (IOException e) {
313            LOG.warn("Failed to force update procedure with pid={}", procId);
314          }
315        }));
316      }
317    });
318  }
319
320  private void load(final boolean abortOnCorruption) throws IOException {
321    Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
322    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
323    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
324    Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");
325
326    store.load(new ProcedureStore.ProcedureLoader() {
327      @Override
328      public void setMaxProcId(long maxProcId) {
329        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
330        lastProcId.set(maxProcId);
331      }
332
333      @Override
334      public void load(ProcedureIterator procIter) throws IOException {
335        loadProcedures(procIter, abortOnCorruption);
336      }
337
338      @Override
339      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
340        int corruptedCount = 0;
341        while (procIter.hasNext()) {
342          Procedure<?> proc = procIter.next();
343          LOG.error("Corrupt " + proc);
344          corruptedCount++;
345        }
346        if (abortOnCorruption && corruptedCount > 0) {
347          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
348        }
349      }
350    });
351  }
352
353  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
354    proc.restoreLock(getEnvironment());
355    restored.add(proc.getProcId());
356  }
357
358  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
359    while (!stack.isEmpty()) {
360      restoreLock(stack.pop(), restored);
361    }
362  }
363
364  // Restore the locks for all the procedures.
365  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
366  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
367  // calling the acquireLock method for the parent procedure.
368  // The algorithm is straight-forward:
369  // 1. Use a set to record the procedures which locks have already been restored.
370  // 2. Use a stack to store the hierarchy of the procedures
371  // 3. For all the procedure, we will first try to find its parent and push it into the stack,
372  // unless
373  // a. We have no parent, i.e, we are the root procedure
374  // b. The lock has already been restored(by checking the set introduced in #1)
375  // then we start to pop the stack and call acquireLock for each procedure.
376  // Notice that this should be done for all procedures, not only the ones in runnableList.
377  private void restoreLocks() {
378    Set<Long> restored = new HashSet<>();
379    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
380    procedures.values().forEach(proc -> {
381      for (;;) {
382        if (restored.contains(proc.getProcId())) {
383          restoreLocks(stack, restored);
384          return;
385        }
386        if (!proc.hasParent()) {
387          restoreLock(proc, restored);
388          restoreLocks(stack, restored);
389          return;
390        }
391        stack.push(proc);
392        proc = procedures.get(proc.getParentProcId());
393      }
394    });
395  }
396
  /**
   * Rebuilds the in-memory executor state from the procedures replayed by the store.
   * <p/>
   * Finished procedures are put into {@code completed}. Unfinished ones are put into
   * {@code procedures}, and unfinished root procedures also get an empty RootProcedureState in
   * {@code rollbackStack}. Every non-null nonce is recorded in {@code nonceKeysToProcIdsMap}. The
   * iterator is then reset and walked again to fill the rollback stacks and bucket procedures by
   * state. Locks are restored, and procedures are finally handed to the timeout executor or the
   * scheduler.
   * @param procIter          replayed procedures; walked twice (see {@code procIter.reset()})
   * @param abortOnCorruption currently not consulted by this method's body
   * @throws IOException as declared; NOTE(review): no statement visible here throws it directly —
   *                     confirm which callee can raise it
   * @throws UnsupportedOperationException if an unfinished procedure is in ROLLEDBACK or
   *                                       INITIALIZING state
   */
  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
    throws IOException {
    // 1. Build the rollback stack
    // The per-state counters below are only used to presize the lists built in step 2.
    int runnableCount = 0;
    int failedCount = 0;
    int waitingCount = 0;
    int waitingTimeoutCount = 0;
    while (procIter.hasNext()) {
      boolean finished = procIter.isNextFinished();
      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      NonceKey nonceKey = proc.getNonceKey();
      long procId = proc.getProcId();

      if (finished) {
        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
        LOG.debug("Completed {}", proc);
      } else {
        if (!proc.hasParent()) {
          assert !proc.isFinished() : "unexpected finished procedure";
          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
        }

        // add the procedure to the map
        proc.beforeReplay(getEnvironment());
        procedures.put(proc.getProcId(), proc);
        switch (proc.getState()) {
          case RUNNABLE:
            runnableCount++;
            break;
          case FAILED:
            failedCount++;
            break;
          case WAITING:
            waitingCount++;
            break;
          case WAITING_TIMEOUT:
            waitingTimeoutCount++;
            break;
          default:
            break;
        }
      }

      // Nonces are recorded for finished and unfinished procedures alike.
      if (nonceKey != null) {
        nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
      }
    }

    // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
    // push it into the ProcedureScheduler directly to execute the rollback. But this does not work
    // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
    // the queue from runQueue in scheduler, and then when a procedure which has lock access, for
    // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
    // we will add the queue back to let the workers poll from it. The assumption here is that, the
    // procedure which has the xlock should have been polled out already, so when loading we can not
    // add the procedure to scheduler first and then call acquireLock, since the procedure is still
    // in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
    // then there is a dead lock
    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
    // Second pass over the same iterator; finished procedures were fully handled in step 1.
    procIter.reset();
    while (procIter.hasNext()) {
      if (procIter.isNextFinished()) {
        procIter.skipNext();
        continue;
      }

      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
      LOG.debug("Loading {}", proc);
      Long rootProcId = getRootProcedureId(proc);
      // The orphan procedures will be passed to handleCorrupted, so add an assert here
      assert rootProcId != null;

      // An unfinished child counts toward its (loaded) parent's children latch.
      if (proc.hasParent()) {
        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
        if (parent != null && !proc.isFinished()) {
          parent.incChildrenLatch();
        }
      }

      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
      procStack.loadStack(proc);

      proc.setRootProcId(rootProcId);
      switch (proc.getState()) {
        case RUNNABLE:
          runnableList.add(proc);
          break;
        case WAITING:
          waitingList.add(proc);
          break;
        case WAITING_TIMEOUT:
          waitingTimeoutList.add(proc);
          break;
        case FAILED:
          failedList.add(proc);
          break;
        case ROLLEDBACK:
        case INITIALIZING:
          String msg = "Unexpected " + proc.getState() + " state for " + proc;
          LOG.error(msg);
          throw new UnsupportedOperationException(msg);
        default:
          break;
      }
    }

    // 3. Check the waiting procedures to see if some of them can be added to runnable.
    waitingList.forEach(proc -> {
      if (!proc.hasChildren()) {
        // Normally, WAITING procedures should be waken by its children. But, there is a case that,
        // all the children are successful and before they can wake up their parent procedure, the
        // master was killed. So, during recovering the procedures from ProcedureWal, its children
        // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
        // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
        // exception will throw:
        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
        // "NOT RUNNABLE! " + procedure.toString());
        proc.setState(ProcedureState.RUNNABLE);
        runnableList.add(proc);
      } else {
        proc.afterReplay(getEnvironment());
      }
    });
    // 4. restore locks
    // Must happen before anything is pushed to the scheduler (see the comment on step 2).
    restoreLocks();

    // 5. Push the procedures to the timeout executor
    waitingTimeoutList.forEach(proc -> {
      proc.afterReplay(getEnvironment());
      timeoutExecutor.add(proc);
    });

    // 6. Push the procedure to the scheduler
    // FAILED procedures are queued without an afterReplay call; runnable ones get afterReplay and,
    // for roots only, a "loaded" notification.
    failedList.forEach(scheduler::addBack);
    runnableList.forEach(p -> {
      p.afterReplay(getEnvironment());
      if (!p.hasParent()) {
        sendProcedureLoadedNotification(p.getProcId());
      }
      scheduler.addBack(p);
    });
    // After all procedures put into the queue, signal the worker threads.
    // Otherwise, there is a race condition. See HBASE-21364.
    scheduler.signalAll();
  }
548
549  /**
550   * Initialize the procedure executor, but do not start workers. We will start them later.
551   * <p/>
552   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
553   * ensure a single executor, and start the procedure replay to resume and recover the previous
554   * pending and in-progress procedures.
555   * @param numThreads        number of threads available for procedure execution.
556   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
557   *                          is found on replay. otherwise false.
558   */
559  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
560    // We have numThreads executor + one timer thread used for timing out
561    // procedures and triggering periodic procedures.
562    this.corePoolSize = numThreads;
563    this.maxPoolSize = 10 * numThreads;
564    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
565      corePoolSize, maxPoolSize);
566
567    this.threadGroup = new ThreadGroup("PEWorkerGroup");
568    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
569    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");
570
571    // Create the workers
572    workerId.set(0);
573    workerThreads = new CopyOnWriteArrayList<>();
574    for (int i = 0; i < corePoolSize; ++i) {
575      workerThreads.add(new WorkerThread(threadGroup));
576    }
577
578    long st, et;
579
580    // Acquire the store lease.
581    st = System.nanoTime();
582    store.recoverLease();
583    et = System.nanoTime();
584    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
585      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
586
587    // start the procedure scheduler
588    scheduler.start();
589
590    // TODO: Split in two steps.
591    // TODO: Handle corrupted procedures (currently just a warn)
592    // The first one will make sure that we have the latest id,
593    // so we can start the threads and accept new procedures.
594    // The second step will do the actual load of old procedures.
595    st = System.nanoTime();
596    load(abortOnCorruption);
597    et = System.nanoTime();
598    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
599      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
600  }
601
602  /**
603   * Start the workers.
604   */
605  public void startWorkers() throws IOException {
606    if (!running.compareAndSet(false, true)) {
607      LOG.warn("Already running");
608      return;
609    }
610    // Start the executors. Here we must have the lastProcId set.
611    LOG.trace("Start workers {}", workerThreads.size());
612    timeoutExecutor.start();
613    workerMonitorExecutor.start();
614    for (WorkerThread worker : workerThreads) {
615      worker.start();
616    }
617
618    // Internal chores
619    workerMonitorExecutor.add(new WorkerMonitor());
620
621    // Add completed cleaner chore
622    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
623      nonceKeysToProcIdsMap));
624  }
625
626  public void stop() {
627    if (!running.getAndSet(false)) {
628      return;
629    }
630
631    LOG.info("Stopping");
632    scheduler.stop();
633    timeoutExecutor.sendStopSignal();
634    workerMonitorExecutor.sendStopSignal();
635  }
636
637  public void join() {
638    assert !isRunning() : "expected not running";
639
640    // stop the timeout executor
641    timeoutExecutor.awaitTermination();
642    // stop the work monitor executor
643    workerMonitorExecutor.awaitTermination();
644
645    // stop the worker threads
646    for (WorkerThread worker : workerThreads) {
647      worker.awaitTermination();
648    }
649
650    // Destroy the Thread Group for the executors
651    // TODO: Fix. #join is not place to destroy resources.
652    try {
653      threadGroup.destroy();
654    } catch (IllegalThreadStateException e) {
655      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup,
656        e.getMessage());
657      // This dumps list of threads on STDOUT.
658      this.threadGroup.list();
659    }
660
661    // reset the in-memory state for testing
662    completed.clear();
663    rollbackStack.clear();
664    procedures.clear();
665    nonceKeysToProcIdsMap.clear();
666    scheduler.clear();
667    lastProcId.set(-1);
668  }
669
670  public void refreshConfiguration(final Configuration conf) {
671    this.conf = conf;
672    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, DEFAULT_WORKER_KEEP_ALIVE_TIME),
673      TimeUnit.MILLISECONDS);
674  }
675
676  // ==========================================================================
677  // Accessors
678  // ==========================================================================
679  public boolean isRunning() {
680    return running.get();
681  }
682
683  /**
684   * @return the current number of worker threads.
685   */
686  public int getWorkerThreadCount() {
687    return workerThreads.size();
688  }
689
690  /**
691   * @return the core pool size settings.
692   */
693  public int getCorePoolSize() {
694    return corePoolSize;
695  }
696
697  public int getActiveExecutorCount() {
698    return activeExecutorCount.get();
699  }
700
701  public TEnvironment getEnvironment() {
702    return this.environment;
703  }
704
705  public ProcedureStore getStore() {
706    return this.store;
707  }
708
709  ProcedureScheduler getScheduler() {
710    return scheduler;
711  }
712
713  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
714    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
715    this.scheduler.signalAll();
716  }
717
718  public long getKeepAliveTime(final TimeUnit timeUnit) {
719    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
720  }
721
722  // ==========================================================================
723  // Submit/Remove Chores
724  // ==========================================================================
725
726  /**
727   * Add a chore procedure to the executor
728   * @param chore the chore to add
729   */
730  public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
731    if (chore == null) {
732      return;
733    }
734    chore.setState(ProcedureState.WAITING_TIMEOUT);
735    timeoutExecutor.add(chore);
736  }
737
738  /**
739   * Remove a chore procedure from the executor
740   * @param chore the chore to remove
741   * @return whether the chore is removed, or it will be removed later
742   */
743  public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
744    if (chore == null) {
745      return true;
746    }
747    chore.setState(ProcedureState.SUCCESS);
748    return timeoutExecutor.remove(chore);
749  }
750
751  // ==========================================================================
752  // Nonce Procedure helpers
753  // ==========================================================================
754  /**
755   * Create a NonceKey from the specified nonceGroup and nonce.
756   * @param nonceGroup the group to use for the {@link NonceKey}
757   * @param nonce      the nonce to use in the {@link NonceKey}
758   * @return the generated NonceKey
759   */
760  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
761    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
762  }
763
764  /**
765   * Register a nonce for a procedure that is going to be submitted. A procId will be reserved and
766   * on submitProcedure(), the procedure with the specified nonce will take the reserved ProcId. If
767   * someone already reserved the nonce, this method will return the procId reserved, otherwise an
768   * invalid procId will be returned. and the caller should procede and submit the procedure.
769   * @param nonceKey A unique identifier for this operation from the client or process.
770   * @return the procId associated with the nonce, if any otherwise an invalid procId.
771   */
772  public long registerNonce(final NonceKey nonceKey) {
773    if (nonceKey == null) {
774      return -1;
775    }
776
777    // check if we have already a Reserved ID for the nonce
778    Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
779    if (oldProcId == null) {
780      // reserve a new Procedure ID, this will be associated with the nonce
781      // and the procedure submitted with the specified nonce will use this ID.
782      final long newProcId = nextProcId();
783      oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
784      if (oldProcId == null) {
785        return -1;
786      }
787    }
788
789    // we found a registered nonce, but the procedure may not have been submitted yet.
790    // since the client expect the procedure to be submitted, spin here until it is.
791    final boolean traceEnabled = LOG.isTraceEnabled();
792    while (
793      isRunning() && !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId))
794        && nonceKeysToProcIdsMap.containsKey(nonceKey)
795    ) {
796      if (traceEnabled) {
797        LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
798      }
799      Threads.sleep(100);
800    }
801    return oldProcId.longValue();
802  }
803
804  /**
805   * Remove the NonceKey if the procedure was not submitted to the executor.
806   * @param nonceKey A unique identifier for this operation from the client or process.
807   */
808  public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
809    if (nonceKey == null) {
810      return;
811    }
812
813    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
814    if (procId == null) {
815      return;
816    }
817
818    // if the procedure was not submitted, remove the nonce
819    if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
820      nonceKeysToProcIdsMap.remove(nonceKey);
821    }
822  }
823
824  /**
825   * If the failure failed before submitting it, we may want to give back the same error to the
826   * requests with the same nonceKey.
827   * @param nonceKey  A unique identifier for this operation from the client or process
828   * @param procName  name of the procedure, used to inform the user
829   * @param procOwner name of the owner of the procedure, used to inform the user
830   * @param exception the failure to report to the user
831   */
832  public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
833    IOException exception) {
834    if (nonceKey == null) {
835      return;
836    }
837
838    Long procId = nonceKeysToProcIdsMap.get(nonceKey);
839    if (procId == null || completed.containsKey(procId)) {
840      return;
841    }
842
843    completed.computeIfAbsent(procId, (key) -> {
844      Procedure<TEnvironment> proc =
845        new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
846
847      return new CompletedProcedureRetainer<>(proc);
848    });
849  }
850
851  // ==========================================================================
852  // Submit/Abort Procedure
853  // ==========================================================================
854  /**
855   * Add a new root-procedure to the executor.
856   * @param proc the new procedure to execute.
857   * @return the procedure id, that can be used to monitor the operation
858   */
859  public long submitProcedure(Procedure<TEnvironment> proc) {
860    return submitProcedure(proc, null);
861  }
862
863  /**
864   * Bypass a procedure. If the procedure is set to bypass, all the logic in execute/rollback will
865   * be ignored and it will return success, whatever. It is used to recover buggy stuck procedures,
866   * releasing the lock resources and letting other procedures run. Bypassing one procedure (and its
867   * ancestors will be bypassed automatically) may leave the cluster in a middle state, e.g. region
868   * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, the
869   * operators may have to do some clean up on hdfs or schedule some assign procedures to let region
870   * online. DO AT YOUR OWN RISK.
871   * <p>
872   * A procedure can be bypassed only if 1. The procedure is in state of RUNNABLE, WAITING,
873   * WAITING_TIMEOUT or it is a root procedure without any child. 2. No other worker thread is
874   * executing it 3. No child procedure has been submitted
875   * <p>
876   * If all the requirements are meet, the procedure and its ancestors will be bypassed and
877   * persisted to WAL.
878   * <p>
879   * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. TODO: What
880   * about WAITING_TIMEOUT?
881   * @param pids      the procedure id
882   * @param lockWait  time to wait lock
883   * @param force     if force set to true, we will bypass the procedure even if it is executing.
884   *                  This is for procedures which can't break out during executing(due to bug,
885   *                  mostly) In this case, bypassing the procedure is not enough, since it is
886   *                  already stuck there. We need to restart the master after bypassing, and
887   *                  letting the problematic procedure to execute wth bypass=true, so in that
888   *                  condition, the procedure can be successfully bypassed.
889   * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
890   * @return true if bypass success
891   * @throws IOException IOException
892   */
893  public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
894    boolean recursive) throws IOException {
895    List<Boolean> result = new ArrayList<Boolean>(pids.size());
896    for (long pid : pids) {
897      result.add(bypassProcedure(pid, lockWait, force, recursive));
898    }
899    return result;
900  }
901
  /**
   * Bypass one procedure, then walk up the parent chain marking every ancestor as bypassed too.
   * <p>
   * The procedure's execution lock is tried for {@code lockWait}. If it cannot be taken, the
   * bypass is skipped unless {@code override} is set. With {@code override}, the procedure is still
   * marked bypassed and persisted. It is not re-queued, because a worker presumably still runs it.
   * <p>
   * Returns {@code false} without bypassing when the procedure does not exist, is already
   * finished, has children and {@code recursive} is false, or is a child procedure whose state is
   * not RUNNABLE, WAITING or WAITING_TIMEOUT.
   * @param pid       id of the procedure to bypass
   * @param lockWait  how long to wait for the execution lock; must be positive (the debug log
   *                  reports it in ms)
   * @param override  bypass even if the execution lock could not be acquired
   * @param recursive first bypass every active direct child (found by scanning all active
   *                  procedures; expensive), recursing with the same flags. Child results are
   *                  ignored and child IOExceptions are only logged.
   * @return true if the procedure and its ancestors were marked as bypassed, false if skipped
   * @throws IOException presumably propagated from acquiring the execution lock
   *                     ({@code tryLockEntry}) — confirm
   */
  boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
    throws IOException {
    Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
    final Procedure<TEnvironment> procedure = getProcedure(pid);
    if (procedure == null) {
      LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
      return false;
    }

    LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", procedure, lockWait,
      override, recursive);
    // Try to take the execution lock so that no worker executes the procedure while we change it.
    // A null entry means the lock could not be taken within lockWait.
    IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
    if (lockEntry == null && !override) {
      LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", lockWait,
        procedure, override);
      return false;
    } else if (lockEntry == null) {
      // override: carry on without the lock; the procedure will not be re-queued below.
      LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", lockWait,
        procedure, override);
    }
    try {
      // check whether the procedure is already finished
      if (procedure.isFinished()) {
        LOG.debug("{} is already finished, skipping bypass", procedure);
        return false;
      }

      if (procedure.hasChildren()) {
        if (recursive) {
          // EXPENSIVE. Checks each live procedure of which there could be many!!!
          // Is there another way to get children of a procedure?
          // Each direct child is bypassed through a recursive call with the same flags; the
          // boolean results are discarded and IOExceptions are only logged.
          LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
          this.procedures.forEachValue(1 /* Single-threaded */,
            // Transformer
            v -> v.getParentProcId() == procedure.getProcId() ? v : null,
            // Consumer
            v -> {
              try {
                bypassProcedure(v.getProcId(), lockWait, override, recursive);
              } catch (IOException e) {
                LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
              }
            });
        } else {
          LOG.debug("{} has children, skipping bypass", procedure);
          return false;
        }
      }

      // A root procedure (no parent) may be bypassed in whatever state. A child procedure is only
      // bypassed while it is RUNNABLE, WAITING or WAITING_TIMEOUT; otherwise it is skipped.
      if (
        procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
          && procedure.getState() != ProcedureState.WAITING
          && procedure.getState() != ProcedureState.WAITING_TIMEOUT
      ) {
        // NOTE(review): despite its wording, this message is logged when the bypass is SKIPPED.
        LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
          + "(with no parent), {}", procedure);
        // Not bypassed: child procedure in a state other than RUNNABLE/WAITING/WAITING_TIMEOUT.
        return false;
      }

      // Now, the procedure is not finished, and no one can execute it since we take the lock now
      // And we can be sure that its ancestor is not running too, since their child has not
      // finished yet
      // (When override is set and lockEntry is null, the "we take the lock" part does not hold.)
      Procedure<TEnvironment> current = procedure;
      while (current != null) {
        LOG.debug("Bypassing {}", current);
        current.bypass(getEnvironment());
        store.update(current);
        long parentID = current.getParentProcId();
        current = getProcedure(parentID);
      }

      // wake up waiting procedure, already checked there is no child
      if (procedure.getState() == ProcedureState.WAITING) {
        procedure.setState(ProcedureState.RUNNABLE);
        store.update(procedure);
      }

      // If state of procedure is WAITING_TIMEOUT, we cannot directly submit it to the scheduler.
      // Instead we remove it from the timeout executor queue and let the timeout executor run it
      // as timed out (logged below as a transition from WAITING_TIMEOUT to RUNNABLE).
      // NOTE(review): if remove() returns false nothing is re-queued here, yet true is returned.
      if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
        LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
        if (timeoutExecutor.remove(procedure)) {
          LOG.debug("removed procedure {} from timeoutExecutor", procedure);
          timeoutExecutor.executeTimedoutProcedure(procedure);
        }
      } else if (lockEntry != null) {
        // We hold the execution lock, so it is safe to put the procedure at the front of the
        // run queue.
        scheduler.addFront(procedure);
        LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
      } else {
        // If we don't have the lock, we can't re-submit the queue,
        // since it is already executing. To get rid of the stuck situation, we
        // need to restart the master. With the procedure set to bypass, the procedureExecutor
        // will bypass it and won't get stuck again.
        LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
          + "skipping add to queue", procedure);
      }
      return true;

    } finally {
      // Release the execution lock only if we actually acquired it.
      if (lockEntry != null) {
        procExecutionLock.releaseLockEntry(lockEntry);
      }
    }
  }
1008
1009  /**
1010   * Add a new root-procedure to the executor.
1011   * @param proc     the new procedure to execute.
1012   * @param nonceKey the registered unique identifier for this operation from the client or process.
1013   * @return the procedure id, that can be used to monitor the operation
1014   */
1015  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
1016      justification = "FindBugs is blind to the check-for-null")
1017  public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
1018    Preconditions.checkArgument(lastProcId.get() >= 0);
1019
1020    prepareProcedure(proc);
1021
1022    final Long currentProcId;
1023    if (nonceKey != null) {
1024      currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
1025      Preconditions.checkArgument(currentProcId != null,
1026        "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
1027    } else {
1028      currentProcId = nextProcId();
1029    }
1030
1031    // Initialize the procedure
1032    proc.setNonceKey(nonceKey);
1033    proc.setProcId(currentProcId.longValue());
1034
1035    // Commit the transaction
1036    store.insert(proc, null);
1037    LOG.debug("Stored {}", proc);
1038
1039    // Add the procedure to the executor
1040    return pushProcedure(proc);
1041  }
1042
1043  /**
1044   * Add a set of new root-procedure to the executor.
1045   * @param procs the new procedures to execute.
1046   */
1047  // TODO: Do we need to take nonces here?
1048  public void submitProcedures(Procedure<TEnvironment>[] procs) {
1049    Preconditions.checkArgument(lastProcId.get() >= 0);
1050    if (procs == null || procs.length <= 0) {
1051      return;
1052    }
1053
1054    // Prepare procedure
1055    for (int i = 0; i < procs.length; ++i) {
1056      prepareProcedure(procs[i]).setProcId(nextProcId());
1057    }
1058
1059    // Commit the transaction
1060    store.insert(procs);
1061    if (LOG.isDebugEnabled()) {
1062      LOG.debug("Stored " + Arrays.toString(procs));
1063    }
1064
1065    // Add the procedure to the executor
1066    for (int i = 0; i < procs.length; ++i) {
1067      pushProcedure(procs[i]);
1068    }
1069  }
1070
1071  private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
1072    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
1073    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
1074    if (this.checkOwnerSet) {
1075      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
1076    }
1077    return proc;
1078  }
1079
1080  private long pushProcedure(Procedure<TEnvironment> proc) {
1081    final long currentProcId = proc.getProcId();
1082
1083    // Update metrics on start of a procedure
1084    proc.updateMetricsOnSubmit(getEnvironment());
1085
1086    // Create the rollback stack for the procedure
1087    RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
1088    rollbackStack.put(currentProcId, stack);
1089
1090    // Submit the new subprocedures
1091    assert !procedures.containsKey(currentProcId);
1092    procedures.put(currentProcId, proc);
1093    sendProcedureAddedNotification(currentProcId);
1094    scheduler.addBack(proc);
1095    return proc.getProcId();
1096  }
1097
1098  /**
1099   * Send an abort notification the specified procedure. Depending on the procedure implementation
1100   * the abort can be considered or ignored.
1101   * @param procId the procedure to abort
1102   * @return true if the procedure exists and has received the abort, otherwise false.
1103   */
1104  public boolean abort(long procId) {
1105    return abort(procId, true);
1106  }
1107
1108  /**
1109   * Send an abort notification to the specified procedure. Depending on the procedure
1110   * implementation, the abort can be considered or ignored.
1111   * @param procId                the procedure to abort
1112   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
1113   * @return true if the procedure exists and has received the abort, otherwise false.
1114   */
1115  public boolean abort(long procId, boolean mayInterruptIfRunning) {
1116    Procedure<TEnvironment> proc = procedures.get(procId);
1117    if (proc != null) {
1118      if (!mayInterruptIfRunning && proc.wasExecuted()) {
1119        return false;
1120      }
1121      return proc.abort(getEnvironment());
1122    }
1123    return false;
1124  }
1125
1126  // ==========================================================================
1127  // Executor query helpers
1128  // ==========================================================================
1129  public Procedure<TEnvironment> getProcedure(final long procId) {
1130    return procedures.get(procId);
1131  }
1132
1133  public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
1134    Procedure<TEnvironment> proc = getProcedure(procId);
1135    if (clazz.isInstance(proc)) {
1136      return clazz.cast(proc);
1137    }
1138    return null;
1139  }
1140
1141  public Procedure<TEnvironment> getResult(long procId) {
1142    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1143    if (retainer == null) {
1144      return null;
1145    } else {
1146      return retainer.getProcedure();
1147    }
1148  }
1149
1150  /**
1151   * Return true if the procedure is finished. The state may be "completed successfully" or "failed
1152   * and rolledback". Use getResult() to check the state or get the result data.
1153   * @param procId the ID of the procedure to check
1154   * @return true if the procedure execution is finished, otherwise false.
1155   */
1156  public boolean isFinished(final long procId) {
1157    return !procedures.containsKey(procId);
1158  }
1159
1160  /**
1161   * Return true if the procedure is started.
1162   * @param procId the ID of the procedure to check
1163   * @return true if the procedure execution is started, otherwise false.
1164   */
1165  public boolean isStarted(long procId) {
1166    Procedure<?> proc = procedures.get(procId);
1167    if (proc == null) {
1168      return completed.get(procId) != null;
1169    }
1170    return proc.wasExecuted();
1171  }
1172
1173  /**
1174   * Mark the specified completed procedure, as ready to remove.
1175   * @param procId the ID of the procedure to remove
1176   */
1177  public void removeResult(long procId) {
1178    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1179    if (retainer == null) {
1180      assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
1181      LOG.debug("pid={} already removed by the cleaner.", procId);
1182      return;
1183    }
1184
1185    // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
1186    retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
1187  }
1188
1189  public Procedure<TEnvironment> getResultOrProcedure(long procId) {
1190    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1191    if (retainer == null) {
1192      return procedures.get(procId);
1193    } else {
1194      return retainer.getProcedure();
1195    }
1196  }
1197
1198  /**
1199   * Check if the user is this procedure's owner
1200   * @param procId the target procedure
1201   * @param user   the user
1202   * @return true if the user is the owner of the procedure, false otherwise or the owner is
1203   *         unknown.
1204   */
1205  public boolean isProcedureOwner(long procId, User user) {
1206    if (user == null) {
1207      return false;
1208    }
1209    final Procedure<TEnvironment> runningProc = procedures.get(procId);
1210    if (runningProc != null) {
1211      return runningProc.getOwner().equals(user.getShortName());
1212    }
1213
1214    final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1215    if (retainer != null) {
1216      return retainer.getProcedure().getOwner().equals(user.getShortName());
1217    }
1218
1219    // Procedure either does not exist or has already completed and got cleaned up.
1220    // At this time, we cannot check the owner of the procedure
1221    return false;
1222  }
1223
1224  /**
1225   * Should only be used when starting up, where the procedure workers have not been started.
1226   * <p/>
1227   * If the procedure works has been started, the return values maybe changed when you are
1228   * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
1229   * it will do a copy, and also include the finished procedures.
1230   */
1231  public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
1232    return procedures.values();
1233  }
1234
1235  /**
1236   * Get procedures.
1237   * @return the procedures in a list
1238   */
1239  public List<Procedure<TEnvironment>> getProcedures() {
1240    List<Procedure<TEnvironment>> procedureList =
1241      new ArrayList<>(procedures.size() + completed.size());
1242    procedureList.addAll(procedures.values());
1243    // Note: The procedure could show up twice in the list with different state, as
1244    // it could complete after we walk through procedures list and insert into
1245    // procedureList - it is ok, as we will use the information in the Procedure
1246    // to figure it out; to prevent this would increase the complexity of the logic.
1247    completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
1248      .forEach(procedureList::add);
1249    return procedureList;
1250  }
1251
1252  // ==========================================================================
1253  // Listeners helpers
1254  // ==========================================================================
1255  public void registerListener(ProcedureExecutorListener listener) {
1256    this.listeners.add(listener);
1257  }
1258
1259  public boolean unregisterListener(ProcedureExecutorListener listener) {
1260    return this.listeners.remove(listener);
1261  }
1262
1263  private void sendProcedureLoadedNotification(final long procId) {
1264    if (!this.listeners.isEmpty()) {
1265      for (ProcedureExecutorListener listener : this.listeners) {
1266        try {
1267          listener.procedureLoaded(procId);
1268        } catch (Throwable e) {
1269          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1270        }
1271      }
1272    }
1273  }
1274
1275  private void sendProcedureAddedNotification(final long procId) {
1276    if (!this.listeners.isEmpty()) {
1277      for (ProcedureExecutorListener listener : this.listeners) {
1278        try {
1279          listener.procedureAdded(procId);
1280        } catch (Throwable e) {
1281          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1282        }
1283      }
1284    }
1285  }
1286
1287  private void sendProcedureFinishedNotification(final long procId) {
1288    if (!this.listeners.isEmpty()) {
1289      for (ProcedureExecutorListener listener : this.listeners) {
1290        try {
1291          listener.procedureFinished(procId);
1292        } catch (Throwable e) {
1293          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1294        }
1295      }
1296    }
1297  }
1298
1299  // ==========================================================================
1300  // Procedure IDs helpers
1301  // ==========================================================================
1302  private long nextProcId() {
1303    long procId = lastProcId.incrementAndGet();
1304    if (procId < 0) {
1305      while (!lastProcId.compareAndSet(procId, 0)) {
1306        procId = lastProcId.get();
1307        if (procId >= 0) {
1308          break;
1309        }
1310      }
1311      while (procedures.containsKey(procId)) {
1312        procId = lastProcId.incrementAndGet();
1313      }
1314    }
1315    assert procId >= 0 : "Invalid procId " + procId;
1316    return procId;
1317  }
1318
1319  protected long getLastProcId() {
1320    return lastProcId.get();
1321  }
1322
1323  public Set<Long> getActiveProcIds() {
1324    return procedures.keySet();
1325  }
1326
1327  Long getRootProcedureId(Procedure<TEnvironment> proc) {
1328    return Procedure.getRootProcedureId(procedures, proc);
1329  }
1330
1331  // ==========================================================================
1332  // Executions
1333  // ==========================================================================
  /**
   * Execute one scheduling slice of {@code proc} on the current worker thread.
   * <p>
   * The procedure runs against the {@link RootProcedureState} of its root procedure. If that
   * state refuses execution, this worker does one of two things. If it wins the rollback flag, it
   * rolls back the whole root stack. Otherwise, for a procedure that never executed, it rolls back
   * just that procedure. When execution is allowed, the procedure's lock is acquired and one step
   * is executed. The loop repeats while the root state reports failure, presumably so that a
   * failure raised by this step is rolled back right away by the same worker.
   * @param proc the procedure to execute
   */
  private void executeProcedure(Procedure<TEnvironment> proc) {
    if (proc.isFinished()) {
      LOG.debug("{} is already finished, skipping execution", proc);
      return;
    }
    final Long rootProcId = getRootProcedureId(proc);
    if (rootProcId == null) {
      // The 'proc' was ready to run but the root procedure was rolledback
      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
      executeRollback(proc);
      return;
    }

    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
    if (procStack == null) {
      LOG.warn("RootProcedureState is null for " + proc.getProcId());
      return;
    }
    do {
      // Try to acquire the execution
      // (a false return means the root state does not allow execution right now — presumably
      // because it is failed/rolling back; see RootProcedureState)
      if (!procStack.acquire(proc)) {
        if (procStack.setRollback()) {
          // we have the 'rollback-lock' we can start rollingback
          switch (executeRollback(rootProcId, procStack)) {
            case LOCK_ACQUIRED:
              break;
            case LOCK_YIELD_WAIT:
              // give the rollback flag back so the rollback can be resumed later
              procStack.unsetRollback();
              scheduler.yield(proc);
              break;
            case LOCK_EVENT_WAIT:
              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
              procStack.unsetRollback();
              break;
            default:
              throw new UnsupportedOperationException();
          }
        } else {
          // if we can't rollback means that some child is still running.
          // the rollback will be executed after all the children are done.
          // If the procedure was never executed, remove and mark it as rolledback.
          if (!proc.wasExecuted()) {
            switch (executeRollback(proc)) {
              case LOCK_ACQUIRED:
                break;
              case LOCK_YIELD_WAIT:
                scheduler.yield(proc);
                break;
              case LOCK_EVENT_WAIT:
                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
                break;
              default:
                throw new UnsupportedOperationException();
            }
          }
        }
        // leave the do/while: nothing more for this worker to do with proc right now
        break;
      }

      // Execute the procedure
      assert proc.getState() == ProcedureState.RUNNABLE : proc;
      // Note that lock is NOT about concurrency but rather about ensuring
      // ownership of a procedure of an entity such as a region or table
      LockState lockState = acquireLock(proc);
      switch (lockState) {
        case LOCK_ACQUIRED:
          execProcedure(procStack, proc);
          break;
        case LOCK_YIELD_WAIT:
          LOG.info(lockState + " " + proc);
          scheduler.yield(proc);
          break;
        case LOCK_EVENT_WAIT:
          // Someone will wake us up when the lock is available
          LOG.debug(lockState + " " + proc);
          break;
        default:
          throw new UnsupportedOperationException();
      }
      // release the execution slot taken by procStack.acquire(proc) above
      procStack.release(proc);

      if (proc.isSuccess()) {
        // update metrics on finishing the procedure
        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
        LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
        // Finalize the procedure state
        // (rootProcId is a Long; it is unboxed for this comparison)
        if (proc.getProcId() == rootProcId) {
          procedureFinished(proc);
        } else {
          execCompletionCleanup(proc);
        }
        break;
      }
    } while (procStack.isFailed());
  }
1429
1430  private LockState acquireLock(Procedure<TEnvironment> proc) {
1431    TEnvironment env = getEnvironment();
1432    // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
1433    // hasLock is true.
1434    if (proc.hasLock()) {
1435      return LockState.LOCK_ACQUIRED;
1436    }
1437    return proc.doAcquireLock(env, store);
1438  }
1439
1440  private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
1441    TEnvironment env = getEnvironment();
1442    // For how the framework works, we know that we will always have the lock
1443    // when we call releaseLock(), so we can avoid calling proc.hasLock()
1444    if (force || !proc.holdLock(env) || proc.isFinished()) {
1445      proc.doReleaseLock(env, store);
1446    }
1447  }
1448
1449  /**
1450   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
1451   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
1452   */
1453  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
1454    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
1455    RemoteProcedureException exception = rootProc.getException();
1456    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
1457    // rolling back because the subprocedure does. Clarify.
1458    if (exception == null) {
1459      exception = procStack.getException();
1460      rootProc.setFailure(exception);
1461      store.update(rootProc);
1462    }
1463
1464    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
1465    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
1466
1467    int stackTail = subprocStack.size();
1468    while (stackTail-- > 0) {
1469      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
1470      IdLock.Entry lockEntry = null;
1471      // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
1472      // this check, as the worker will hold the lock before executing a procedure. This is the only
1473      // place where we may hold two procedure execution locks, and there is a fence in the
1474      // RootProcedureState where we can make sure that only one worker can execute the rollback of
1475      // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
1476      // prevent race between us and the force update thread.
1477      if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
1478        try {
1479          lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
1480        } catch (IOException e) {
1481          // can only happen if interrupted, so not a big deal to propagate it
1482          throw new UncheckedIOException(e);
1483        }
1484      }
1485      try {
1486        // For the sub procedures which are successfully finished, we do not rollback them.
1487        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
1488        // recursively rollback its ancestors. The state changes which are done by sub procedures
1489        // should be handled by parent procedures when rolling back. For example, when rolling back
1490        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
1491        // online, instead of rolling back the original procedures which offlined the regions(in
1492        // fact these procedures can not be rolled back...).
1493        if (proc.isSuccess()) {
1494          // Just do the cleanup work, without actually executing the rollback
1495          subprocStack.remove(stackTail);
1496          cleanupAfterRollbackOneStep(proc);
1497          continue;
1498        }
1499        LockState lockState = acquireLock(proc);
1500        if (lockState != LockState.LOCK_ACQUIRED) {
1501          // can't take a lock on the procedure, add the root-proc back on the
1502          // queue waiting for the lock availability
1503          return lockState;
1504        }
1505
1506        lockState = executeRollback(proc);
1507        releaseLock(proc, false);
1508        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
1509        abortRollback |= !isRunning() || !store.isRunning();
1510
1511        // allows to kill the executor before something is stored to the wal.
1512        // useful to test the procedure recovery.
1513        if (abortRollback) {
1514          return lockState;
1515        }
1516
1517        subprocStack.remove(stackTail);
1518
1519        // if the procedure is kind enough to pass the slot to someone else, yield
1520        // if the proc is already finished, do not yield
1521        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
1522          return LockState.LOCK_YIELD_WAIT;
1523        }
1524
1525        if (proc != rootProc) {
1526          execCompletionCleanup(proc);
1527        }
1528      } finally {
1529        if (lockEntry != null) {
1530          procExecutionLock.releaseLockEntry(lockEntry);
1531        }
1532      }
1533    }
1534
1535    // Finalize the procedure state
1536    LOG.info("Rolled back {} exec-time={}", rootProc,
1537      StringUtils.humanTimeDiff(rootProc.elapsedTime()));
1538    procedureFinished(rootProc);
1539    return LockState.LOCK_ACQUIRED;
1540  }
1541
1542  private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
1543    if (proc.removeStackIndex()) {
1544      if (!proc.isSuccess()) {
1545        proc.setState(ProcedureState.ROLLEDBACK);
1546      }
1547
1548      // update metrics on finishing the procedure (fail)
1549      proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);
1550
1551      if (proc.hasParent()) {
1552        store.delete(proc.getProcId());
1553        procedures.remove(proc.getProcId());
1554      } else {
1555        final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
1556        if (childProcIds != null) {
1557          store.delete(proc, childProcIds);
1558        } else {
1559          store.update(proc);
1560        }
1561      }
1562    } else {
1563      store.update(proc);
1564    }
1565  }
1566
1567  /**
1568   * Execute the rollback of the procedure step. It updates the store with the new state (stack
1569   * index) or will remove completly the procedure in case it is a child.
1570   */
1571  private LockState executeRollback(Procedure<TEnvironment> proc) {
1572    try {
1573      proc.doRollback(getEnvironment());
1574    } catch (IOException e) {
1575      LOG.debug("Roll back attempt failed for {}", proc, e);
1576      return LockState.LOCK_YIELD_WAIT;
1577    } catch (InterruptedException e) {
1578      handleInterruptedException(proc, e);
1579      return LockState.LOCK_YIELD_WAIT;
1580    } catch (Throwable e) {
1581      // Catch NullPointerExceptions or similar errors...
1582      LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
1583    }
1584
1585    // allows to kill the executor before something is stored to the wal.
1586    // useful to test the procedure recovery.
1587    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1588      String msg = "TESTING: Kill before store update";
1589      LOG.debug(msg);
1590      stop();
1591      throw new RuntimeException(msg);
1592    }
1593
1594    cleanupAfterRollbackOneStep(proc);
1595
1596    return LockState.LOCK_ACQUIRED;
1597  }
1598
1599  private void yieldProcedure(Procedure<TEnvironment> proc) {
1600    releaseLock(proc, false);
1601    scheduler.yield(proc);
1602  }
1603
1604  /**
1605   * Executes <code>procedure</code>
1606   * <ul>
1607   * <li>Calls the doExecute() of the procedure
1608   * <li>If the procedure execution didn't fail (i.e. valid user input)
1609   * <ul>
1610   * <li>...and returned subprocedures
1611   * <ul>
1612   * <li>The subprocedures are initialized.
1613   * <li>The subprocedures are added to the store
1614   * <li>The subprocedures are added to the runnable queue
1615   * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
1616   * </ul>
1617   * </li>
1618   * <li>...if there are no subprocedure
1619   * <ul>
1620   * <li>the procedure completed successfully
1621   * <li>if there is a parent (WAITING)
1622   * <li>the parent state will be set to RUNNABLE
1623   * </ul>
1624   * </li>
1625   * </ul>
1626   * </li>
1627   * <li>In case of failure
1628   * <ul>
1629   * <li>The store is updated with the new state</li>
1630   * <li>The executor (caller of this method) will start the rollback of the procedure</li>
1631   * </ul>
1632   * </li>
1633   * </ul>
1634   */
1635  private void execProcedure(RootProcedureState<TEnvironment> procStack,
1636    Procedure<TEnvironment> procedure) {
1637    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
1638      "NOT RUNNABLE! " + procedure.toString());
1639
1640    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
1641    // The exception is caught below and then we hurry to the exit without disturbing state. The
1642    // idea is that the processing of this procedure will be unsuspended later by an external event
1643    // such the report of a region open.
1644    boolean suspended = false;
1645
1646    // Whether to 're-' -execute; run through the loop again.
1647    boolean reExecute = false;
1648
1649    Procedure<TEnvironment>[] subprocs = null;
1650    do {
1651      reExecute = false;
1652      procedure.resetPersistence();
1653      try {
1654        subprocs = procedure.doExecute(getEnvironment());
1655        if (subprocs != null && subprocs.length == 0) {
1656          subprocs = null;
1657        }
1658      } catch (ProcedureSuspendedException e) {
1659        LOG.trace("Suspend {}", procedure);
1660        suspended = true;
1661      } catch (ProcedureYieldException e) {
1662        LOG.trace("Yield {}", procedure, e);
1663        yieldProcedure(procedure);
1664        return;
1665      } catch (InterruptedException e) {
1666        LOG.trace("Yield interrupt {}", procedure, e);
1667        handleInterruptedException(procedure, e);
1668        yieldProcedure(procedure);
1669        return;
1670      } catch (Throwable e) {
1671        // Catch NullPointerExceptions or similar errors...
1672        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
1673        LOG.error(msg, e);
1674        procedure.setFailure(new RemoteProcedureException(msg, e));
1675      }
1676
1677      if (!procedure.isFailed()) {
1678        if (subprocs != null) {
1679          if (subprocs.length == 1 && subprocs[0] == procedure) {
1680            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
1681            // i.e. we go around this loop again rather than go back out on the scheduler queue.
1682            subprocs = null;
1683            reExecute = true;
1684            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
1685          } else {
1686            // Yield the current procedure, and make the subprocedure runnable
1687            // subprocs may come back 'null'.
1688            subprocs = initializeChildren(procStack, procedure, subprocs);
1689            LOG.info("Initialized subprocedures=" + (subprocs == null
1690              ? null
1691              : Stream.of(subprocs).map(e -> "{" + e.toString() + "}").collect(Collectors.toList())
1692                .toString()));
1693          }
1694        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1695          LOG.trace("Added to timeoutExecutor {}", procedure);
1696          timeoutExecutor.add(procedure);
1697        } else if (!suspended) {
1698          // No subtask, so we are done
1699          procedure.setState(ProcedureState.SUCCESS);
1700        }
1701      }
1702
1703      // Add the procedure to the stack
1704      procStack.addRollbackStep(procedure);
1705
1706      // allows to kill the executor before something is stored to the wal.
1707      // useful to test the procedure recovery.
1708      if (
1709        testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())
1710      ) {
1711        kill("TESTING: Kill BEFORE store update: " + procedure);
1712      }
1713
1714      // TODO: The code here doesn't check if store is running before persisting to the store as
1715      // it relies on the method call below to throw RuntimeException to wind up the stack and
1716      // executor thread to stop. The statement following the method call below seems to check if
1717      // store is not running, to prevent scheduling children procedures, re-execution or yield
1718      // of this procedure. This may need more scrutiny and subsequent cleanup in future
1719      //
1720      // Commit the transaction even if a suspend (state may have changed). Note this append
1721      // can take a bunch of time to complete.
1722      if (procedure.needPersistence()) {
1723        updateStoreOnExec(procStack, procedure, subprocs);
1724      }
1725
1726      // if the store is not running we are aborting
1727      if (!store.isRunning()) {
1728        return;
1729      }
1730      // if the procedure is kind enough to pass the slot to someone else, yield
1731      if (
1732        procedure.isRunnable() && !suspended
1733          && procedure.isYieldAfterExecutionStep(getEnvironment())
1734      ) {
1735        yieldProcedure(procedure);
1736        return;
1737      }
1738
1739      assert (reExecute && subprocs == null) || !reExecute;
1740    } while (reExecute);
1741
1742    // Allows to kill the executor after something is stored to the WAL but before the below
1743    // state settings are done -- in particular the one on the end where we make parent
1744    // RUNNABLE again when its children are done; see countDownChildren.
1745    if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
1746      kill("TESTING: Kill AFTER store update: " + procedure);
1747    }
1748
1749    // Submit the new subprocedures
1750    if (subprocs != null && !procedure.isFailed()) {
1751      submitChildrenProcedures(subprocs);
1752    }
1753
1754    // we need to log the release lock operation before waking up the parent procedure, as there
1755    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
1756    // the sub procedures from store and cause problems...
1757    releaseLock(procedure, false);
1758
1759    // if the procedure is complete and has a parent, count down the children latch.
1760    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
1761    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
1762      countDownChildren(procStack, procedure);
1763    }
1764  }
1765
1766  private void kill(String msg) {
1767    LOG.debug(msg);
1768    stop();
1769    throw new RuntimeException(msg);
1770  }
1771
1772  private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
1773    Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1774    assert subprocs != null : "expected subprocedures";
1775    final long rootProcId = getRootProcedureId(procedure);
1776    for (int i = 0; i < subprocs.length; ++i) {
1777      Procedure<TEnvironment> subproc = subprocs[i];
1778      if (subproc == null) {
1779        String msg = "subproc[" + i + "] is null, aborting the procedure";
1780        procedure
1781          .setFailure(new RemoteProcedureException(msg, new IllegalArgumentIOException(msg)));
1782        return null;
1783      }
1784
1785      assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
1786      subproc.setParentProcId(procedure.getProcId());
1787      subproc.setRootProcId(rootProcId);
1788      subproc.setProcId(nextProcId());
1789      procStack.addSubProcedure(subproc);
1790    }
1791
1792    if (!procedure.isFailed()) {
1793      procedure.setChildrenLatch(subprocs.length);
1794      switch (procedure.getState()) {
1795        case RUNNABLE:
1796          procedure.setState(ProcedureState.WAITING);
1797          break;
1798        case WAITING_TIMEOUT:
1799          timeoutExecutor.add(procedure);
1800          break;
1801        default:
1802          break;
1803      }
1804    }
1805    return subprocs;
1806  }
1807
1808  private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
1809    for (int i = 0; i < subprocs.length; ++i) {
1810      Procedure<TEnvironment> subproc = subprocs[i];
1811      subproc.updateMetricsOnSubmit(getEnvironment());
1812      assert !procedures.containsKey(subproc.getProcId());
1813      procedures.put(subproc.getProcId(), subproc);
1814      scheduler.addFront(subproc);
1815    }
1816  }
1817
1818  private void countDownChildren(RootProcedureState<TEnvironment> procStack,
1819    Procedure<TEnvironment> procedure) {
1820    Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
1821    if (parent == null) {
1822      assert procStack.isRollingback();
1823      return;
1824    }
1825
1826    // If this procedure is the last child awake the parent procedure
1827    if (parent.tryRunnable()) {
1828      // If we succeeded in making the parent runnable -- i.e. all of its
1829      // children have completed, move parent to front of the queue.
1830      store.update(parent);
1831      scheduler.addFront(parent);
1832      LOG.info("Finished subprocedure pid={}, resume processing ppid={}", procedure.getProcId(),
1833        parent.getProcId());
1834      return;
1835    }
1836  }
1837
1838  private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
1839    Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1840    if (subprocs != null && !procedure.isFailed()) {
1841      if (LOG.isTraceEnabled()) {
1842        LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
1843      }
1844      store.insert(procedure, subprocs);
1845    } else {
1846      LOG.trace("Store update {}", procedure);
1847      if (procedure.isFinished() && !procedure.hasParent()) {
1848        // remove child procedures
1849        final long[] childProcIds = procStack.getSubprocedureIds();
1850        if (childProcIds != null) {
1851          store.delete(procedure, childProcIds);
1852          for (int i = 0; i < childProcIds.length; ++i) {
1853            procedures.remove(childProcIds[i]);
1854          }
1855        } else {
1856          store.update(procedure);
1857        }
1858      } else {
1859        store.update(procedure);
1860      }
1861    }
1862  }
1863
1864  private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
1865    LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
1866    // NOTE: We don't call Thread.currentThread().interrupt()
1867    // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
1868    // the InterruptedException. If the master is going down, we will be notified
1869    // and the executor/store will be stopped.
1870    // (The interrupted procedure will be retried on the next run)
1871  }
1872
1873  private void execCompletionCleanup(Procedure<TEnvironment> proc) {
1874    final TEnvironment env = getEnvironment();
1875    if (proc.hasLock()) {
1876      LOG.warn("Usually this should not happen, we will release the lock before if the procedure"
1877        + " is finished, even if the holdLock is true, arrive here means we have some holes where"
1878        + " we do not release the lock. And the releaseLock below may fail since the procedure may"
1879        + " have already been deleted from the procedure store.");
1880      releaseLock(proc, true);
1881    }
1882    try {
1883      proc.completionCleanup(env);
1884    } catch (Throwable e) {
1885      // Catch NullPointerExceptions or similar errors...
1886      LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1887    }
1888  }
1889
1890  private void procedureFinished(Procedure<TEnvironment> proc) {
1891    // call the procedure completion cleanup handler
1892    execCompletionCleanup(proc);
1893
1894    CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
1895
1896    // update the executor internal state maps
1897    if (!proc.shouldWaitClientAck(getEnvironment())) {
1898      retainer.setClientAckTime(0);
1899    }
1900
1901    completed.put(proc.getProcId(), retainer);
1902    rollbackStack.remove(proc.getProcId());
1903    procedures.remove(proc.getProcId());
1904
1905    // call the runnableSet completion cleanup handler
1906    try {
1907      scheduler.completionCleanup(proc);
1908    } catch (Throwable e) {
1909      // Catch NullPointerExceptions or similar errors...
1910      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
1911    }
1912
1913    // Notify the listeners
1914    sendProcedureFinishedNotification(proc.getProcId());
1915  }
1916
1917  RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
1918    return rollbackStack.get(rootProcId);
1919  }
1920
1921  ProcedureScheduler getProcedureScheduler() {
1922    return scheduler;
1923  }
1924
1925  int getCompletedSize() {
1926    return completed.size();
1927  }
1928
1929  public IdLock getProcExecutionLock() {
1930    return procExecutionLock;
1931  }
1932
1933  // ==========================================================================
1934  // Worker Thread
1935  // ==========================================================================
1936  private class WorkerThread extends StoppableThread {
1937    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
1938    private volatile Procedure<TEnvironment> activeProcedure;
1939
1940    public WorkerThread(ThreadGroup group) {
1941      this(group, "PEWorker-");
1942    }
1943
1944    protected WorkerThread(ThreadGroup group, String prefix) {
1945      super(group, prefix + workerId.incrementAndGet());
1946      setDaemon(true);
1947    }
1948
1949    @Override
1950    public void sendStopSignal() {
1951      scheduler.signalAll();
1952    }
1953
1954    /**
1955     * Encapsulates execution of the current {@link #activeProcedure} for easy tracing.
1956     */
1957    private long runProcedure() throws IOException {
1958      final Procedure<TEnvironment> proc = this.activeProcedure;
1959      int activeCount = activeExecutorCount.incrementAndGet();
1960      int runningCount = store.setRunningProcedureCount(activeCount);
1961      LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount,
1962        activeCount);
1963      executionStartTime.set(EnvironmentEdgeManager.currentTime());
1964      IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
1965      try {
1966        executeProcedure(proc);
1967      } catch (AssertionError e) {
1968        LOG.info("ASSERT pid=" + proc.getProcId(), e);
1969        throw e;
1970      } finally {
1971        procExecutionLock.releaseLockEntry(lockEntry);
1972        activeCount = activeExecutorCount.decrementAndGet();
1973        runningCount = store.setRunningProcedureCount(activeCount);
1974        LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount,
1975          activeCount);
1976        this.activeProcedure = null;
1977        executionStartTime.set(Long.MAX_VALUE);
1978      }
1979      return EnvironmentEdgeManager.currentTime();
1980    }
1981
1982    @Override
1983    public void run() {
1984      long lastUpdate = EnvironmentEdgeManager.currentTime();
1985      try {
1986        while (isRunning() && keepAlive(lastUpdate)) {
1987          @SuppressWarnings("unchecked")
1988          Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
1989          if (proc == null) {
1990            continue;
1991          }
1992          this.activeProcedure = proc;
1993          lastUpdate = TraceUtil.trace(this::runProcedure, new ProcedureSpanBuilder(proc));
1994        }
1995      } catch (Throwable t) {
1996        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
1997      } finally {
1998        LOG.trace("Worker terminated.");
1999      }
2000      workerThreads.remove(this);
2001    }
2002
2003    @Override
2004    public String toString() {
2005      Procedure<?> p = this.activeProcedure;
2006      return getName() + "(pid=" + (p == null ? Procedure.NO_PROC_ID : p.getProcId() + ")");
2007    }
2008
2009    /**
2010     * @return the time since the current procedure is running
2011     */
2012    public long getCurrentRunTime() {
2013      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
2014    }
2015
2016    // core worker never timeout
2017    protected boolean keepAlive(long lastUpdate) {
2018      return true;
2019    }
2020  }
2021
2022  // A worker thread which can be added when core workers are stuck. Will timeout after
2023  // keepAliveTime if there is no procedure to run.
2024  private final class KeepAliveWorkerThread extends WorkerThread {
2025    public KeepAliveWorkerThread(ThreadGroup group) {
2026      super(group, "KeepAlivePEWorker-");
2027    }
2028
2029    @Override
2030    protected boolean keepAlive(long lastUpdate) {
2031      return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
2032    }
2033  }
2034
2035  // ----------------------------------------------------------------------------
2036  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the
2037  // full set of procedures pending and completed to write a compacted
2038  // version of the log (in case is a log)?
2039  // In theory no, procedures are have a short life, so at some point the store
2040  // will have the tracker saying everything is in the last log.
2041  // ----------------------------------------------------------------------------
2042
2043  private final class WorkerMonitor extends InlineChore {
2044    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
2045      "hbase.procedure.worker.monitor.interval.msec";
2046    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
2047
2048    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
2049      "hbase.procedure.worker.stuck.threshold.msec";
2050    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
2051
2052    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
2053      "hbase.procedure.worker.add.stuck.percentage";
2054    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
2055
2056    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
2057    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
2058    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
2059
2060    public WorkerMonitor() {
2061      refreshConfig();
2062    }
2063
2064    @Override
2065    public void run() {
2066      final int stuckCount = checkForStuckWorkers();
2067      checkThreadCount(stuckCount);
2068
2069      // refresh interval (poor man dynamic conf update)
2070      refreshConfig();
2071    }
2072
2073    private int checkForStuckWorkers() {
2074      // check if any of the worker is stuck
2075      int stuckCount = 0;
2076      for (WorkerThread worker : workerThreads) {
2077        if (worker.getCurrentRunTime() < stuckThreshold) {
2078          continue;
2079        }
2080
2081        // WARN the worker is stuck
2082        stuckCount++;
2083        LOG.warn("Worker stuck {}, run time {}", worker,
2084          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
2085      }
2086      return stuckCount;
2087    }
2088
2089    private void checkThreadCount(final int stuckCount) {
2090      // nothing to do if there are no runnable tasks
2091      if (stuckCount < 1 || !scheduler.hasRunnables()) {
2092        return;
2093      }
2094
2095      // add a new thread if the worker stuck percentage exceed the threshold limit
2096      // and every handler is active.
2097      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
2098      // let's add new worker thread more aggressively, as they will timeout finally if there is no
2099      // work to do.
2100      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
2101        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
2102        workerThreads.add(worker);
2103        worker.start();
2104        LOG.debug("Added new worker thread {}", worker);
2105      }
2106    }
2107
2108    private void refreshConfig() {
2109      addWorkerStuckPercentage =
2110        conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
2111      timeoutInterval =
2112        conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, DEFAULT_WORKER_MONITOR_INTERVAL);
2113      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, DEFAULT_WORKER_STUCK_THRESHOLD);
2114    }
2115
2116    @Override
2117    public int getTimeoutInterval() {
2118      return timeoutInterval;
2119    }
2120  }
2121}