001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import edu.umd.cs.findbugs.annotations.Nullable;
021import java.io.IOException;
022import java.io.UncheckedIOException;
023import java.util.ArrayDeque;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Deque;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.CopyOnWriteArrayList;
033import java.util.concurrent.Executor;
034import java.util.concurrent.Executors;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.AtomicInteger;
038import java.util.concurrent.atomic.AtomicLong;
039import java.util.stream.Collectors;
040import java.util.stream.Stream;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
044import org.apache.hadoop.hbase.log.HBaseMarkers;
045import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
049import org.apache.hadoop.hbase.procedure2.util.StringUtils;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.hadoop.hbase.util.IdLock;
053import org.apache.hadoop.hbase.util.NonceKey;
054import org.apache.hadoop.hbase.util.Threads;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
060import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
062
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
064
065/**
066 * Thread Pool that executes the submitted procedures.
067 * The executor has a ProcedureStore associated.
068 * Each operation is logged and on restart the pending procedures are resumed.
069 *
 * Unless the Procedure code throws an error (e.g. invalid user input)
 * the procedure will complete (at some point in time). On restart the pending
 * procedures are resumed and the ones that failed will be rolled back.
073 *
074 * The user can add procedures to the executor via submitProcedure(proc)
075 * check for the finished state via isFinished(procId)
076 * and get the result via getResult(procId)
077 */
078@InterfaceAudience.Private
079public class ProcedureExecutor<TEnvironment> {
080  private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);
081
082  public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
083  private static final boolean DEFAULT_CHECK_OWNER_SET = false;
084
085  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
086      "hbase.procedure.worker.keep.alive.time.msec";
087  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
088
089  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
090  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
091
092  public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
093  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
094
095  /**
096   * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
097   * break PE having it fail at various junctures. When non-null, testing is set to an instance of
098   * the below internal {@link Testing} class with flags set for the particular test.
099   */
100  volatile Testing testing = null;
101
102  /**
103   * Class with parameters describing how to fail/die when in testing-context.
104   */
105  public static class Testing {
106    protected volatile boolean killIfHasParent = true;
107    protected volatile boolean killIfSuspended = false;
108
109    /**
110     * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
111     * persisting all the state it needs to recover after a crash.
112     */
113    protected volatile boolean killBeforeStoreUpdate = false;
114    protected volatile boolean toggleKillBeforeStoreUpdate = false;
115
116    /**
117     * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
118     * is about a case where memory-state was being set after store to WAL where a crash could
119     * cause us to get stuck. This flag allows killing at what was a vulnerable time.
120     */
121    protected volatile boolean killAfterStoreUpdate = false;
122    protected volatile boolean toggleKillAfterStoreUpdate = false;
123
124    protected boolean shouldKillBeforeStoreUpdate() {
125      final boolean kill = this.killBeforeStoreUpdate;
126      if (this.toggleKillBeforeStoreUpdate) {
127        this.killBeforeStoreUpdate = !kill;
128        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
129      }
130      return kill;
131    }
132
133    protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {
134      if (isSuspended && !killIfSuspended) {
135        return false;
136      }
137      if (hasParent && !killIfHasParent) {
138        return false;
139      }
140      return shouldKillBeforeStoreUpdate();
141    }
142
143    protected boolean shouldKillAfterStoreUpdate() {
144      final boolean kill = this.killAfterStoreUpdate;
145      if (this.toggleKillAfterStoreUpdate) {
146        this.killAfterStoreUpdate = !kill;
147        LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
148      }
149      return kill;
150    }
151
152    protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
153      return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
154    }
155  }
156
157  public interface ProcedureExecutorListener {
158    void procedureLoaded(long procId);
159    void procedureAdded(long procId);
160    void procedureFinished(long procId);
161  }
162
163  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
165   * Once a Root-Procedure completes (success or failure), the result will be added to this map.
166   * The user of ProcedureExecutor should call getResult(procId) to get the result.
167   */
168  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
169    new ConcurrentHashMap<>();
170
171  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
173   * The RootProcedureState contains the execution stack of the Root-Procedure,
174   * It is added to the map by submitProcedure() and removed on procedure completion.
175   */
176  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
177    new ConcurrentHashMap<>();
178
179  /**
180   * Helper map to lookup the live procedures by ID.
181   * This map contains every procedure. root-procedures and subprocedures.
182   */
183  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
184    new ConcurrentHashMap<>();
185
186  /**
187   * Helper map to lookup whether the procedure already issued from the same client. This map
188   * contains every root procedure.
189   */
190  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();
191
192  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
193    new CopyOnWriteArrayList<>();
194
195  private Configuration conf;
196
197  /**
198   * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
199   * resource handling rather than observing in a #join is unexpected).
200   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
201   * (Should be ok).
202   */
203  private ThreadGroup threadGroup;
204
205  /**
206   * Created in the {@link #init(int, boolean)}  method. Terminated in {@link #join()} (FIX! Doing
207   * resource handling rather than observing in a #join is unexpected).
208   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
209   * (Should be ok).
210   */
211  private CopyOnWriteArrayList<WorkerThread> workerThreads;
212
213  /**
214   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
215   * resource handling rather than observing in a #join is unexpected).
216   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
217   * (Should be ok).
218   */
219  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;
220
221  /**
   * WorkerMonitor checks for stuck workers and creates new worker threads when necessary; for
   * example, if there is no worker to assign meta, it will create a new worker thread for it,
   * so it is very important. TimeoutExecutor executes many tasks such as
   * DeadServerMetricRegionChore, RegionInTransitionChore and so on; some of them may run for a
   * long time and would block other tasks like WorkerMonitor, so a dedicated thread is used
   * for executing WorkerMonitor.
227   */
228  private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;
229
230  private int corePoolSize;
231  private int maxPoolSize;
232
233  private volatile long keepAliveTime;
234
235  /**
236   * Scheduler/Queue that contains runnable procedures.
237   */
238  private final ProcedureScheduler scheduler;
239
240  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
241    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
242
243  private final AtomicLong lastProcId = new AtomicLong(-1);
244  private final AtomicLong workerId = new AtomicLong(0);
245  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
246  private final AtomicBoolean running = new AtomicBoolean(false);
247  private final TEnvironment environment;
248  private final ProcedureStore store;
249
250  private final boolean checkOwnerSet;
251
252  // To prevent concurrent execution of the same procedure.
253  // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
254  // procedure is woken up before we finish the suspend which causes the same procedures to be
255  // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
256  // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
257  // execution of the same procedure.
258  private final IdLock procExecutionLock = new IdLock();
259
260  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
261      final ProcedureStore store) {
262    this(conf, environment, store, new SimpleProcedureScheduler());
263  }
264
265  private boolean isRootFinished(Procedure<?> proc) {
266    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
267    return rootProc == null || rootProc.isFinished();
268  }
269
270  private void forceUpdateProcedure(long procId) throws IOException {
271    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
272    try {
273      Procedure<TEnvironment> proc = procedures.get(procId);
274      if (proc != null) {
275        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
276          LOG.debug("Procedure {} has already been finished and parent is succeeded," +
277            " skip force updating", proc);
278          return;
279        }
280      } else {
281        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
282        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
283          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
284          return;
285        }
286        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
287        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
288        if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
289          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
290            procId);
291          return;
292        }
293        proc = retainer.getProcedure();
294      }
295      LOG.debug("Force update procedure {}", proc);
296      store.update(proc);
297    } finally {
298      procExecutionLock.releaseLockEntry(lockEntry);
299    }
300  }
301
302  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
303      final ProcedureStore store, final ProcedureScheduler scheduler) {
304    this.environment = environment;
305    this.scheduler = scheduler;
306    this.store = store;
307    this.conf = conf;
308    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
309    refreshConfiguration(conf);
310    store.registerListener(new ProcedureStoreListener() {
311
312      @Override
313      public void forceUpdate(long[] procIds) {
314        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
315          try {
316            forceUpdateProcedure(procId);
317          } catch (IOException e) {
318            LOG.warn("Failed to force update procedure with pid={}", procId);
319          }
320        }));
321      }
322    });
323  }
324
325  private void load(final boolean abortOnCorruption) throws IOException {
326    Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
327    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
328    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
329    Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");
330
331    store.load(new ProcedureStore.ProcedureLoader() {
332      @Override
333      public void setMaxProcId(long maxProcId) {
334        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
335        lastProcId.set(maxProcId);
336      }
337
338      @Override
339      public void load(ProcedureIterator procIter) throws IOException {
340        loadProcedures(procIter, abortOnCorruption);
341      }
342
343      @Override
344      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
345        int corruptedCount = 0;
346        while (procIter.hasNext()) {
347          Procedure<?> proc = procIter.next();
348          LOG.error("Corrupt " + proc);
349          corruptedCount++;
350        }
351        if (abortOnCorruption && corruptedCount > 0) {
352          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
353        }
354      }
355    });
356  }
357
358  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
359    proc.restoreLock(getEnvironment());
360    restored.add(proc.getProcId());
361  }
362
363  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
364    while (!stack.isEmpty()) {
365      restoreLock(stack.pop(), restored);
366    }
367  }
368
369  // Restore the locks for all the procedures.
370  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
371  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
372  // calling the acquireLock method for the parent procedure.
373  // The algorithm is straight-forward:
374  // 1. Use a set to record the procedures which locks have already been restored.
375  // 2. Use a stack to store the hierarchy of the procedures
376  // 3. For all the procedure, we will first try to find its parent and push it into the stack,
377  // unless
378  // a. We have no parent, i.e, we are the root procedure
379  // b. The lock has already been restored(by checking the set introduced in #1)
380  // then we start to pop the stack and call acquireLock for each procedure.
381  // Notice that this should be done for all procedures, not only the ones in runnableList.
382  private void restoreLocks() {
383    Set<Long> restored = new HashSet<>();
384    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
385    procedures.values().forEach(proc -> {
386      for (;;) {
387        if (restored.contains(proc.getProcId())) {
388          restoreLocks(stack, restored);
389          return;
390        }
391        if (!proc.hasParent()) {
392          restoreLock(proc, restored);
393          restoreLocks(stack, restored);
394          return;
395        }
396        stack.push(proc);
397        proc = procedures.get(proc.getParentProcId());
398      }
399    });
400  }
401
  /**
   * Rebuilds the in-memory state of the executor from the procedures handed over by the store.
   * <p/>
   * The iterator is walked twice: the first pass fills the completed/procedures/nonce maps and
   * creates a rollback stack entry per unfinished root procedure; the second pass (after
   * {@code procIter.reset()}) wires children latches and rollback stacks and sorts the
   * unfinished procedures by state. Afterwards locks are restored and the procedures are pushed
   * to the timeout executor or to the scheduler.
   * @param procIter iterator over the procedures read from the store
   * @param abortOnCorruption not read by this method
   * @throws IOException declared for the loader callback; NOTE(review): nothing visible in this
   *           body throws it directly except possibly the iterator calls - confirm
   * @throws UnsupportedOperationException if an unfinished procedure is in ROLLEDBACK or
   *           INITIALIZING state
   */
  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
      throws IOException {
    // 1. Build the rollback stack
    // The counters are only used to pre-size the per-state lists built in step 2.
    int runnableCount = 0;
    int failedCount = 0;
    int waitingCount = 0;
    int waitingTimeoutCount = 0;
    while (procIter.hasNext()) {
      // Must be queried before next(), which advances the iterator.
      boolean finished = procIter.isNextFinished();
      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      NonceKey nonceKey = proc.getNonceKey();
      long procId = proc.getProcId();

      if (finished) {
        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
        LOG.debug("Completed {}", proc);
      } else {
        if (!proc.hasParent()) {
          assert !proc.isFinished() : "unexpected finished procedure";
          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
        }

        // add the procedure to the map
        proc.beforeReplay(getEnvironment());
        procedures.put(proc.getProcId(), proc);
        switch (proc.getState()) {
          case RUNNABLE:
            runnableCount++;
            break;
          case FAILED:
            failedCount++;
            break;
          case WAITING:
            waitingCount++;
            break;
          case WAITING_TIMEOUT:
            waitingTimeoutCount++;
            break;
          default:
            break;
        }
      }

      // The nonce is recorded for finished and unfinished procedures alike.
      if (nonceKey != null) {
        nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
      }
    }

    // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
    // push it into the ProcedureScheduler directly to execute the rollback. But this does not work
    // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
    // the queue from runQueue in scheduler, and then when a procedure which has lock access, for
    // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
    // we will add the queue back to let the workers poll from it. The assumption here is that, the
    // procedure which has the xlock should have been polled out already, so when loading we can not
    // add the procedure to scheduler first and then call acquireLock, since the procedure is still
    // in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
    // then there is a dead lock
    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
    // Second pass over the same procedures.
    procIter.reset();
    while (procIter.hasNext()) {
      // Finished procedures were already put into the completed map in step 1.
      if (procIter.isNextFinished()) {
        procIter.skipNext();
        continue;
      }

      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
      LOG.debug("Loading {}", proc);
      Long rootProcId = getRootProcedureId(proc);
      // The orphan procedures will be passed to handleCorrupted, so add an assert here
      assert rootProcId != null;

      // Count this unfinished child on its parent's children latch.
      if (proc.hasParent()) {
        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
        if (parent != null && !proc.isFinished()) {
          parent.incChildrenLatch();
        }
      }

      // The entry was created in step 1 when the root procedure itself was seen.
      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
      procStack.loadStack(proc);

      proc.setRootProcId(rootProcId);
      switch (proc.getState()) {
        case RUNNABLE:
          runnableList.add(proc);
          break;
        case WAITING:
          waitingList.add(proc);
          break;
        case WAITING_TIMEOUT:
          waitingTimeoutList.add(proc);
          break;
        case FAILED:
          failedList.add(proc);
          break;
        case ROLLEDBACK:
        case INITIALIZING:
          // These two states are rejected for an unfinished procedure: fail the load.
          String msg = "Unexpected " + proc.getState() + " state for " + proc;
          LOG.error(msg);
          throw new UnsupportedOperationException(msg);
        default:
          break;
      }
    }

    // 3. Check the waiting procedures to see if some of them can be added to runnable.
    waitingList.forEach(proc -> {
      if (!proc.hasChildren()) {
        // Normally, WAITING procedures should be waken by its children. But, there is a case that,
        // all the children are successful and before they can wake up their parent procedure, the
        // master was killed. So, during recovering the procedures from ProcedureWal, its children
        // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
        // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
        // exception will throw:
        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
        // "NOT RUNNABLE! " + procedure.toString());
        proc.setState(ProcedureState.RUNNABLE);
        // afterReplay is invoked for these in step 6, together with the other runnables.
        runnableList.add(proc);
      } else {
        proc.afterReplay(getEnvironment());
      }
    });
    // 4. restore locks
    // Done before anything is handed to the scheduler, see the explanation in step 2.
    restoreLocks();

    // 5. Push the procedures to the timeout executor
    waitingTimeoutList.forEach(proc -> {
      proc.afterReplay(getEnvironment());
      timeoutExecutor.add(proc);
    });

    // 6. Push the procedure to the scheduler
    // Failed procedures are queued as they are: no afterReplay call and no loaded notification.
    failedList.forEach(scheduler::addBack);
    runnableList.forEach(p -> {
      p.afterReplay(getEnvironment());
      // Only root procedures trigger the loaded notification.
      if (!p.hasParent()) {
        sendProcedureLoadedNotification(p.getProcId());
      }
      scheduler.addBack(p);
    });
    // After all procedures put into the queue, signal the worker threads.
    // Otherwise, there is a race condition. See HBASE-21364.
    scheduler.signalAll();
  }
553
554  /**
555   * Initialize the procedure executor, but do not start workers. We will start them later.
556   * <p/>
557   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
558   * ensure a single executor, and start the procedure replay to resume and recover the previous
559   * pending and in-progress procedures.
560   * @param numThreads number of threads available for procedure execution.
561   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
562   *          is found on replay. otherwise false.
563   */
564  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
565    // We have numThreads executor + one timer thread used for timing out
566    // procedures and triggering periodic procedures.
567    this.corePoolSize = numThreads;
568    this.maxPoolSize = 10 * numThreads;
569    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
570        corePoolSize, maxPoolSize);
571
572    this.threadGroup = new ThreadGroup("PEWorkerGroup");
573    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
574    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");
575
576    // Create the workers
577    workerId.set(0);
578    workerThreads = new CopyOnWriteArrayList<>();
579    for (int i = 0; i < corePoolSize; ++i) {
580      workerThreads.add(new WorkerThread(threadGroup));
581    }
582
583    long st, et;
584
585    // Acquire the store lease.
586    st = System.nanoTime();
587    store.recoverLease();
588    et = System.nanoTime();
589    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
590      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
591
592    // start the procedure scheduler
593    scheduler.start();
594
595    // TODO: Split in two steps.
596    // TODO: Handle corrupted procedures (currently just a warn)
597    // The first one will make sure that we have the latest id,
598    // so we can start the threads and accept new procedures.
599    // The second step will do the actual load of old procedures.
600    st = System.nanoTime();
601    load(abortOnCorruption);
602    et = System.nanoTime();
603    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
604      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
605  }
606
607  /**
608   * Start the workers.
609   */
610  public void startWorkers() throws IOException {
611    if (!running.compareAndSet(false, true)) {
612      LOG.warn("Already running");
613      return;
614    }
615    // Start the executors. Here we must have the lastProcId set.
616    LOG.trace("Start workers {}", workerThreads.size());
617    timeoutExecutor.start();
618    workerMonitorExecutor.start();
619    for (WorkerThread worker: workerThreads) {
620      worker.start();
621    }
622
623    // Internal chores
624    workerMonitorExecutor.add(new WorkerMonitor());
625
626    // Add completed cleaner chore
627    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
628      nonceKeysToProcIdsMap));
629  }
630
631  public void stop() {
632    if (!running.getAndSet(false)) {
633      return;
634    }
635
636    LOG.info("Stopping");
637    scheduler.stop();
638    timeoutExecutor.sendStopSignal();
639    workerMonitorExecutor.sendStopSignal();
640  }
641
642  @VisibleForTesting
643  public void join() {
644    assert !isRunning() : "expected not running";
645
646    // stop the timeout executor
647    timeoutExecutor.awaitTermination();
648    // stop the work monitor executor
649    workerMonitorExecutor.awaitTermination();
650
651    // stop the worker threads
652    for (WorkerThread worker: workerThreads) {
653      worker.awaitTermination();
654    }
655
656    // Destroy the Thread Group for the executors
657    // TODO: Fix. #join is not place to destroy resources.
658    try {
659      threadGroup.destroy();
660    } catch (IllegalThreadStateException e) {
661      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
662          this.threadGroup, e.getMessage());
663      // This dumps list of threads on STDOUT.
664      this.threadGroup.list();
665    }
666
667    // reset the in-memory state for testing
668    completed.clear();
669    rollbackStack.clear();
670    procedures.clear();
671    nonceKeysToProcIdsMap.clear();
672    scheduler.clear();
673    lastProcId.set(-1);
674  }
675
676  public void refreshConfiguration(final Configuration conf) {
677    this.conf = conf;
678    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
679        DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
680  }
681
682  // ==========================================================================
683  //  Accessors
684  // ==========================================================================
685  public boolean isRunning() {
686    return running.get();
687  }
688
689  /**
690   * @return the current number of worker threads.
691   */
692  public int getWorkerThreadCount() {
693    return workerThreads.size();
694  }
695
696  /**
697   * @return the core pool size settings.
698   */
699  public int getCorePoolSize() {
700    return corePoolSize;
701  }
702
703  public int getActiveExecutorCount() {
704    return activeExecutorCount.get();
705  }
706
707  public TEnvironment getEnvironment() {
708    return this.environment;
709  }
710
711  public ProcedureStore getStore() {
712    return this.store;
713  }
714
715  ProcedureScheduler getScheduler() {
716    return scheduler;
717  }
718
719  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
720    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
721    this.scheduler.signalAll();
722  }
723
724  public long getKeepAliveTime(final TimeUnit timeUnit) {
725    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
726  }
727
728  // ==========================================================================
729  //  Submit/Remove Chores
730  // ==========================================================================
731
732  /**
733   * Add a chore procedure to the executor
734   * @param chore the chore to add
735   */
736  public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
737    if (chore == null) {
738      return;
739    }
740    chore.setState(ProcedureState.WAITING_TIMEOUT);
741    timeoutExecutor.add(chore);
742  }
743
744  /**
745   * Remove a chore procedure from the executor
746   * @param chore the chore to remove
747   * @return whether the chore is removed, or it will be removed later
748   */
749  public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
750    if (chore == null) {
751      return true;
752    }
753    chore.setState(ProcedureState.SUCCESS);
754    return timeoutExecutor.remove(chore);
755  }
756
757  // ==========================================================================
758  //  Nonce Procedure helpers
759  // ==========================================================================
760  /**
761   * Create a NonceKey from the specified nonceGroup and nonce.
762   * @param nonceGroup the group to use for the {@link NonceKey}
763   * @param nonce the nonce to use in the {@link NonceKey}
764   * @return the generated NonceKey
765   */
766  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
767    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
768  }
769
770  /**
771   * Register a nonce for a procedure that is going to be submitted.
772   * A procId will be reserved and on submitProcedure(),
773   * the procedure with the specified nonce will take the reserved ProcId.
774   * If someone already reserved the nonce, this method will return the procId reserved,
775   * otherwise an invalid procId will be returned. and the caller should procede
776   * and submit the procedure.
777   *
778   * @param nonceKey A unique identifier for this operation from the client or process.
779   * @return the procId associated with the nonce, if any otherwise an invalid procId.
780   */
781  public long registerNonce(final NonceKey nonceKey) {
782    if (nonceKey == null) {
783      return -1;
784    }
785
786    // check if we have already a Reserved ID for the nonce
787    Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
788    if (oldProcId == null) {
789      // reserve a new Procedure ID, this will be associated with the nonce
790      // and the procedure submitted with the specified nonce will use this ID.
791      final long newProcId = nextProcId();
792      oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
793      if (oldProcId == null) {
794        return -1;
795      }
796    }
797
798    // we found a registered nonce, but the procedure may not have been submitted yet.
799    // since the client expect the procedure to be submitted, spin here until it is.
800    final boolean traceEnabled = LOG.isTraceEnabled();
801    while (isRunning() &&
802           !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
803           nonceKeysToProcIdsMap.containsKey(nonceKey)) {
804      if (traceEnabled) {
805        LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
806      }
807      Threads.sleep(100);
808    }
809    return oldProcId.longValue();
810  }
811
812  /**
813   * Remove the NonceKey if the procedure was not submitted to the executor.
814   * @param nonceKey A unique identifier for this operation from the client or process.
815   */
816  public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
817    if (nonceKey == null) {
818      return;
819    }
820
821    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
822    if (procId == null) {
823      return;
824    }
825
826    // if the procedure was not submitted, remove the nonce
827    if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
828      nonceKeysToProcIdsMap.remove(nonceKey);
829    }
830  }
831
832  /**
833   * If the failure failed before submitting it, we may want to give back the
834   * same error to the requests with the same nonceKey.
835   *
836   * @param nonceKey A unique identifier for this operation from the client or process
837   * @param procName name of the procedure, used to inform the user
838   * @param procOwner name of the owner of the procedure, used to inform the user
839   * @param exception the failure to report to the user
840   */
841  public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
842      IOException exception) {
843    if (nonceKey == null) {
844      return;
845    }
846
847    Long procId = nonceKeysToProcIdsMap.get(nonceKey);
848    if (procId == null || completed.containsKey(procId)) {
849      return;
850    }
851
852    completed.computeIfAbsent(procId, (key) -> {
853      Procedure<TEnvironment> proc = new FailedProcedure<>(procId.longValue(),
854          procName, procOwner, nonceKey, exception);
855
856      return new CompletedProcedureRetainer<>(proc);
857    });
858  }
859
860  // ==========================================================================
861  //  Submit/Abort Procedure
862  // ==========================================================================
863  /**
864   * Add a new root-procedure to the executor.
865   * @param proc the new procedure to execute.
866   * @return the procedure id, that can be used to monitor the operation
867   */
868  public long submitProcedure(Procedure<TEnvironment> proc) {
869    return submitProcedure(proc, null);
870  }
871
872  /**
873   * Bypass a procedure. If the procedure is set to bypass, all the logic in
874   * execute/rollback will be ignored and it will return success, whatever.
875   * It is used to recover buggy stuck procedures, releasing the lock resources
876   * and letting other procedures run. Bypassing one procedure (and its ancestors will
877   * be bypassed automatically) may leave the cluster in a middle state, e.g. region
878   * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
879   * the operators may have to do some clean up on hdfs or schedule some assign procedures
880   * to let region online. DO AT YOUR OWN RISK.
881   * <p>
882   * A procedure can be bypassed only if
883   * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
884   * or it is a root procedure without any child.
885   * 2. No other worker thread is executing it
886   * 3. No child procedure has been submitted
887   *
888   * <p>
889   * If all the requirements are meet, the procedure and its ancestors will be
890   * bypassed and persisted to WAL.
891   *
892   * <p>
893   * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
894   * TODO: What about WAITING_TIMEOUT?
895   * @param pids the procedure id
896   * @param lockWait time to wait lock
897   * @param force if force set to true, we will bypass the procedure even if it is executing.
898   *              This is for procedures which can't break out during executing(due to bug, mostly)
899   *              In this case, bypassing the procedure is not enough, since it is already stuck
900   *              there. We need to restart the master after bypassing, and letting the problematic
901   *              procedure to execute wth bypass=true, so in that condition, the procedure can be
902   *              successfully bypassed.
903   * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
904   * @return true if bypass success
905   * @throws IOException IOException
906   */
907  public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
908      boolean recursive)
909      throws IOException {
910    List<Boolean> result = new ArrayList<Boolean>(pids.size());
911    for(long pid: pids) {
912      result.add(bypassProcedure(pid, lockWait, force, recursive));
913    }
914    return result;
915  }
916
917  boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
918      throws IOException {
919    Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
920    final Procedure<TEnvironment> procedure = getProcedure(pid);
921    if (procedure == null) {
922      LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
923      return false;
924    }
925
926    LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}",
927        procedure, lockWait, override, recursive);
928    IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
929    if (lockEntry == null && !override) {
930      LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
931          lockWait, procedure, override);
932      return false;
933    } else if (lockEntry == null) {
934      LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}",
935          lockWait, procedure, override);
936    }
937    try {
938      // check whether the procedure is already finished
939      if (procedure.isFinished()) {
940        LOG.debug("{} is already finished, skipping bypass", procedure);
941        return false;
942      }
943
944      if (procedure.hasChildren()) {
945        if (recursive) {
946          // EXPENSIVE. Checks each live procedure of which there could be many!!!
947          // Is there another way to get children of a procedure?
948          LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
949          this.procedures.forEachValue(1 /*Single-threaded*/,
950            // Transformer
951            v -> v.getParentProcId() == procedure.getProcId()? v: null,
952            // Consumer
953            v -> {
954              try {
955                bypassProcedure(v.getProcId(), lockWait, override, recursive);
956              } catch (IOException e) {
957                LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
958              }
959            });
960        } else {
961          LOG.debug("{} has children, skipping bypass", procedure);
962          return false;
963        }
964      }
965
966      // If the procedure has no parent or no child, we are safe to bypass it in whatever state
967      if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
968          && procedure.getState() != ProcedureState.WAITING
969          && procedure.getState() != ProcedureState.WAITING_TIMEOUT) {
970        LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
971                + "(with no parent), {}",
972            procedure);
973        // Question: how is the bypass done here?
974        return false;
975      }
976
977      // Now, the procedure is not finished, and no one can execute it since we take the lock now
978      // And we can be sure that its ancestor is not running too, since their child has not
979      // finished yet
980      Procedure<TEnvironment> current = procedure;
981      while (current != null) {
982        LOG.debug("Bypassing {}", current);
983        current.bypass(getEnvironment());
984        store.update(procedure);
985        long parentID = current.getParentProcId();
986        current = getProcedure(parentID);
987      }
988
989      //wake up waiting procedure, already checked there is no child
990      if (procedure.getState() == ProcedureState.WAITING) {
991        procedure.setState(ProcedureState.RUNNABLE);
992        store.update(procedure);
993      }
994
995      // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler.
996      // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE
997      if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
998        LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
999        if (timeoutExecutor.remove(procedure)) {
1000          LOG.debug("removed procedure {} from timeoutExecutor", procedure);
1001          timeoutExecutor.executeTimedoutProcedure(procedure);
1002        }
1003      } else if (lockEntry != null) {
1004        scheduler.addFront(procedure);
1005        LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
1006      } else {
1007        // If we don't have the lock, we can't re-submit the queue,
1008        // since it is already executing. To get rid of the stuck situation, we
1009        // need to restart the master. With the procedure set to bypass, the procedureExecutor
1010        // will bypass it and won't get stuck again.
1011        LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
1012            + "skipping add to queue",
1013          procedure);
1014      }
1015      return true;
1016
1017    } finally {
1018      if (lockEntry != null) {
1019        procExecutionLock.releaseLockEntry(lockEntry);
1020      }
1021    }
1022  }
1023
1024  /**
1025   * Add a new root-procedure to the executor.
1026   * @param proc the new procedure to execute.
1027   * @param nonceKey the registered unique identifier for this operation from the client or process.
1028   * @return the procedure id, that can be used to monitor the operation
1029   */
1030  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
1031      justification = "FindBugs is blind to the check-for-null")
1032  public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
1033    Preconditions.checkArgument(lastProcId.get() >= 0);
1034
1035    prepareProcedure(proc);
1036
1037    final Long currentProcId;
1038    if (nonceKey != null) {
1039      currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
1040      Preconditions.checkArgument(currentProcId != null,
1041        "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
1042    } else {
1043      currentProcId = nextProcId();
1044    }
1045
1046    // Initialize the procedure
1047    proc.setNonceKey(nonceKey);
1048    proc.setProcId(currentProcId.longValue());
1049
1050    // Commit the transaction
1051    store.insert(proc, null);
1052    LOG.debug("Stored {}", proc);
1053
1054    // Add the procedure to the executor
1055    return pushProcedure(proc);
1056  }
1057
1058  /**
1059   * Add a set of new root-procedure to the executor.
1060   * @param procs the new procedures to execute.
1061   */
1062  // TODO: Do we need to take nonces here?
1063  public void submitProcedures(Procedure<TEnvironment>[] procs) {
1064    Preconditions.checkArgument(lastProcId.get() >= 0);
1065    if (procs == null || procs.length <= 0) {
1066      return;
1067    }
1068
1069    // Prepare procedure
1070    for (int i = 0; i < procs.length; ++i) {
1071      prepareProcedure(procs[i]).setProcId(nextProcId());
1072    }
1073
1074    // Commit the transaction
1075    store.insert(procs);
1076    if (LOG.isDebugEnabled()) {
1077      LOG.debug("Stored " + Arrays.toString(procs));
1078    }
1079
1080    // Add the procedure to the executor
1081    for (int i = 0; i < procs.length; ++i) {
1082      pushProcedure(procs[i]);
1083    }
1084  }
1085
1086  private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
1087    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
1088    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
1089    if (this.checkOwnerSet) {
1090      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
1091    }
1092    return proc;
1093  }
1094
1095  private long pushProcedure(Procedure<TEnvironment> proc) {
1096    final long currentProcId = proc.getProcId();
1097
1098    // Update metrics on start of a procedure
1099    proc.updateMetricsOnSubmit(getEnvironment());
1100
1101    // Create the rollback stack for the procedure
1102    RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
1103    rollbackStack.put(currentProcId, stack);
1104
1105    // Submit the new subprocedures
1106    assert !procedures.containsKey(currentProcId);
1107    procedures.put(currentProcId, proc);
1108    sendProcedureAddedNotification(currentProcId);
1109    scheduler.addBack(proc);
1110    return proc.getProcId();
1111  }
1112
1113  /**
1114   * Send an abort notification the specified procedure.
1115   * Depending on the procedure implementation the abort can be considered or ignored.
1116   * @param procId the procedure to abort
1117   * @return true if the procedure exists and has received the abort, otherwise false.
1118   */
1119  public boolean abort(long procId) {
1120    return abort(procId, true);
1121  }
1122
1123  /**
1124   * Send an abort notification to the specified procedure.
1125   * Depending on the procedure implementation, the abort can be considered or ignored.
1126   * @param procId the procedure to abort
1127   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
1128   * @return true if the procedure exists and has received the abort, otherwise false.
1129   */
1130  public boolean abort(long procId, boolean mayInterruptIfRunning) {
1131    Procedure<TEnvironment> proc = procedures.get(procId);
1132    if (proc != null) {
1133      if (!mayInterruptIfRunning && proc.wasExecuted()) {
1134        return false;
1135      }
1136      return proc.abort(getEnvironment());
1137    }
1138    return false;
1139  }
1140
1141  // ==========================================================================
1142  //  Executor query helpers
1143  // ==========================================================================
1144  public Procedure<TEnvironment> getProcedure(final long procId) {
1145    return procedures.get(procId);
1146  }
1147
1148  public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
1149    Procedure<TEnvironment> proc = getProcedure(procId);
1150    if (clazz.isInstance(proc)) {
1151      return clazz.cast(proc);
1152    }
1153    return null;
1154  }
1155
1156  public Procedure<TEnvironment> getResult(long procId) {
1157    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1158    if (retainer == null) {
1159      return null;
1160    } else {
1161      return retainer.getProcedure();
1162    }
1163  }
1164
1165  /**
1166   * Return true if the procedure is finished.
1167   * The state may be "completed successfully" or "failed and rolledback".
1168   * Use getResult() to check the state or get the result data.
1169   * @param procId the ID of the procedure to check
1170   * @return true if the procedure execution is finished, otherwise false.
1171   */
1172  public boolean isFinished(final long procId) {
1173    return !procedures.containsKey(procId);
1174  }
1175
1176  /**
1177   * Return true if the procedure is started.
1178   * @param procId the ID of the procedure to check
1179   * @return true if the procedure execution is started, otherwise false.
1180   */
1181  public boolean isStarted(long procId) {
1182    Procedure<?> proc = procedures.get(procId);
1183    if (proc == null) {
1184      return completed.get(procId) != null;
1185    }
1186    return proc.wasExecuted();
1187  }
1188
1189  /**
1190   * Mark the specified completed procedure, as ready to remove.
1191   * @param procId the ID of the procedure to remove
1192   */
1193  public void removeResult(long procId) {
1194    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1195    if (retainer == null) {
1196      assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
1197      LOG.debug("pid={} already removed by the cleaner.", procId);
1198      return;
1199    }
1200
1201    // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
1202    retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
1203  }
1204
1205  public Procedure<TEnvironment> getResultOrProcedure(long procId) {
1206    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1207    if (retainer == null) {
1208      return procedures.get(procId);
1209    } else {
1210      return retainer.getProcedure();
1211    }
1212  }
1213
1214  /**
1215   * Check if the user is this procedure's owner
1216   * @param procId the target procedure
1217   * @param user the user
1218   * @return true if the user is the owner of the procedure,
1219   *   false otherwise or the owner is unknown.
1220   */
1221  public boolean isProcedureOwner(long procId, User user) {
1222    if (user == null) {
1223      return false;
1224    }
1225    final Procedure<TEnvironment> runningProc = procedures.get(procId);
1226    if (runningProc != null) {
1227      return runningProc.getOwner().equals(user.getShortName());
1228    }
1229
1230    final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1231    if (retainer != null) {
1232      return retainer.getProcedure().getOwner().equals(user.getShortName());
1233    }
1234
1235    // Procedure either does not exist or has already completed and got cleaned up.
1236    // At this time, we cannot check the owner of the procedure
1237    return false;
1238  }
1239
1240  /**
1241   * Should only be used when starting up, where the procedure workers have not been started.
1242   * <p/>
1243   * If the procedure works has been started, the return values maybe changed when you are
1244   * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
1245   * it will do a copy, and also include the finished procedures.
1246   */
1247  public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
1248    return procedures.values();
1249  }
1250
1251  /**
1252   * Get procedures.
1253   * @return the procedures in a list
1254   */
1255  public List<Procedure<TEnvironment>> getProcedures() {
1256    List<Procedure<TEnvironment>> procedureList =
1257      new ArrayList<>(procedures.size() + completed.size());
1258    procedureList.addAll(procedures.values());
1259    // Note: The procedure could show up twice in the list with different state, as
1260    // it could complete after we walk through procedures list and insert into
1261    // procedureList - it is ok, as we will use the information in the Procedure
1262    // to figure it out; to prevent this would increase the complexity of the logic.
1263    completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
1264      .forEach(procedureList::add);
1265    return procedureList;
1266  }
1267
1268  // ==========================================================================
1269  //  Listeners helpers
1270  // ==========================================================================
1271  public void registerListener(ProcedureExecutorListener listener) {
1272    this.listeners.add(listener);
1273  }
1274
1275  public boolean unregisterListener(ProcedureExecutorListener listener) {
1276    return this.listeners.remove(listener);
1277  }
1278
1279  private void sendProcedureLoadedNotification(final long procId) {
1280    if (!this.listeners.isEmpty()) {
1281      for (ProcedureExecutorListener listener: this.listeners) {
1282        try {
1283          listener.procedureLoaded(procId);
1284        } catch (Throwable e) {
1285          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1286        }
1287      }
1288    }
1289  }
1290
1291  private void sendProcedureAddedNotification(final long procId) {
1292    if (!this.listeners.isEmpty()) {
1293      for (ProcedureExecutorListener listener: this.listeners) {
1294        try {
1295          listener.procedureAdded(procId);
1296        } catch (Throwable e) {
1297          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1298        }
1299      }
1300    }
1301  }
1302
1303  private void sendProcedureFinishedNotification(final long procId) {
1304    if (!this.listeners.isEmpty()) {
1305      for (ProcedureExecutorListener listener: this.listeners) {
1306        try {
1307          listener.procedureFinished(procId);
1308        } catch (Throwable e) {
1309          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1310        }
1311      }
1312    }
1313  }
1314
1315  // ==========================================================================
1316  //  Procedure IDs helpers
1317  // ==========================================================================
1318  private long nextProcId() {
1319    long procId = lastProcId.incrementAndGet();
1320    if (procId < 0) {
1321      while (!lastProcId.compareAndSet(procId, 0)) {
1322        procId = lastProcId.get();
1323        if (procId >= 0) {
1324          break;
1325        }
1326      }
1327      while (procedures.containsKey(procId)) {
1328        procId = lastProcId.incrementAndGet();
1329      }
1330    }
1331    assert procId >= 0 : "Invalid procId " + procId;
1332    return procId;
1333  }
1334
1335  @VisibleForTesting
1336  protected long getLastProcId() {
1337    return lastProcId.get();
1338  }
1339
1340  @VisibleForTesting
1341  public Set<Long> getActiveProcIds() {
1342    return procedures.keySet();
1343  }
1344
1345  Long getRootProcedureId(Procedure<TEnvironment> proc) {
1346    return Procedure.getRootProcedureId(procedures, proc);
1347  }
1348
1349  // ==========================================================================
1350  //  Executions
1351  // ==========================================================================
  /**
   * Run one scheduling round for the given procedure.
   * <p>
   * A finished procedure is skipped. If the root procedure id cannot be resolved, the
   * procedure is rolled back on its own. Otherwise the rollback stack of the root procedure
   * is used as the gate: when the stack refuses to let the procedure execute, a rollback is
   * attempted instead; when it accepts, the procedure lock is requested and, if granted, the
   * procedure is executed. The loop repeats for as long as the stack reports a failure.
   * @param proc the procedure to execute
   */
  private void executeProcedure(Procedure<TEnvironment> proc) {
    if (proc.isFinished()) {
      LOG.debug("{} is already finished, skipping execution", proc);
      return;
    }
    final Long rootProcId = getRootProcedureId(proc);
    if (rootProcId == null) {
      // The 'proc' was ready to run but the root procedure was rolledback
      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
      executeRollback(proc);
      return;
    }

    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
    if (procStack == null) {
      // No rollback stack is registered for the root, nothing can be done for this procedure
      LOG.warn("RootProcedureState is null for " + proc.getProcId());
      return;
    }
    do {
      // Try to acquire the execution
      if (!procStack.acquire(proc)) {
        // The stack did not let us execute, so the only thing left to try is a rollback
        if (procStack.setRollback()) {
          // we have the 'rollback-lock' we can start rollingback
          switch (executeRollback(rootProcId, procStack)) {
            case LOCK_ACQUIRED:
              break;
            case LOCK_YIELD_WAIT:
              // give up the rollback-lock and put the procedure back through the scheduler
              procStack.unsetRollback();
              scheduler.yield(proc);
              break;
            case LOCK_EVENT_WAIT:
              // give up the rollback-lock; the procedure is not re-queued here
              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
              procStack.unsetRollback();
              break;
            default:
              throw new UnsupportedOperationException();
          }
        } else {
          // if we can't rollback means that some child is still running.
          // the rollback will be executed after all the children are done.
          // If the procedure was never executed, remove and mark it as rolledback.
          if (!proc.wasExecuted()) {
            switch (executeRollback(proc)) {
              case LOCK_ACQUIRED:
                break;
              case LOCK_YIELD_WAIT:
                scheduler.yield(proc);
                break;
              case LOCK_EVENT_WAIT:
                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
                break;
              default:
                throw new UnsupportedOperationException();
            }
          }
        }
        // Whatever happened on the rollback path, this round is over
        break;
      }

      // Execute the procedure
      assert proc.getState() == ProcedureState.RUNNABLE : proc;
      // Note that lock is NOT about concurrency but rather about ensuring
      // ownership of a procedure of an entity such as a region or table
      LockState lockState = acquireLock(proc);
      switch (lockState) {
        case LOCK_ACQUIRED:
          execProcedure(procStack, proc);
          break;
        case LOCK_YIELD_WAIT:
          LOG.info(lockState + " " + proc);
          scheduler.yield(proc);
          break;
        case LOCK_EVENT_WAIT:
          // Someone will wake us up when the lock is available
          LOG.debug(lockState + " " + proc);
          break;
        default:
          throw new UnsupportedOperationException();
      }
      // Pairs with the successful procStack.acquire(proc) above, for every lock outcome
      procStack.release(proc);

      if (proc.isSuccess()) {
        // update metrics on finishing the procedure
        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
        LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
        // Finalize the procedure state
        if (proc.getProcId() == rootProcId) {
          procedureFinished(proc);
        } else {
          execCompletionCleanup(proc);
        }
        break;
      }
      // Go around again only if the stack is marked as failed; the next acquire() is then
      // presumably refused, which leads into the rollback path above - TODO confirm
    } while (procStack.isFailed());
  }
1447
1448  private LockState acquireLock(Procedure<TEnvironment> proc) {
1449    TEnvironment env = getEnvironment();
1450    // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
1451    // hasLock is true.
1452    if (proc.hasLock()) {
1453      return LockState.LOCK_ACQUIRED;
1454    }
1455    return proc.doAcquireLock(env, store);
1456  }
1457
1458  private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
1459    TEnvironment env = getEnvironment();
1460    // For how the framework works, we know that we will always have the lock
1461    // when we call releaseLock(), so we can avoid calling proc.hasLock()
1462    if (force || !proc.holdLock(env) || proc.isFinished()) {
1463      proc.doReleaseLock(env, store);
1464    }
1465  }
1466
  /**
   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
   * <p>
   * The sub-procedure stack is walked from its tail to its head. Successfully finished
   * sub-procedures are only cleaned up, all the others have their rollback executed while
   * their lock is held.
   * @param rootProcId the id of the root procedure whose stack is rolled back
   * @param procStack the rollback state of that root procedure
   * @return LOCK_ACQUIRED when the whole stack was processed and the root procedure was
   *         finalized; otherwise the state that interrupted the rollback: the result of a
   *         lock acquisition that did not succeed, the result of a single rollback step when
   *         the rollback is aborted, or LOCK_YIELD_WAIT when a procedure asks to yield
   */
  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
    RemoteProcedureException exception = rootProc.getException();
    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
    // rolling back because the subprocedure does. Clarify.
    if (exception == null) {
      // Take the exception recorded on the stack, mark the root as failed and persist that
      exception = procStack.getException();
      rootProc.setFailure(exception);
      store.update(rootProc);
    }

    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;

    // Walk the stack backwards; stackTail is the index of the entry handled in this iteration
    int stackTail = subprocStack.size();
    while (stackTail-- > 0) {
      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
      IdLock.Entry lockEntry = null;
      // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
      // this check, as the worker will hold the lock before executing a procedure. This is the only
      // place where we may hold two procedure execution locks, and there is a fence in the
      // RootProcedureState where we can make sure that only one worker can execute the rollback of
      // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
      // prevent race between us and the force update thread.
      if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
        try {
          lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
        } catch (IOException e) {
          // can only happen if interrupted, so not a big deal to propagate it
          throw new UncheckedIOException(e);
        }
      }
      try {
        // For the sub procedures which are successfully finished, we do not rollback them.
        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
        // recursively rollback its ancestors. The state changes which are done by sub procedures
        // should be handled by parent procedures when rolling back. For example, when rolling back
        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
        // online, instead of rolling back the original procedures which offlined the regions(in
        // fact these procedures can not be rolled back...).
        if (proc.isSuccess()) {
          // Just do the cleanup work, without actually executing the rollback
          subprocStack.remove(stackTail);
          cleanupAfterRollbackOneStep(proc);
          continue;
        }
        LockState lockState = acquireLock(proc);
        if (lockState != LockState.LOCK_ACQUIRED) {
          // can't take a lock on the procedure, add the root-proc back on the
          // queue waiting for the lock availability
          return lockState;
        }

        lockState = executeRollback(proc);
        releaseLock(proc, false);
        // Stop here if the step itself did not complete, or the executor/store is going down
        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
        abortRollback |= !isRunning() || !store.isRunning();

        // allows to kill the executor before something is stored to the wal.
        // useful to test the procedure recovery.
        if (abortRollback) {
          return lockState;
        }

        // The step is done, drop the entry from the stack
        subprocStack.remove(stackTail);

        // if the procedure is kind enough to pass the slot to someone else, yield
        // if the proc is already finished, do not yield
        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
          return LockState.LOCK_YIELD_WAIT;
        }

        // The root procedure is finalized after the loop, via procedureFinished()
        if (proc != rootProc) {
          execCompletionCleanup(proc);
        }
      } finally {
        // Only release the execution lock if it was taken in this iteration
        if (lockEntry != null) {
          procExecutionLock.releaseLockEntry(lockEntry);
        }
      }
    }

    // Finalize the procedure state
    LOG.info("Rolled back {} exec-time={}", rootProc,
      StringUtils.humanTimeDiff(rootProc.elapsedTime()));
    procedureFinished(rootProc);
    return LockState.LOCK_ACQUIRED;
  }
1559
1560  private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
1561    if (proc.removeStackIndex()) {
1562      if (!proc.isSuccess()) {
1563        proc.setState(ProcedureState.ROLLEDBACK);
1564      }
1565
1566      // update metrics on finishing the procedure (fail)
1567      proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);
1568
1569      if (proc.hasParent()) {
1570        store.delete(proc.getProcId());
1571        procedures.remove(proc.getProcId());
1572      } else {
1573        final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
1574        if (childProcIds != null) {
1575          store.delete(proc, childProcIds);
1576        } else {
1577          store.update(proc);
1578        }
1579      }
1580    } else {
1581      store.update(proc);
1582    }
1583  }
1584
1585  /**
1586   * Execute the rollback of the procedure step.
1587   * It updates the store with the new state (stack index)
1588   * or will remove completly the procedure in case it is a child.
1589   */
1590  private LockState executeRollback(Procedure<TEnvironment> proc) {
1591    try {
1592      proc.doRollback(getEnvironment());
1593    } catch (IOException e) {
1594      LOG.debug("Roll back attempt failed for {}", proc, e);
1595      return LockState.LOCK_YIELD_WAIT;
1596    } catch (InterruptedException e) {
1597      handleInterruptedException(proc, e);
1598      return LockState.LOCK_YIELD_WAIT;
1599    } catch (Throwable e) {
1600      // Catch NullPointerExceptions or similar errors...
1601      LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
1602    }
1603
1604    // allows to kill the executor before something is stored to the wal.
1605    // useful to test the procedure recovery.
1606    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1607      String msg = "TESTING: Kill before store update";
1608      LOG.debug(msg);
1609      stop();
1610      throw new RuntimeException(msg);
1611    }
1612
1613    cleanupAfterRollbackOneStep(proc);
1614
1615    return LockState.LOCK_ACQUIRED;
1616  }
1617
1618  private void yieldProcedure(Procedure<TEnvironment> proc) {
1619    releaseLock(proc, false);
1620    scheduler.yield(proc);
1621  }
1622
  /**
   * Executes <code>procedure</code>
   * <ul>
   *  <li>Calls the doExecute() of the procedure
   *  <li>If the procedure execution didn't fail (i.e. valid user input)
   *  <ul>
   *    <li>...and returned subprocedures
   *    <ul><li>The subprocedures are initialized.
   *      <li>The subprocedures are added to the store
   *      <li>The subprocedures are added to the runnable queue
   *      <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
   *    </ul>
   *    </li>
   *   <li>...if there are no subprocedure
   *    <ul><li>the procedure completed successfully
   *      <li>if there is a parent (WAITING)
   *      <li>the parent state will be set to RUNNABLE
   *    </ul>
   *   </li>
   *  </ul>
   *  </li>
   *  <li>In case of failure
   *  <ul>
   *    <li>The store is updated with the new state</li>
   *    <li>The executor (caller of this method) will start the rollback of the procedure</li>
   *  </ul>
   *  </li>
   *  </ul>
   * <p>
   * Two further exits exist. If the procedure throws a ProcedureSuspendedException it is neither
   * marked SUCCESS nor is its parent notified; the rollback step and store update still happen.
   * If it throws a ProcedureYieldException or is interrupted, its lock is released and it is
   * handed back to the scheduler without touching the store. A procedure that returns itself as
   * its only subprocedure is re-executed in place by looping.
   * @param procStack root procedure state on which the rollback step and the new subprocedures
   *          are recorded
   * @param procedure the procedure to execute; must be in the RUNNABLE state
   */
  private void execProcedure(RootProcedureState<TEnvironment> procStack,
      Procedure<TEnvironment> procedure) {
    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
        "NOT RUNNABLE! " + procedure.toString());

    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
    // The exception is caught below and then we hurry to the exit without disturbing state. The
    // idea is that the processing of this procedure will be unsuspended later by an external event
    // such the report of a region open.
    boolean suspended = false;

    // Whether to 're-' -execute; run through the loop again.
    boolean reExecute = false;

    Procedure<TEnvironment>[] subprocs = null;
    do {
      reExecute = false;
      procedure.resetPersistence();
      try {
        subprocs = procedure.doExecute(getEnvironment());
        // An empty array is treated exactly like 'no subprocedures'.
        if (subprocs != null && subprocs.length == 0) {
          subprocs = null;
        }
      } catch (ProcedureSuspendedException e) {
        LOG.trace("Suspend {}", procedure);
        suspended = true;
      } catch (ProcedureYieldException e) {
        LOG.trace("Yield {}", procedure, e);
        yieldProcedure(procedure);
        return;
      } catch (InterruptedException e) {
        LOG.trace("Yield interrupt {}", procedure, e);
        handleInterruptedException(procedure, e);
        yieldProcedure(procedure);
        return;
      } catch (Throwable e) {
        // Catch NullPointerExceptions or similar errors...
        // The procedure is marked as failed so that the success handling below is skipped.
        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
        LOG.error(msg, e);
        procedure.setFailure(new RemoteProcedureException(msg, e));
      }

      if (!procedure.isFailed()) {
        if (subprocs != null) {
          if (subprocs.length == 1 && subprocs[0] == procedure) {
            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
            // i.e. we go around this loop again rather than go back out on the scheduler queue.
            subprocs = null;
            reExecute = true;
            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
          } else {
            // Yield the current procedure, and make the subprocedure runnable
            // subprocs may come back 'null'.
            subprocs = initializeChildren(procStack, procedure, subprocs);
            LOG.info("Initialized subprocedures=" +
              (subprocs == null? null:
                Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
                collect(Collectors.toList()).toString()));
          }
        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
          LOG.trace("Added to timeoutExecutor {}", procedure);
          timeoutExecutor.add(procedure);
        } else if (!suspended) {
          // No subtask, so we are done
          procedure.setState(ProcedureState.SUCCESS);
        }
      }

      // Add the procedure to the stack
      procStack.addRollbackStep(procedure);

      // allows to kill the executor before something is stored to the wal.
      // useful to test the procedure recovery.
      if (testing != null &&
        testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())) {
        kill("TESTING: Kill BEFORE store update: " + procedure);
      }

      // TODO: The code here doesn't check if store is running before persisting to the store as
      // it relies on the method call below to throw RuntimeException to wind up the stack and
      // executor thread to stop. The statement following the method call below seems to check if
      // store is not running, to prevent scheduling children procedures, re-execution or yield
      // of this procedure. This may need more scrutiny and subsequent cleanup in future
      //
      // Commit the transaction even if a suspend (state may have changed). Note this append
      // can take a bunch of time to complete.
      if (procedure.needPersistence()) {
        updateStoreOnExec(procStack, procedure, subprocs);
      }

      // if the store is not running we are aborting
      if (!store.isRunning()) {
        return;
      }
      // if the procedure is kind enough to pass the slot to someone else, yield
      if (procedure.isRunnable() && !suspended &&
          procedure.isYieldAfterExecutionStep(getEnvironment())) {
        yieldProcedure(procedure);
        return;
      }

      // Invariant: when we are about to re-execute, subprocs has been reset to null above.
      assert (reExecute && subprocs == null) || !reExecute;
    } while (reExecute);

    // Allows to kill the executor after something is stored to the WAL but before the below
    // state settings are done -- in particular the one on the end where we make parent
    // RUNNABLE again when its children are done; see countDownChildren.
    if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
      kill("TESTING: Kill AFTER store update: " + procedure);
    }

    // Submit the new subprocedures
    if (subprocs != null && !procedure.isFailed()) {
      submitChildrenProcedures(subprocs);
    }

    // we need to log the release lock operation before waking up the parent procedure, as there
    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
    // the sub procedures from store and cause problems...
    releaseLock(procedure, false);

    // if the procedure is complete and has a parent, count down the children latch.
    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
      countDownChildren(procStack, procedure);
    }
  }
1779
1780  private void kill(String msg) {
1781    LOG.debug(msg);
1782    stop();
1783    throw new RuntimeException(msg);
1784  }
1785
1786  private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
1787      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1788    assert subprocs != null : "expected subprocedures";
1789    final long rootProcId = getRootProcedureId(procedure);
1790    for (int i = 0; i < subprocs.length; ++i) {
1791      Procedure<TEnvironment> subproc = subprocs[i];
1792      if (subproc == null) {
1793        String msg = "subproc[" + i + "] is null, aborting the procedure";
1794        procedure.setFailure(new RemoteProcedureException(msg,
1795          new IllegalArgumentIOException(msg)));
1796        return null;
1797      }
1798
1799      assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
1800      subproc.setParentProcId(procedure.getProcId());
1801      subproc.setRootProcId(rootProcId);
1802      subproc.setProcId(nextProcId());
1803      procStack.addSubProcedure(subproc);
1804    }
1805
1806    if (!procedure.isFailed()) {
1807      procedure.setChildrenLatch(subprocs.length);
1808      switch (procedure.getState()) {
1809        case RUNNABLE:
1810          procedure.setState(ProcedureState.WAITING);
1811          break;
1812        case WAITING_TIMEOUT:
1813          timeoutExecutor.add(procedure);
1814          break;
1815        default:
1816          break;
1817      }
1818    }
1819    return subprocs;
1820  }
1821
1822  private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
1823    for (int i = 0; i < subprocs.length; ++i) {
1824      Procedure<TEnvironment> subproc = subprocs[i];
1825      subproc.updateMetricsOnSubmit(getEnvironment());
1826      assert !procedures.containsKey(subproc.getProcId());
1827      procedures.put(subproc.getProcId(), subproc);
1828      scheduler.addFront(subproc);
1829    }
1830  }
1831
1832  private void countDownChildren(RootProcedureState<TEnvironment> procStack,
1833      Procedure<TEnvironment> procedure) {
1834    Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
1835    if (parent == null) {
1836      assert procStack.isRollingback();
1837      return;
1838    }
1839
1840    // If this procedure is the last child awake the parent procedure
1841    if (parent.tryRunnable()) {
1842      // If we succeeded in making the parent runnable -- i.e. all of its
1843      // children have completed, move parent to front of the queue.
1844      store.update(parent);
1845      scheduler.addFront(parent);
1846      LOG.info("Finished subprocedure pid={}, resume processing ppid={}",
1847        procedure.getProcId(), parent.getProcId());
1848      return;
1849    }
1850  }
1851
1852  private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
1853      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1854    if (subprocs != null && !procedure.isFailed()) {
1855      if (LOG.isTraceEnabled()) {
1856        LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
1857      }
1858      store.insert(procedure, subprocs);
1859    } else {
1860      LOG.trace("Store update {}", procedure);
1861      if (procedure.isFinished() && !procedure.hasParent()) {
1862        // remove child procedures
1863        final long[] childProcIds = procStack.getSubprocedureIds();
1864        if (childProcIds != null) {
1865          store.delete(procedure, childProcIds);
1866          for (int i = 0; i < childProcIds.length; ++i) {
1867            procedures.remove(childProcIds[i]);
1868          }
1869        } else {
1870          store.update(procedure);
1871        }
1872      } else {
1873        store.update(procedure);
1874      }
1875    }
1876  }
1877
1878  private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
1879    LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
1880    // NOTE: We don't call Thread.currentThread().interrupt()
1881    // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
1882    // the InterruptedException. If the master is going down, we will be notified
1883    // and the executor/store will be stopped.
1884    // (The interrupted procedure will be retried on the next run)
1885  }
1886
1887  private void execCompletionCleanup(Procedure<TEnvironment> proc) {
1888    final TEnvironment env = getEnvironment();
1889    if (proc.hasLock()) {
1890      LOG.warn("Usually this should not happen, we will release the lock before if the procedure" +
1891        " is finished, even if the holdLock is true, arrive here means we have some holes where" +
1892        " we do not release the lock. And the releaseLock below may fail since the procedure may" +
1893        " have already been deleted from the procedure store.");
1894      releaseLock(proc, true);
1895    }
1896    try {
1897      proc.completionCleanup(env);
1898    } catch (Throwable e) {
1899      // Catch NullPointerExceptions or similar errors...
1900      LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1901    }
1902  }
1903
1904  private void procedureFinished(Procedure<TEnvironment> proc) {
1905    // call the procedure completion cleanup handler
1906    execCompletionCleanup(proc);
1907
1908    CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
1909
1910    // update the executor internal state maps
1911    if (!proc.shouldWaitClientAck(getEnvironment())) {
1912      retainer.setClientAckTime(0);
1913    }
1914
1915    completed.put(proc.getProcId(), retainer);
1916    rollbackStack.remove(proc.getProcId());
1917    procedures.remove(proc.getProcId());
1918
1919    // call the runnableSet completion cleanup handler
1920    try {
1921      scheduler.completionCleanup(proc);
1922    } catch (Throwable e) {
1923      // Catch NullPointerExceptions or similar errors...
1924      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
1925    }
1926
1927    // Notify the listeners
1928    sendProcedureFinishedNotification(proc.getProcId());
1929  }
1930
1931  RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
1932    return rollbackStack.get(rootProcId);
1933  }
1934
1935  @VisibleForTesting
1936  ProcedureScheduler getProcedureScheduler() {
1937    return scheduler;
1938  }
1939
1940  @VisibleForTesting
1941  int getCompletedSize() {
1942    return completed.size();
1943  }
1944
1945  @VisibleForTesting
1946  public IdLock getProcExecutionLock() {
1947    return procExecutionLock;
1948  }
1949
1950  // ==========================================================================
1951  //  Worker Thread
1952  // ==========================================================================
1953  private class WorkerThread extends StoppableThread {
1954    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
1955    private volatile Procedure<TEnvironment> activeProcedure;
1956
1957    public WorkerThread(ThreadGroup group) {
1958      this(group, "PEWorker-");
1959    }
1960
1961    protected WorkerThread(ThreadGroup group, String prefix) {
1962      super(group, prefix + workerId.incrementAndGet());
1963      setDaemon(true);
1964    }
1965
1966    @Override
1967    public void sendStopSignal() {
1968      scheduler.signalAll();
1969    }
1970    @Override
1971    public void run() {
1972      long lastUpdate = EnvironmentEdgeManager.currentTime();
1973      try {
1974        while (isRunning() && keepAlive(lastUpdate)) {
1975          @SuppressWarnings("unchecked")
1976          Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
1977          if (proc == null) {
1978            continue;
1979          }
1980          this.activeProcedure = proc;
1981          int activeCount = activeExecutorCount.incrementAndGet();
1982          int runningCount = store.setRunningProcedureCount(activeCount);
1983          LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
1984            runningCount, activeCount);
1985          executionStartTime.set(EnvironmentEdgeManager.currentTime());
1986          IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
1987          try {
1988            executeProcedure(proc);
1989          } catch (AssertionError e) {
1990            LOG.info("ASSERT pid=" + proc.getProcId(), e);
1991            throw e;
1992          } finally {
1993            procExecutionLock.releaseLockEntry(lockEntry);
1994            activeCount = activeExecutorCount.decrementAndGet();
1995            runningCount = store.setRunningProcedureCount(activeCount);
1996            LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
1997              runningCount, activeCount);
1998            this.activeProcedure = null;
1999            lastUpdate = EnvironmentEdgeManager.currentTime();
2000            executionStartTime.set(Long.MAX_VALUE);
2001          }
2002        }
2003      } catch (Throwable t) {
2004        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
2005      } finally {
2006        LOG.trace("Worker terminated.");
2007      }
2008      workerThreads.remove(this);
2009    }
2010
2011    @Override
2012    public String toString() {
2013      Procedure<?> p = this.activeProcedure;
2014      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");
2015    }
2016
2017    /**
2018     * @return the time since the current procedure is running
2019     */
2020    public long getCurrentRunTime() {
2021      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
2022    }
2023
2024    // core worker never timeout
2025    protected boolean keepAlive(long lastUpdate) {
2026      return true;
2027    }
2028  }
2029
2030  // A worker thread which can be added when core workers are stuck. Will timeout after
2031  // keepAliveTime if there is no procedure to run.
2032  private final class KeepAliveWorkerThread extends WorkerThread {
2033    public KeepAliveWorkerThread(ThreadGroup group) {
2034      super(group, "KeepAlivePEWorker-");
2035    }
2036
2037    @Override
2038    protected boolean keepAlive(long lastUpdate) {
2039      return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
2040    }
2041  }
2042
2043  // ----------------------------------------------------------------------------
2044  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the
2045  // full set of procedures pending and completed to write a compacted
2046  // version of the log (in case is a log)?
  // In theory no: procedures have a short life, so at some point the store
  // will have the tracker saying everything is in the last log.
2049  // ----------------------------------------------------------------------------
2050
2051  private final class WorkerMonitor extends InlineChore {
2052    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
2053        "hbase.procedure.worker.monitor.interval.msec";
2054    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
2055
2056    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
2057        "hbase.procedure.worker.stuck.threshold.msec";
2058    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
2059
2060    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
2061        "hbase.procedure.worker.add.stuck.percentage";
2062    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
2063
2064    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
2065    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
2066    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
2067
2068    public WorkerMonitor() {
2069      refreshConfig();
2070    }
2071
2072    @Override
2073    public void run() {
2074      final int stuckCount = checkForStuckWorkers();
2075      checkThreadCount(stuckCount);
2076
2077      // refresh interval (poor man dynamic conf update)
2078      refreshConfig();
2079    }
2080
2081    private int checkForStuckWorkers() {
2082      // check if any of the worker is stuck
2083      int stuckCount = 0;
2084      for (WorkerThread worker : workerThreads) {
2085        if (worker.getCurrentRunTime() < stuckThreshold) {
2086          continue;
2087        }
2088
2089        // WARN the worker is stuck
2090        stuckCount++;
2091        LOG.warn("Worker stuck {}, run time {}", worker,
2092          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
2093      }
2094      return stuckCount;
2095    }
2096
2097    private void checkThreadCount(final int stuckCount) {
2098      // nothing to do if there are no runnable tasks
2099      if (stuckCount < 1 || !scheduler.hasRunnables()) {
2100        return;
2101      }
2102
2103      // add a new thread if the worker stuck percentage exceed the threshold limit
2104      // and every handler is active.
2105      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
2106      // let's add new worker thread more aggressively, as they will timeout finally if there is no
2107      // work to do.
2108      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
2109        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
2110        workerThreads.add(worker);
2111        worker.start();
2112        LOG.debug("Added new worker thread {}", worker);
2113      }
2114    }
2115
2116    private void refreshConfig() {
2117      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,
2118          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
2119      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,
2120        DEFAULT_WORKER_MONITOR_INTERVAL);
2121      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,
2122        DEFAULT_WORKER_STUCK_THRESHOLD);
2123    }
2124
2125    @Override
2126    public int getTimeoutInterval() {
2127      return timeoutInterval;
2128    }
2129  }
2130}