001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.UncheckedIOException;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Deque;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.Executor;
033import java.util.concurrent.Executors;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.AtomicLong;
038import java.util.stream.Collectors;
039import java.util.stream.Stream;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
043import org.apache.hadoop.hbase.log.HBaseMarkers;
044import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
045import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
048import org.apache.hadoop.hbase.procedure2.util.StringUtils;
049import org.apache.hadoop.hbase.security.User;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.IdLock;
052import org.apache.hadoop.hbase.util.NonceKey;
053import org.apache.hadoop.hbase.util.Threads;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
061
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
063
064/**
065 * Thread Pool that executes the submitted procedures.
066 * The executor has a ProcedureStore associated.
067 * Each operation is logged and on restart the pending procedures are resumed.
068 *
069 * Unless the Procedure code throws an error (e.g. invalid user input)
070 * the procedure will complete (at some point in time), On restart the pending
071 * procedures are resumed and the once failed will be rolledback.
072 *
073 * The user can add procedures to the executor via submitProcedure(proc)
074 * check for the finished state via isFinished(procId)
075 * and get the result via getResult(procId)
076 */
077@InterfaceAudience.Private
078public class ProcedureExecutor<TEnvironment> {
079  private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);
080
081  public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
082  private static final boolean DEFAULT_CHECK_OWNER_SET = false;
083
084  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
085      "hbase.procedure.worker.keep.alive.time.msec";
086  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
087
088  // Enable this flag if you want to upgrade to 2.2+, there are some incompatible changes on how we
089  // assign or unassign a region, so we need to make sure all these procedures have been finished
090  // before we start the master with new code. See HBASE-20881 and HBASE-21075 for more details.
091  public static final String UPGRADE_TO_2_2 = "hbase.procedure.upgrade-to-2-2";
092  private static final boolean DEFAULT_UPGRADE_TO_2_2 = false;
093
094  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
095  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
096
097  public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
098  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
099
100  /**
101   * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
102   * break PE having it fail at various junctures. When non-null, testing is set to an instance of
103   * the below internal {@link Testing} class with flags set for the particular test.
104   */
105  Testing testing = null;
106
107  /**
108   * Class with parameters describing how to fail/die when in testing-context.
109   */
110  public static class Testing {
111    protected boolean killIfSuspended = false;
112
113    /**
114     * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
115     * persisting all the state it needs to recover after a crash.
116     */
117    protected boolean killBeforeStoreUpdate = false;
118    protected boolean toggleKillBeforeStoreUpdate = false;
119
120    /**
121     * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
122     * is about a case where memory-state was being set after store to WAL where a crash could
123     * cause us to get stuck. This flag allows killing at what was a vulnerable time.
124     */
125    protected boolean killAfterStoreUpdate = false;
126    protected boolean toggleKillAfterStoreUpdate = false;
127
128    protected boolean shouldKillBeforeStoreUpdate() {
129      final boolean kill = this.killBeforeStoreUpdate;
130      if (this.toggleKillBeforeStoreUpdate) {
131        this.killBeforeStoreUpdate = !kill;
132        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
133      }
134      return kill;
135    }
136
137    protected boolean shouldKillBeforeStoreUpdate(final boolean isSuspended) {
138      return (isSuspended && !killIfSuspended) ? false : shouldKillBeforeStoreUpdate();
139    }
140
141    protected boolean shouldKillAfterStoreUpdate() {
142      final boolean kill = this.killAfterStoreUpdate;
143      if (this.toggleKillAfterStoreUpdate) {
144        this.killAfterStoreUpdate = !kill;
145        LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
146      }
147      return kill;
148    }
149
150    protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
151      return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
152    }
153  }
154
155  public interface ProcedureExecutorListener {
156    void procedureLoaded(long procId);
157    void procedureAdded(long procId);
158    void procedureFinished(long procId);
159  }
160
161  /**
162   * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
163   * Once a Root-Procedure completes (success or failure), the result will be added to this map.
164   * The user of ProcedureExecutor should call getResult(procId) to get the result.
165   */
166  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
167    new ConcurrentHashMap<>();
168
169  /**
170   * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
171   * The RootProcedureState contains the execution stack of the Root-Procedure,
172   * It is added to the map by submitProcedure() and removed on procedure completion.
173   */
174  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
175    new ConcurrentHashMap<>();
176
177  /**
178   * Helper map to lookup the live procedures by ID.
179   * This map contains every procedure. root-procedures and subprocedures.
180   */
181  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
182    new ConcurrentHashMap<>();
183
184  /**
185   * Helper map to lookup whether the procedure already issued from the same client. This map
186   * contains every root procedure.
187   */
188  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();
189
190  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
191    new CopyOnWriteArrayList<>();
192
193  private Configuration conf;
194
195  /**
196   * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
197   * resource handling rather than observing in a #join is unexpected).
198   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
199   * (Should be ok).
200   */
201  private ThreadGroup threadGroup;
202
203  /**
204   * Created in the {@link #init(int, boolean)}  method. Terminated in {@link #join()} (FIX! Doing
205   * resource handling rather than observing in a #join is unexpected).
206   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
207   * (Should be ok).
208   */
209  private CopyOnWriteArrayList<WorkerThread> workerThreads;
210
211  /**
212   * Worker thread only for urgent tasks.
213   */
214  private CopyOnWriteArrayList<WorkerThread> urgentWorkerThreads;
215
216  /**
217   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
218   * resource handling rather than observing in a #join is unexpected).
219   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
220   * (Should be ok).
221   */
222  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;
223
224  /**
225   * WorkerMonitor check for stuck workers and new worker thread when necessary, for example if
226   * there is no worker to assign meta, it will new worker thread for it, so it is very important.
227   * TimeoutExecutor execute many tasks like DeadServerMetricRegionChore RegionInTransitionChore
228   * and so on, some tasks may execute for a long time so will block other tasks like
229   * WorkerMonitor, so use a dedicated thread for executing WorkerMonitor.
230   */
231  private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;
232
233  private int corePoolSize;
234  private int maxPoolSize;
235  private int urgentPoolSize;
236
237  private volatile long keepAliveTime;
238
239  /**
240   * Scheduler/Queue that contains runnable procedures.
241   */
242  private final ProcedureScheduler scheduler;
243
244  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
245    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
246
247  private final AtomicLong lastProcId = new AtomicLong(-1);
248  private final AtomicLong workerId = new AtomicLong(0);
249  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
250  private final AtomicBoolean running = new AtomicBoolean(false);
251  private final TEnvironment environment;
252  private final ProcedureStore store;
253
254  private final boolean checkOwnerSet;
255
256  // To prevent concurrent execution of the same procedure.
257  // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
258  // procedure is woken up before we finish the suspend which causes the same procedures to be
259  // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
260  // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
261  // execution of the same procedure.
262  private final IdLock procExecutionLock = new IdLock();
263
264  private final boolean upgradeTo2_2;
265
266  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
267      final ProcedureStore store) {
268    this(conf, environment, store, new SimpleProcedureScheduler());
269  }
270
271  private boolean isRootFinished(Procedure<?> proc) {
272    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
273    return rootProc == null || rootProc.isFinished();
274  }
275
276  private void forceUpdateProcedure(long procId) throws IOException {
277    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
278    try {
279      Procedure<TEnvironment> proc = procedures.get(procId);
280      if (proc != null) {
281        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
282          LOG.debug("Procedure {} has already been finished and parent is succeeded," +
283            " skip force updating", proc);
284          return;
285        }
286      } else {
287        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
288        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
289          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
290          return;
291        }
292        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
293        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
294        if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
295          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
296            procId);
297          return;
298        }
299        proc = retainer.getProcedure();
300      }
301      LOG.debug("Force update procedure {}", proc);
302      store.update(proc);
303    } finally {
304      procExecutionLock.releaseLockEntry(lockEntry);
305    }
306  }
307
308  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
309      final ProcedureStore store, final ProcedureScheduler scheduler) {
310    this.environment = environment;
311    this.scheduler = scheduler;
312    this.store = store;
313    this.conf = conf;
314    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
315    this.upgradeTo2_2 = conf.getBoolean(UPGRADE_TO_2_2, DEFAULT_UPGRADE_TO_2_2);
316    refreshConfiguration(conf);
317    store.registerListener(new ProcedureStoreListener() {
318
319      @Override
320      public void forceUpdate(long[] procIds) {
321        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
322          try {
323            forceUpdateProcedure(procId);
324          } catch (IOException e) {
325            LOG.warn("Failed to force update procedure with pid={}", procId);
326          }
327        }));
328      }
329    });
330  }
331
332  private void load(final boolean abortOnCorruption) throws IOException {
333    Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
334    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
335    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
336    Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");
337
338    store.load(new ProcedureStore.ProcedureLoader() {
339      @Override
340      public void setMaxProcId(long maxProcId) {
341        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
342        lastProcId.set(maxProcId);
343      }
344
345      @Override
346      public void load(ProcedureIterator procIter) throws IOException {
347        loadProcedures(procIter, abortOnCorruption);
348      }
349
350      @Override
351      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
352        int corruptedCount = 0;
353        while (procIter.hasNext()) {
354          Procedure<?> proc = procIter.next();
355          LOG.error("Corrupt " + proc);
356          corruptedCount++;
357        }
358        if (abortOnCorruption && corruptedCount > 0) {
359          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
360        }
361      }
362    });
363  }
364
365  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
366    proc.restoreLock(getEnvironment());
367    restored.add(proc.getProcId());
368  }
369
370  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
371    while (!stack.isEmpty()) {
372      restoreLock(stack.pop(), restored);
373    }
374  }
375
376  // Restore the locks for all the procedures.
377  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
378  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
379  // calling the acquireLock method for the parent procedure.
380  // The algorithm is straight-forward:
381  // 1. Use a set to record the procedures which locks have already been restored.
382  // 2. Use a stack to store the hierarchy of the procedures
383  // 3. For all the procedure, we will first try to find its parent and push it into the stack,
384  // unless
385  // a. We have no parent, i.e, we are the root procedure
386  // b. The lock has already been restored(by checking the set introduced in #1)
387  // then we start to pop the stack and call acquireLock for each procedure.
388  // Notice that this should be done for all procedures, not only the ones in runnableList.
389  private void restoreLocks() {
390    Set<Long> restored = new HashSet<>();
391    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
392    procedures.values().forEach(proc -> {
393      for (;;) {
394        if (restored.contains(proc.getProcId())) {
395          restoreLocks(stack, restored);
396          return;
397        }
398        if (!proc.hasParent()) {
399          restoreLock(proc, restored);
400          restoreLocks(stack, restored);
401          return;
402        }
403        stack.push(proc);
404        proc = procedures.get(proc.getParentProcId());
405      }
406    });
407  }
408
409  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
410      throws IOException {
411    // 1. Build the rollback stack
412    int runnableCount = 0;
413    int failedCount = 0;
414    int waitingCount = 0;
415    int waitingTimeoutCount = 0;
416    while (procIter.hasNext()) {
417      boolean finished = procIter.isNextFinished();
418      Procedure<TEnvironment> proc = procIter.next();
419      NonceKey nonceKey = proc.getNonceKey();
420      long procId = proc.getProcId();
421
422      if (finished) {
423        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
424        LOG.debug("Completed {}", proc);
425      } else {
426        if (!proc.hasParent()) {
427          assert !proc.isFinished() : "unexpected finished procedure";
428          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
429        }
430
431        // add the procedure to the map
432        proc.beforeReplay(getEnvironment());
433        procedures.put(proc.getProcId(), proc);
434        switch (proc.getState()) {
435          case RUNNABLE:
436            runnableCount++;
437            break;
438          case FAILED:
439            failedCount++;
440            break;
441          case WAITING:
442            waitingCount++;
443            break;
444          case WAITING_TIMEOUT:
445            waitingTimeoutCount++;
446            break;
447          default:
448            break;
449        }
450      }
451
452      // add the nonce to the map
453      if (nonceKey != null) {
454        nonceKeysToProcIdsMap.put(nonceKey, procId);
455      }
456    }
457
458    // 2. Initialize the stacks
459    // In the old implementation, for procedures in FAILED state, we will push it into the
460    // ProcedureScheduler directly to execute the rollback. But this does not work after we
461    // introduce the restore lock stage.
462    // For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
463    // then when a procedure which has lock access, for example, a sub procedure of the procedure
464    // which has the xlock, is pushed into the scheduler, we will add the queue back to let the
465    // workers poll from it. The assumption here is that, the procedure which has the xlock should
466    // have been polled out already, so when loading we can not add the procedure to scheduler first
467    // and then call acquireLock, since the procedure is still in the queue, and since we will
468    // remove the queue from runQueue, then no one can poll it out, then there is a dead lock
469    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
470    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
471    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
472    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
473    procIter.reset();
474    while (procIter.hasNext()) {
475      if (procIter.isNextFinished()) {
476        procIter.skipNext();
477        continue;
478      }
479
480      Procedure<TEnvironment> proc = procIter.next();
481      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
482
483      LOG.debug("Loading {}", proc);
484
485      Long rootProcId = getRootProcedureId(proc);
486      // The orphan procedures will be passed to handleCorrupted, so add an assert here
487      assert rootProcId != null;
488
489      if (proc.hasParent()) {
490        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
491        if (parent != null && !proc.isFinished()) {
492          parent.incChildrenLatch();
493        }
494      }
495
496      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
497      procStack.loadStack(proc);
498
499      proc.setRootProcId(rootProcId);
500      switch (proc.getState()) {
501        case RUNNABLE:
502          runnableList.add(proc);
503          break;
504        case WAITING:
505          waitingList.add(proc);
506          break;
507        case WAITING_TIMEOUT:
508          waitingTimeoutList.add(proc);
509          break;
510        case FAILED:
511          failedList.add(proc);
512          break;
513        case ROLLEDBACK:
514        case INITIALIZING:
515          String msg = "Unexpected " + proc.getState() + " state for " + proc;
516          LOG.error(msg);
517          throw new UnsupportedOperationException(msg);
518        default:
519          break;
520      }
521    }
522
523    // 3. Check the waiting procedures to see if some of them can be added to runnable.
524    waitingList.forEach(proc -> {
525      if (!proc.hasChildren()) {
526        // Normally, WAITING procedures should be waken by its children.
527        // But, there is a case that, all the children are successful and before
528        // they can wake up their parent procedure, the master was killed.
529        // So, during recovering the procedures from ProcedureWal, its children
530        // are not loaded because of their SUCCESS state.
531        // So we need to continue to run this WAITING procedure. But before
532        // executing, we need to set its state to RUNNABLE, otherwise, a exception
533        // will throw:
534        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
535        // "NOT RUNNABLE! " + procedure.toString());
536        proc.setState(ProcedureState.RUNNABLE);
537        runnableList.add(proc);
538      } else {
539        proc.afterReplay(getEnvironment());
540      }
541    });
542    // 4. restore locks
543    restoreLocks();
544
545    // 5. Push the procedures to the timeout executor
546    waitingTimeoutList.forEach(proc -> {
547      proc.afterReplay(getEnvironment());
548      timeoutExecutor.add(proc);
549    });
550
551    // 6. Push the procedure to the scheduler
552    failedList.forEach(scheduler::addBack);
553    runnableList.forEach(p -> {
554      p.afterReplay(getEnvironment());
555      if (!p.hasParent()) {
556        sendProcedureLoadedNotification(p.getProcId());
557      }
558      scheduler.addBack(p);
559    });
560    // After all procedures put into the queue, signal the worker threads.
561    // Otherwise, there is a race condition. See HBASE-21364.
562    scheduler.signalAll();
563  }
564
565  /**
566   * Initialize the procedure executor, but do not start workers. We will start them later.
567   * <p/>
568   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
569   * ensure a single executor, and start the procedure replay to resume and recover the previous
570   * pending and in-progress procedures.
571   * @param numThreads number of threads available for procedure execution.
572   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
573   *          is found on replay. otherwise false.
574   */
575  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
576    init(numThreads, 0, abortOnCorruption);
577  }
578
579  /**
580   * Initialize the procedure executor, but do not start workers. We will start them later.
581   * <p/>
582   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
583   * ensure a single executor, and start the procedure replay to resume and recover the previous
584   * pending and in-progress procedures.
585   * @param numThreads number of threads available for procedure execution.
586   * @param urgentNumThreads number of threads available for urgent procedure execution.
587   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
588   *          is found on replay. otherwise false.
589   */
590  public void init(int numThreads, int urgentNumThreads,
591      boolean abortOnCorruption) throws IOException {
592    // We have numThreads executor + one timer thread used for timing out
593    // procedures and triggering periodic procedures.
594    this.corePoolSize = numThreads;
595    this.maxPoolSize = 10 * numThreads;
596    this.urgentPoolSize = urgentNumThreads;
597    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker "
598            + "count={}, start {} urgent thread(s)",
599        corePoolSize, maxPoolSize, urgentPoolSize);
600
601    this.threadGroup = new ThreadGroup("PEWorkerGroup");
602    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
603    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");
604
605    // Create the workers
606    workerId.set(0);
607    workerThreads = new CopyOnWriteArrayList<>();
608    urgentWorkerThreads = new CopyOnWriteArrayList<>();
609    for (int i = 0; i < corePoolSize; ++i) {
610      workerThreads.add(new WorkerThread(threadGroup));
611    }
612    for (int i = 0; i < urgentNumThreads; ++i) {
613      urgentWorkerThreads
614          .add(new WorkerThread(threadGroup, "UrgentPEWorker-", true));
615    }
616
617    long st, et;
618
619    // Acquire the store lease.
620    st = System.nanoTime();
621    store.recoverLease();
622    et = System.nanoTime();
623    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
624      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
625
626    // start the procedure scheduler
627    scheduler.start();
628
629    // TODO: Split in two steps.
630    // TODO: Handle corrupted procedures (currently just a warn)
631    // The first one will make sure that we have the latest id,
632    // so we can start the threads and accept new procedures.
633    // The second step will do the actual load of old procedures.
634    st = System.nanoTime();
635    load(abortOnCorruption);
636    et = System.nanoTime();
637    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
638      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
639  }
640
641  /**
642   * Start the workers.
643   */
644  public void startWorkers() throws IOException {
645    if (!running.compareAndSet(false, true)) {
646      LOG.warn("Already running");
647      return;
648    }
649    // Start the executors. Here we must have the lastProcId set.
650    LOG.debug("Start workers {}, urgent workers {}", workerThreads.size(),
651        urgentWorkerThreads.size());
652    timeoutExecutor.start();
653    workerMonitorExecutor.start();
654    for (WorkerThread worker: workerThreads) {
655      worker.start();
656    }
657
658    for (WorkerThread worker: urgentWorkerThreads) {
659      worker.start();
660    }
661
662    // Internal chores
663    workerMonitorExecutor.add(new WorkerMonitor());
664
665    if (upgradeTo2_2) {
666      timeoutExecutor.add(new InlineChore() {
667
668        @Override
669        public void run() {
670          if (procedures.isEmpty()) {
671            LOG.info("UPGRADE OK: All existed procedures have been finished, quit...");
672            System.exit(0);
673          }
674        }
675
676        @Override
677        public int getTimeoutInterval() {
678          // check every 5 seconds to see if we can quit
679          return 5000;
680        }
681      });
682    }
683
684    // Add completed cleaner chore
685    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
686      nonceKeysToProcIdsMap));
687  }
688
689  public void stop() {
690    if (!running.getAndSet(false)) {
691      return;
692    }
693
694    LOG.info("Stopping");
695    scheduler.stop();
696    timeoutExecutor.sendStopSignal();
697    workerMonitorExecutor.sendStopSignal();
698  }
699
700  @VisibleForTesting
701  public void join() {
702    assert !isRunning() : "expected not running";
703
704    // stop the timeout executor
705    timeoutExecutor.awaitTermination();
706    // stop the work monitor executor
707    workerMonitorExecutor.awaitTermination();
708
709    // stop the worker threads
710    for (WorkerThread worker: workerThreads) {
711      worker.awaitTermination();
712    }
713
714    // stop the worker threads
715    for (WorkerThread worker: urgentWorkerThreads) {
716      worker.awaitTermination();
717    }
718
719    // Destroy the Thread Group for the executors
720    // TODO: Fix. #join is not place to destroy resources.
721    try {
722      threadGroup.destroy();
723    } catch (IllegalThreadStateException e) {
724      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
725          this.threadGroup, e.getMessage());
726      // This dumps list of threads on STDOUT.
727      this.threadGroup.list();
728    }
729
730    // reset the in-memory state for testing
731    completed.clear();
732    rollbackStack.clear();
733    procedures.clear();
734    nonceKeysToProcIdsMap.clear();
735    scheduler.clear();
736    lastProcId.set(-1);
737  }
738
739  public void refreshConfiguration(final Configuration conf) {
740    this.conf = conf;
741    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
742        DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
743  }
744
745  // ==========================================================================
746  //  Accessors
747  // ==========================================================================
748  public boolean isRunning() {
749    return running.get();
750  }
751
752  /**
753   * @return the current number of worker threads.
754   */
755  public int getWorkerThreadCount() {
756    return workerThreads.size() + urgentWorkerThreads.size();
757  }
758
759  /**
760   * @return the core pool size settings.
761   */
762  public int getCorePoolSize() {
763    return corePoolSize;
764  }
765
766  public int getUrgentPoolSize() {
767    return urgentPoolSize;
768  }
769
770  public int getActiveExecutorCount() {
771    return activeExecutorCount.get();
772  }
773
774  public TEnvironment getEnvironment() {
775    return this.environment;
776  }
777
778  public ProcedureStore getStore() {
779    return this.store;
780  }
781
782  ProcedureScheduler getScheduler() {
783    return scheduler;
784  }
785
786  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
787    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
788    this.scheduler.signalAll();
789  }
790
791  public long getKeepAliveTime(final TimeUnit timeUnit) {
792    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
793  }
794
795  // ==========================================================================
796  //  Submit/Remove Chores
797  // ==========================================================================
798
799  /**
800   * Add a chore procedure to the executor
801   * @param chore the chore to add
802   */
803  public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
804    chore.setState(ProcedureState.WAITING_TIMEOUT);
805    timeoutExecutor.add(chore);
806  }
807
808  /**
809   * Remove a chore procedure from the executor
810   * @param chore the chore to remove
811   * @return whether the chore is removed, or it will be removed later
812   */
813  public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
814    chore.setState(ProcedureState.SUCCESS);
815    return timeoutExecutor.remove(chore);
816  }
817
818  // ==========================================================================
819  //  Nonce Procedure helpers
820  // ==========================================================================
821  /**
822   * Create a NoneKey from the specified nonceGroup and nonce.
823   * @param nonceGroup
824   * @param nonce
825   * @return the generated NonceKey
826   */
827  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
828    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
829  }
830
831  /**
832   * Register a nonce for a procedure that is going to be submitted.
833   * A procId will be reserved and on submitProcedure(),
834   * the procedure with the specified nonce will take the reserved ProcId.
835   * If someone already reserved the nonce, this method will return the procId reserved,
836   * otherwise an invalid procId will be returned. and the caller should procede
837   * and submit the procedure.
838   *
839   * @param nonceKey A unique identifier for this operation from the client or process.
840   * @return the procId associated with the nonce, if any otherwise an invalid procId.
841   */
842  public long registerNonce(final NonceKey nonceKey) {
843    if (nonceKey == null) return -1;
844
845    // check if we have already a Reserved ID for the nonce
846    Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
847    if (oldProcId == null) {
848      // reserve a new Procedure ID, this will be associated with the nonce
849      // and the procedure submitted with the specified nonce will use this ID.
850      final long newProcId = nextProcId();
851      oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
852      if (oldProcId == null) return -1;
853    }
854
855    // we found a registered nonce, but the procedure may not have been submitted yet.
856    // since the client expect the procedure to be submitted, spin here until it is.
857    final boolean traceEnabled = LOG.isTraceEnabled();
858    while (isRunning() &&
859           !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
860           nonceKeysToProcIdsMap.containsKey(nonceKey)) {
861      if (traceEnabled) {
862        LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
863      }
864      Threads.sleep(100);
865    }
866    return oldProcId.longValue();
867  }
868
869  /**
870   * Remove the NonceKey if the procedure was not submitted to the executor.
871   * @param nonceKey A unique identifier for this operation from the client or process.
872   */
873  public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
874    if (nonceKey == null) return;
875
876    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
877    if (procId == null) return;
878
879    // if the procedure was not submitted, remove the nonce
880    if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
881      nonceKeysToProcIdsMap.remove(nonceKey);
882    }
883  }
884
885  /**
886   * If the failure failed before submitting it, we may want to give back the
887   * same error to the requests with the same nonceKey.
888   *
889   * @param nonceKey A unique identifier for this operation from the client or process
890   * @param procName name of the procedure, used to inform the user
891   * @param procOwner name of the owner of the procedure, used to inform the user
892   * @param exception the failure to report to the user
893   */
894  public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
895      IOException exception) {
896    if (nonceKey == null) {
897      return;
898    }
899
900    Long procId = nonceKeysToProcIdsMap.get(nonceKey);
901    if (procId == null || completed.containsKey(procId)) {
902      return;
903    }
904
905    Procedure<TEnvironment> proc =
906      new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
907
908    completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
909  }
910
911  // ==========================================================================
912  //  Submit/Abort Procedure
913  // ==========================================================================
914  /**
915   * Add a new root-procedure to the executor.
916   * @param proc the new procedure to execute.
917   * @return the procedure id, that can be used to monitor the operation
918   */
919  public long submitProcedure(Procedure<TEnvironment> proc) {
920    return submitProcedure(proc, null);
921  }
922
923  /**
924   * Bypass a procedure. If the procedure is set to bypass, all the logic in
925   * execute/rollback will be ignored and it will return success, whatever.
926   * It is used to recover buggy stuck procedures, releasing the lock resources
927   * and letting other procedures run. Bypassing one procedure (and its ancestors will
928   * be bypassed automatically) may leave the cluster in a middle state, e.g. region
929   * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
930   * the operators may have to do some clean up on hdfs or schedule some assign procedures
931   * to let region online. DO AT YOUR OWN RISK.
932   * <p>
933   * A procedure can be bypassed only if
934   * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
935   * or it is a root procedure without any child.
936   * 2. No other worker thread is executing it
937   * 3. No child procedure has been submitted
938   *
939   * <p>
940   * If all the requirements are meet, the procedure and its ancestors will be
941   * bypassed and persisted to WAL.
942   *
943   * <p>
944   * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
945   * TODO: What about WAITING_TIMEOUT?
946   * @param pids the procedure id
947   * @param lockWait time to wait lock
948   * @param force if force set to true, we will bypass the procedure even if it is executing.
949   *              This is for procedures which can't break out during executing(due to bug, mostly)
950   *              In this case, bypassing the procedure is not enough, since it is already stuck
951   *              there. We need to restart the master after bypassing, and letting the problematic
952   *              procedure to execute wth bypass=true, so in that condition, the procedure can be
953   *              successfully bypassed.
954   * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
955   * @return true if bypass success
956   * @throws IOException IOException
957   */
958  public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
959      boolean recursive)
960      throws IOException {
961    List<Boolean> result = new ArrayList<Boolean>(pids.size());
962    for(long pid: pids) {
963      result.add(bypassProcedure(pid, lockWait, force, recursive));
964    }
965    return result;
966  }
967
968  boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
969      throws IOException {
970    Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
971    final Procedure<TEnvironment> procedure = getProcedure(pid);
972    if (procedure == null) {
973      LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
974      return false;
975    }
976
977    LOG.info("Begin bypass {} with lockWait={}, override={}, recursive={}",
978        procedure, lockWait, override, recursive);
979    IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
980    if (lockEntry == null && !override) {
981      LOG.info("Waited {} ms, but {} is still running, skipping bypass with override={}",
982          lockWait, procedure, override);
983      return false;
984    } else if (lockEntry == null) {
985      LOG.info("Waited {} ms, but {} is still running, begin bypass with override={}",
986          lockWait, procedure, override);
987    }
988    try {
989      // check whether the procedure is already finished
990      if (procedure.isFinished()) {
991        LOG.info("{} is already finished, skipping bypass", procedure);
992        return false;
993      }
994
995      if (procedure.hasChildren()) {
996        if (recursive) {
997          // EXPENSIVE. Checks each live procedure of which there could be many!!!
998          // Is there another way to get children of a procedure?
999          LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
1000          this.procedures.forEachValue(1 /*Single-threaded*/,
1001            // Transformer
1002            v -> {
1003             return v.getParentProcId() == procedure.getProcId()? v: null;
1004            },
1005            // Consumer
1006            v -> {
1007              boolean result = false;
1008              IOException ioe = null;
1009              try {
1010                result = bypassProcedure(v.getProcId(), lockWait, override, recursive);
1011              } catch (IOException e) {
1012                LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
1013              }
1014            });
1015        } else {
1016          LOG.info("{} has children, skipping bypass", procedure);
1017          return false;
1018        }
1019      }
1020
1021      // If the procedure has no parent or no child, we are safe to bypass it in whatever state
1022      if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
1023          && procedure.getState() != ProcedureState.WAITING
1024          && procedure.getState() != ProcedureState.WAITING_TIMEOUT) {
1025        LOG.info("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
1026                + "(with no parent), {}",
1027            procedure);
1028        // Question: how is the bypass done here?
1029        return false;
1030      }
1031
1032      // Now, the procedure is not finished, and no one can execute it since we take the lock now
1033      // And we can be sure that its ancestor is not running too, since their child has not
1034      // finished yet
1035      Procedure<TEnvironment> current = procedure;
1036      while (current != null) {
1037        LOG.info("Bypassing {}", current);
1038        current.bypass(getEnvironment());
1039        store.update(procedure);
1040        long parentID = current.getParentProcId();
1041        current = getProcedure(parentID);
1042      }
1043
1044      //wake up waiting procedure, already checked there is no child
1045      if (procedure.getState() == ProcedureState.WAITING) {
1046        procedure.setState(ProcedureState.RUNNABLE);
1047        store.update(procedure);
1048      }
1049
1050      // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler.
1051      // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE
1052      if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1053        LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
1054        if (timeoutExecutor.remove(procedure)) {
1055          LOG.debug("removed procedure {} from timeoutExecutor", procedure);
1056          timeoutExecutor.executeTimedoutProcedure(procedure);
1057        }
1058      } else if (lockEntry != null) {
1059        scheduler.addFront(procedure);
1060        LOG.info("Bypassing {} and its ancestors successfully, adding to queue", procedure);
1061      } else {
1062        // If we don't have the lock, we can't re-submit the queue,
1063        // since it is already executing. To get rid of the stuck situation, we
1064        // need to restart the master. With the procedure set to bypass, the procedureExecutor
1065        // will bypass it and won't get stuck again.
1066        LOG.info("Bypassing {} and its ancestors successfully, but since it is already running, "
1067            + "skipping add to queue", procedure);
1068      }
1069      return true;
1070
1071    } finally {
1072      if (lockEntry != null) {
1073        procExecutionLock.releaseLockEntry(lockEntry);
1074      }
1075    }
1076  }
1077
1078  /**
1079   * Add a new root-procedure to the executor.
1080   * @param proc the new procedure to execute.
1081   * @param nonceKey the registered unique identifier for this operation from the client or process.
1082   * @return the procedure id, that can be used to monitor the operation
1083   */
1084  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
1085      justification = "FindBugs is blind to the check-for-null")
1086  public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
1087    Preconditions.checkArgument(lastProcId.get() >= 0);
1088
1089    prepareProcedure(proc);
1090
1091    final Long currentProcId;
1092    if (nonceKey != null) {
1093      currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
1094      Preconditions.checkArgument(currentProcId != null,
1095        "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
1096    } else {
1097      currentProcId = nextProcId();
1098    }
1099
1100    // Initialize the procedure
1101    proc.setNonceKey(nonceKey);
1102    proc.setProcId(currentProcId.longValue());
1103
1104    // Commit the transaction
1105    store.insert(proc, null);
1106    LOG.debug("Stored {}", proc);
1107
1108    // Add the procedure to the executor
1109    return pushProcedure(proc);
1110  }
1111
1112  /**
1113   * Add a set of new root-procedure to the executor.
1114   * @param procs the new procedures to execute.
1115   */
1116  // TODO: Do we need to take nonces here?
1117  public void submitProcedures(Procedure<TEnvironment>[] procs) {
1118    Preconditions.checkArgument(lastProcId.get() >= 0);
1119    if (procs == null || procs.length <= 0) {
1120      return;
1121    }
1122
1123    // Prepare procedure
1124    for (int i = 0; i < procs.length; ++i) {
1125      prepareProcedure(procs[i]).setProcId(nextProcId());
1126    }
1127
1128    // Commit the transaction
1129    store.insert(procs);
1130    if (LOG.isDebugEnabled()) {
1131      LOG.debug("Stored " + Arrays.toString(procs));
1132    }
1133    // Add the procedure to the executor
1134    for (Procedure<TEnvironment> proc : procs) {
1135      pushProcedure(proc);
1136    }
1137  }
1138
1139  private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
1140    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
1141    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
1142    if (this.checkOwnerSet) {
1143      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
1144    }
1145    return proc;
1146  }
1147
1148  private long pushProcedure(Procedure<TEnvironment> proc) {
1149    long currentProcId = proc.getProcId();
1150    // If we are going to upgrade to 2.2+, and this is not a sub procedure, do not push it to
1151    // scheduler. After we finish all the ongoing procedures, the master will quit.
1152    if (upgradeTo2_2 && !proc.hasParent()) {
1153      return currentProcId;
1154    }
1155
1156    // Update metrics on start of a procedure
1157    proc.updateMetricsOnSubmit(getEnvironment());
1158
1159    // Create the rollback stack for the procedure
1160    RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
1161    rollbackStack.put(currentProcId, stack);
1162
1163    // Submit the new subprocedures
1164    assert !procedures.containsKey(currentProcId);
1165    procedures.put(currentProcId, proc);
1166    sendProcedureAddedNotification(currentProcId);
1167    scheduler.addBack(proc);
1168    return proc.getProcId();
1169  }
1170
1171  /**
1172   * Send an abort notification the specified procedure.
1173   * Depending on the procedure implementation the abort can be considered or ignored.
1174   * @param procId the procedure to abort
1175   * @return true if the procedure exists and has received the abort, otherwise false.
1176   */
1177  public boolean abort(long procId) {
1178    return abort(procId, true);
1179  }
1180
1181  /**
1182   * Send an abort notification to the specified procedure.
1183   * Depending on the procedure implementation, the abort can be considered or ignored.
1184   * @param procId the procedure to abort
1185   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
1186   * @return true if the procedure exists and has received the abort, otherwise false.
1187   */
1188  public boolean abort(long procId, boolean mayInterruptIfRunning) {
1189    Procedure<TEnvironment> proc = procedures.get(procId);
1190    if (proc != null) {
1191      if (!mayInterruptIfRunning && proc.wasExecuted()) {
1192        return false;
1193      }
1194      return proc.abort(getEnvironment());
1195    }
1196    return false;
1197  }
1198
1199  // ==========================================================================
1200  //  Executor query helpers
1201  // ==========================================================================
1202  public Procedure<TEnvironment> getProcedure(final long procId) {
1203    return procedures.get(procId);
1204  }
1205
1206  public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
1207    Procedure<TEnvironment> proc = getProcedure(procId);
1208    if (clazz.isInstance(proc)) {
1209      return clazz.cast(proc);
1210    }
1211    return null;
1212  }
1213
1214  public Procedure<TEnvironment> getResult(long procId) {
1215    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1216    if (retainer == null) {
1217      return null;
1218    } else {
1219      return retainer.getProcedure();
1220    }
1221  }
1222
1223  /**
1224   * Return true if the procedure is finished.
1225   * The state may be "completed successfully" or "failed and rolledback".
1226   * Use getResult() to check the state or get the result data.
1227   * @param procId the ID of the procedure to check
1228   * @return true if the procedure execution is finished, otherwise false.
1229   */
1230  public boolean isFinished(final long procId) {
1231    return !procedures.containsKey(procId);
1232  }
1233
1234  /**
1235   * Return true if the procedure is started.
1236   * @param procId the ID of the procedure to check
1237   * @return true if the procedure execution is started, otherwise false.
1238   */
1239  public boolean isStarted(long procId) {
1240    Procedure<?> proc = procedures.get(procId);
1241    if (proc == null) {
1242      return completed.get(procId) != null;
1243    }
1244    return proc.wasExecuted();
1245  }
1246
1247  /**
1248   * Mark the specified completed procedure, as ready to remove.
1249   * @param procId the ID of the procedure to remove
1250   */
1251  public void removeResult(long procId) {
1252    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1253    if (retainer == null) {
1254      assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
1255      LOG.debug("pid={} already removed by the cleaner.", procId);
1256      return;
1257    }
1258
1259    // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
1260    retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
1261  }
1262
1263  public Procedure<TEnvironment> getResultOrProcedure(long procId) {
1264    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1265    if (retainer == null) {
1266      return procedures.get(procId);
1267    } else {
1268      return retainer.getProcedure();
1269    }
1270  }
1271
1272  /**
1273   * Check if the user is this procedure's owner
1274   * @param procId the target procedure
1275   * @param user the user
1276   * @return true if the user is the owner of the procedure,
1277   *   false otherwise or the owner is unknown.
1278   */
1279  public boolean isProcedureOwner(long procId, User user) {
1280    if (user == null) {
1281      return false;
1282    }
1283    final Procedure<TEnvironment> runningProc = procedures.get(procId);
1284    if (runningProc != null) {
1285      return runningProc.getOwner().equals(user.getShortName());
1286    }
1287
1288    final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1289    if (retainer != null) {
1290      return retainer.getProcedure().getOwner().equals(user.getShortName());
1291    }
1292
1293    // Procedure either does not exist or has already completed and got cleaned up.
1294    // At this time, we cannot check the owner of the procedure
1295    return false;
1296  }
1297
1298
1299  /**
1300   * Should only be used when starting up, where the procedure workers have not been started.
1301   * <p/>
1302   * If the procedure works has been started, the return values maybe changed when you are
1303   * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
1304   * it will do a copy, and also include the finished procedures.
1305   */
1306  public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
1307    return procedures.values();
1308  }
1309
1310  /**
1311   * Get procedures.
1312   * @return the procedures in a list
1313   */
1314  public List<Procedure<TEnvironment>> getProcedures() {
1315    List<Procedure<TEnvironment>> procedureList =
1316      new ArrayList<>(procedures.size() + completed.size());
1317    procedureList.addAll(procedures.values());
1318    // Note: The procedure could show up twice in the list with different state, as
1319    // it could complete after we walk through procedures list and insert into
1320    // procedureList - it is ok, as we will use the information in the Procedure
1321    // to figure it out; to prevent this would increase the complexity of the logic.
1322    completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
1323      .forEach(procedureList::add);
1324    return procedureList;
1325  }
1326
1327  // ==========================================================================
1328  //  Listeners helpers
1329  // ==========================================================================
1330  public void registerListener(ProcedureExecutorListener listener) {
1331    this.listeners.add(listener);
1332  }
1333
1334  public boolean unregisterListener(ProcedureExecutorListener listener) {
1335    return this.listeners.remove(listener);
1336  }
1337
1338  private void sendProcedureLoadedNotification(final long procId) {
1339    if (!this.listeners.isEmpty()) {
1340      for (ProcedureExecutorListener listener: this.listeners) {
1341        try {
1342          listener.procedureLoaded(procId);
1343        } catch (Throwable e) {
1344          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1345        }
1346      }
1347    }
1348  }
1349
1350  private void sendProcedureAddedNotification(final long procId) {
1351    if (!this.listeners.isEmpty()) {
1352      for (ProcedureExecutorListener listener: this.listeners) {
1353        try {
1354          listener.procedureAdded(procId);
1355        } catch (Throwable e) {
1356          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1357        }
1358      }
1359    }
1360  }
1361
1362  private void sendProcedureFinishedNotification(final long procId) {
1363    if (!this.listeners.isEmpty()) {
1364      for (ProcedureExecutorListener listener: this.listeners) {
1365        try {
1366          listener.procedureFinished(procId);
1367        } catch (Throwable e) {
1368          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1369        }
1370      }
1371    }
1372  }
1373
1374  // ==========================================================================
1375  //  Procedure IDs helpers
1376  // ==========================================================================
1377  private long nextProcId() {
1378    long procId = lastProcId.incrementAndGet();
1379    if (procId < 0) {
1380      while (!lastProcId.compareAndSet(procId, 0)) {
1381        procId = lastProcId.get();
1382        if (procId >= 0)
1383          break;
1384      }
1385      while (procedures.containsKey(procId)) {
1386        procId = lastProcId.incrementAndGet();
1387      }
1388    }
1389    assert procId >= 0 : "Invalid procId " + procId;
1390    return procId;
1391  }
1392
1393  @VisibleForTesting
1394  protected long getLastProcId() {
1395    return lastProcId.get();
1396  }
1397
1398  @VisibleForTesting
1399  public Set<Long> getActiveProcIds() {
1400    return procedures.keySet();
1401  }
1402
1403  Long getRootProcedureId(Procedure<TEnvironment> proc) {
1404    return Procedure.getRootProcedureId(procedures, proc);
1405  }
1406
1407  // ==========================================================================
1408  //  Executions
1409  // ==========================================================================
1410  private void executeProcedure(Procedure<TEnvironment> proc) {
1411    if (proc.isFinished()) {
1412      LOG.debug("{} is already finished, skipping execution", proc);
1413      return;
1414    }
1415    final Long rootProcId = getRootProcedureId(proc);
1416    if (rootProcId == null) {
1417      // The 'proc' was ready to run but the root procedure was rolledback
1418      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
1419      executeRollback(proc);
1420      return;
1421    }
1422
1423    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
1424    if (procStack == null) {
1425      LOG.warn("RootProcedureState is null for " + proc.getProcId());
1426      return;
1427    }
1428    do {
1429      // Try to acquire the execution
1430      if (!procStack.acquire(proc)) {
1431        if (procStack.setRollback()) {
1432          // we have the 'rollback-lock' we can start rollingback
1433          switch (executeRollback(rootProcId, procStack)) {
1434            case LOCK_ACQUIRED:
1435              break;
1436            case LOCK_YIELD_WAIT:
1437              procStack.unsetRollback();
1438              scheduler.yield(proc);
1439              break;
1440            case LOCK_EVENT_WAIT:
1441              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
1442              procStack.unsetRollback();
1443              break;
1444            default:
1445              throw new UnsupportedOperationException();
1446          }
1447        } else {
1448          // if we can't rollback means that some child is still running.
1449          // the rollback will be executed after all the children are done.
1450          // If the procedure was never executed, remove and mark it as rolledback.
1451          if (!proc.wasExecuted()) {
1452            switch (executeRollback(proc)) {
1453              case LOCK_ACQUIRED:
1454                break;
1455              case LOCK_YIELD_WAIT:
1456                scheduler.yield(proc);
1457                break;
1458              case LOCK_EVENT_WAIT:
1459                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
1460                break;
1461              default:
1462                throw new UnsupportedOperationException();
1463            }
1464          }
1465        }
1466        break;
1467      }
1468
1469      // Execute the procedure
1470      assert proc.getState() == ProcedureState.RUNNABLE : proc;
1471      // Note that lock is NOT about concurrency but rather about ensuring
1472      // ownership of a procedure of an entity such as a region or table
1473      LockState lockState = acquireLock(proc);
1474      switch (lockState) {
1475        case LOCK_ACQUIRED:
1476          execProcedure(procStack, proc);
1477          break;
1478        case LOCK_YIELD_WAIT:
1479          LOG.info(lockState + " " + proc);
1480          scheduler.yield(proc);
1481          break;
1482        case LOCK_EVENT_WAIT:
1483          // Someone will wake us up when the lock is available
1484          LOG.debug(lockState + " " + proc);
1485          break;
1486        default:
1487          throw new UnsupportedOperationException();
1488      }
1489      procStack.release(proc);
1490
1491      if (proc.isSuccess()) {
1492        // update metrics on finishing the procedure.
1493        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
1494        // Print out count of outstanding siblings if this procedure has a parent.
1495        Procedure<TEnvironment> parent = null;
1496        if (proc.hasParent()) {
1497          parent = procedures.get(proc.getParentProcId());
1498        }
1499        LOG.info("Finished {} in {}{}",
1500            proc,
1501            StringUtils.humanTimeDiff(proc.elapsedTime()),
1502            parent != null? (", unfinishedSiblingCount=" + parent.getChildrenLatch()): "");
1503        // Finalize the procedure state
1504        if (proc.getProcId() == rootProcId) {
1505          procedureFinished(proc);
1506        } else {
1507          execCompletionCleanup(proc);
1508        }
1509        break;
1510      }
1511    } while (procStack.isFailed());
1512  }
1513
1514  private LockState acquireLock(Procedure<TEnvironment> proc) {
1515    TEnvironment env = getEnvironment();
1516    // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
1517    // hasLock is true.
1518    if (proc.hasLock()) {
1519      return LockState.LOCK_ACQUIRED;
1520    }
1521    return proc.doAcquireLock(env, store);
1522  }
1523
1524  private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
1525    TEnvironment env = getEnvironment();
1526    // For how the framework works, we know that we will always have the lock
1527    // when we call releaseLock(), so we can avoid calling proc.hasLock()
1528    if (force || !proc.holdLock(env) || proc.isFinished()) {
1529      proc.doReleaseLock(env, store);
1530    }
1531  }
1532
1533  /**
1534   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
1535   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
1536   */
1537  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
1538    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
1539    RemoteProcedureException exception = rootProc.getException();
1540    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
1541    // rolling back because the subprocedure does. Clarify.
1542    if (exception == null) {
1543      exception = procStack.getException();
1544      rootProc.setFailure(exception);
1545      store.update(rootProc);
1546    }
1547
1548    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
1549    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
1550
1551    int stackTail = subprocStack.size();
1552    while (stackTail-- > 0) {
1553      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
1554      IdLock.Entry lockEntry = null;
1555      // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
1556      // this check, as the worker will hold the lock before executing a procedure. This is the only
1557      // place where we may hold two procedure execution locks, and there is a fence in the
1558      // RootProcedureState where we can make sure that only one worker can execute the rollback of
1559      // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
1560      // prevent race between us and the force update thread.
1561      if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
1562        try {
1563          lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
1564        } catch (IOException e) {
1565          // can only happen if interrupted, so not a big deal to propagate it
1566          throw new UncheckedIOException(e);
1567        }
1568      }
1569      try {
1570        // For the sub procedures which are successfully finished, we do not rollback them.
1571        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
1572        // recursively rollback its ancestors. The state changes which are done by sub procedures
1573        // should be handled by parent procedures when rolling back. For example, when rolling back
1574        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
1575        // online, instead of rolling back the original procedures which offlined the regions(in
1576        // fact these procedures can not be rolled back...).
1577        if (proc.isSuccess()) {
1578          // Just do the cleanup work, without actually executing the rollback
1579          subprocStack.remove(stackTail);
1580          cleanupAfterRollbackOneStep(proc);
1581          continue;
1582        }
1583        LockState lockState = acquireLock(proc);
1584        if (lockState != LockState.LOCK_ACQUIRED) {
1585          // can't take a lock on the procedure, add the root-proc back on the
1586          // queue waiting for the lock availability
1587          return lockState;
1588        }
1589
1590        lockState = executeRollback(proc);
1591        releaseLock(proc, false);
1592        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
1593        abortRollback |= !isRunning() || !store.isRunning();
1594
1595        // allows to kill the executor before something is stored to the wal.
1596        // useful to test the procedure recovery.
1597        if (abortRollback) {
1598          return lockState;
1599        }
1600
1601        subprocStack.remove(stackTail);
1602
1603        // if the procedure is kind enough to pass the slot to someone else, yield
1604        // if the proc is already finished, do not yield
1605        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
1606          return LockState.LOCK_YIELD_WAIT;
1607        }
1608
1609        if (proc != rootProc) {
1610          execCompletionCleanup(proc);
1611        }
1612      } finally {
1613        if (lockEntry != null) {
1614          procExecutionLock.releaseLockEntry(lockEntry);
1615        }
1616      }
1617    }
1618
1619    // Finalize the procedure state
1620    LOG.info("Rolled back {} exec-time={}", rootProc,
1621      StringUtils.humanTimeDiff(rootProc.elapsedTime()));
1622    procedureFinished(rootProc);
1623    return LockState.LOCK_ACQUIRED;
1624  }
1625
1626  private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
1627    if (proc.removeStackIndex()) {
1628      if (!proc.isSuccess()) {
1629        proc.setState(ProcedureState.ROLLEDBACK);
1630      }
1631
1632      // update metrics on finishing the procedure (fail)
1633      proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);
1634
1635      if (proc.hasParent()) {
1636        store.delete(proc.getProcId());
1637        procedures.remove(proc.getProcId());
1638      } else {
1639        final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
1640        if (childProcIds != null) {
1641          store.delete(proc, childProcIds);
1642        } else {
1643          store.update(proc);
1644        }
1645      }
1646    } else {
1647      store.update(proc);
1648    }
1649  }
1650
1651  /**
1652   * Execute the rollback of the procedure step.
1653   * It updates the store with the new state (stack index)
1654   * or will remove completly the procedure in case it is a child.
1655   */
1656  private LockState executeRollback(Procedure<TEnvironment> proc) {
1657    try {
1658      proc.doRollback(getEnvironment());
1659    } catch (IOException e) {
1660      LOG.debug("Roll back attempt failed for {}", proc, e);
1661      return LockState.LOCK_YIELD_WAIT;
1662    } catch (InterruptedException e) {
1663      handleInterruptedException(proc, e);
1664      return LockState.LOCK_YIELD_WAIT;
1665    } catch (Throwable e) {
1666      // Catch NullPointerExceptions or similar errors...
1667      LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
1668    }
1669
1670    // allows to kill the executor before something is stored to the wal.
1671    // useful to test the procedure recovery.
1672    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1673      String msg = "TESTING: Kill before store update";
1674      LOG.debug(msg);
1675      stop();
1676      throw new RuntimeException(msg);
1677    }
1678
1679    cleanupAfterRollbackOneStep(proc);
1680
1681    return LockState.LOCK_ACQUIRED;
1682  }
1683
1684  private void yieldProcedure(Procedure<TEnvironment> proc) {
1685    releaseLock(proc, false);
1686    scheduler.yield(proc);
1687  }
1688
1689  /**
1690   * Executes <code>procedure</code>
1691   * <ul>
1692   *  <li>Calls the doExecute() of the procedure
1693   *  <li>If the procedure execution didn't fail (i.e. valid user input)
1694   *  <ul>
1695   *    <li>...and returned subprocedures
1696   *    <ul><li>The subprocedures are initialized.
1697   *      <li>The subprocedures are added to the store
1698   *      <li>The subprocedures are added to the runnable queue
1699   *      <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
1700   *    </ul>
1701   *    </li>
1702   *   <li>...if there are no subprocedure
1703   *    <ul><li>the procedure completed successfully
1704   *      <li>if there is a parent (WAITING)
1705   *      <li>the parent state will be set to RUNNABLE
1706   *    </ul>
1707   *   </li>
1708   *  </ul>
1709   *  </li>
1710   *  <li>In case of failure
1711   *  <ul>
1712   *    <li>The store is updated with the new state</li>
1713   *    <li>The executor (caller of this method) will start the rollback of the procedure</li>
1714   *  </ul>
1715   *  </li>
1716   *  </ul>
1717   */
1718  private void execProcedure(RootProcedureState<TEnvironment> procStack,
1719      Procedure<TEnvironment> procedure) {
1720    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
1721        "NOT RUNNABLE! " + procedure.toString());
1722
1723    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
1724    // The exception is caught below and then we hurry to the exit without disturbing state. The
1725    // idea is that the processing of this procedure will be unsuspended later by an external event
1726    // such the report of a region open.
1727    boolean suspended = false;
1728
1729    // Whether to 're-' -execute; run through the loop again.
1730    boolean reExecute = false;
1731
1732    Procedure<TEnvironment>[] subprocs = null;
1733    do {
1734      reExecute = false;
1735      procedure.resetPersistence();
1736      try {
1737        subprocs = procedure.doExecute(getEnvironment());
1738        if (subprocs != null && subprocs.length == 0) {
1739          subprocs = null;
1740        }
1741      } catch (ProcedureSuspendedException e) {
1742        LOG.trace("Suspend {}", procedure);
1743        suspended = true;
1744      } catch (ProcedureYieldException e) {
1745        LOG.trace("Yield {}", procedure, e);
1746        yieldProcedure(procedure);
1747        return;
1748      } catch (InterruptedException e) {
1749        LOG.trace("Yield interrupt {}", procedure, e);
1750        handleInterruptedException(procedure, e);
1751        yieldProcedure(procedure);
1752        return;
1753      } catch (Throwable e) {
1754        // Catch NullPointerExceptions or similar errors...
1755        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
1756        LOG.error(msg, e);
1757        procedure.setFailure(new RemoteProcedureException(msg, e));
1758      }
1759
1760      if (!procedure.isFailed()) {
1761        if (subprocs != null) {
1762          if (subprocs.length == 1 && subprocs[0] == procedure) {
1763            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
1764            // i.e. we go around this loop again rather than go back out on the scheduler queue.
1765            subprocs = null;
1766            reExecute = true;
1767            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
1768          } else {
1769            // Yield the current procedure, and make the subprocedure runnable
1770            // subprocs may come back 'null'.
1771            subprocs = initializeChildren(procStack, procedure, subprocs);
1772            LOG.info("Initialized subprocedures=" +
1773              (subprocs == null? null:
1774                Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
1775                collect(Collectors.toList()).toString()));
1776          }
1777        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1778          LOG.trace("Added to timeoutExecutor {}", procedure);
1779          timeoutExecutor.add(procedure);
1780        } else if (!suspended) {
1781          // No subtask, so we are done
1782          procedure.setState(ProcedureState.SUCCESS);
1783        }
1784      }
1785
1786      // Add the procedure to the stack
1787      procStack.addRollbackStep(procedure);
1788
1789      // allows to kill the executor before something is stored to the wal.
1790      // useful to test the procedure recovery.
1791      if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) {
1792        kill("TESTING: Kill BEFORE store update: " + procedure);
1793      }
1794
1795      // TODO: The code here doesn't check if store is running before persisting to the store as
1796      // it relies on the method call below to throw RuntimeException to wind up the stack and
1797      // executor thread to stop. The statement following the method call below seems to check if
1798      // store is not running, to prevent scheduling children procedures, re-execution or yield
1799      // of this procedure. This may need more scrutiny and subsequent cleanup in future
1800      //
1801      // Commit the transaction even if a suspend (state may have changed). Note this append
1802      // can take a bunch of time to complete.
1803      if (procedure.needPersistence()) {
1804        updateStoreOnExec(procStack, procedure, subprocs);
1805      }
1806
1807      // if the store is not running we are aborting
1808      if (!store.isRunning()) {
1809        return;
1810      }
1811      // if the procedure is kind enough to pass the slot to someone else, yield
1812      if (procedure.isRunnable() && !suspended &&
1813          procedure.isYieldAfterExecutionStep(getEnvironment())) {
1814        yieldProcedure(procedure);
1815        return;
1816      }
1817
1818      assert (reExecute && subprocs == null) || !reExecute;
1819    } while (reExecute);
1820
1821    // Allows to kill the executor after something is stored to the WAL but before the below
1822    // state settings are done -- in particular the one on the end where we make parent
1823    // RUNNABLE again when its children are done; see countDownChildren.
1824    if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
1825      kill("TESTING: Kill AFTER store update: " + procedure);
1826    }
1827
1828    // Submit the new subprocedures
1829    if (subprocs != null && !procedure.isFailed()) {
1830      submitChildrenProcedures(subprocs);
1831    }
1832
1833    // we need to log the release lock operation before waking up the parent procedure, as there
1834    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
1835    // the sub procedures from store and cause problems...
1836    releaseLock(procedure, false);
1837
1838    // if the procedure is complete and has a parent, count down the children latch.
1839    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
1840    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
1841      countDownChildren(procStack, procedure);
1842    }
1843  }
1844
1845  private void kill(String msg) {
1846    LOG.debug(msg);
1847    stop();
1848    throw new RuntimeException(msg);
1849  }
1850
1851  private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
1852      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1853    assert subprocs != null : "expected subprocedures";
1854    final long rootProcId = getRootProcedureId(procedure);
1855    for (int i = 0; i < subprocs.length; ++i) {
1856      Procedure<TEnvironment> subproc = subprocs[i];
1857      if (subproc == null) {
1858        String msg = "subproc[" + i + "] is null, aborting the procedure";
1859        procedure.setFailure(new RemoteProcedureException(msg,
1860          new IllegalArgumentIOException(msg)));
1861        return null;
1862      }
1863
1864      assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
1865      subproc.setParentProcId(procedure.getProcId());
1866      subproc.setRootProcId(rootProcId);
1867      subproc.setProcId(nextProcId());
1868      procStack.addSubProcedure(subproc);
1869    }
1870
1871    if (!procedure.isFailed()) {
1872      procedure.setChildrenLatch(subprocs.length);
1873      switch (procedure.getState()) {
1874        case RUNNABLE:
1875          procedure.setState(ProcedureState.WAITING);
1876          break;
1877        case WAITING_TIMEOUT:
1878          timeoutExecutor.add(procedure);
1879          break;
1880        default:
1881          break;
1882      }
1883    }
1884    return subprocs;
1885  }
1886
1887  private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
1888    for (int i = 0; i < subprocs.length; ++i) {
1889      Procedure<TEnvironment> subproc = subprocs[i];
1890      subproc.updateMetricsOnSubmit(getEnvironment());
1891      assert !procedures.containsKey(subproc.getProcId());
1892      procedures.put(subproc.getProcId(), subproc);
1893      scheduler.addFront(subproc);
1894    }
1895  }
1896
1897  private void countDownChildren(RootProcedureState<TEnvironment> procStack,
1898      Procedure<TEnvironment> procedure) {
1899    Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
1900    if (parent == null) {
1901      assert procStack.isRollingback();
1902      return;
1903    }
1904
1905    // If this procedure is the last child awake the parent procedure
1906    if (parent.tryRunnable()) {
1907      // If we succeeded in making the parent runnable -- i.e. all of its
1908      // children have completed, move parent to front of the queue.
1909      store.update(parent);
1910      scheduler.addFront(parent);
1911      LOG.info("Finished subprocedure pid={}, resume processing parent {}",
1912          procedure.getProcId(), parent);
1913      return;
1914    }
1915  }
1916
1917  private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
1918      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1919    if (subprocs != null && !procedure.isFailed()) {
1920      if (LOG.isTraceEnabled()) {
1921        LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
1922      }
1923      store.insert(procedure, subprocs);
1924    } else {
1925      LOG.trace("Store update {}", procedure);
1926      if (procedure.isFinished() && !procedure.hasParent()) {
1927        // remove child procedures
1928        final long[] childProcIds = procStack.getSubprocedureIds();
1929        if (childProcIds != null) {
1930          store.delete(procedure, childProcIds);
1931          for (int i = 0; i < childProcIds.length; ++i) {
1932            procedures.remove(childProcIds[i]);
1933          }
1934        } else {
1935          store.update(procedure);
1936        }
1937      } else {
1938        store.update(procedure);
1939      }
1940    }
1941  }
1942
1943  private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
1944    LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
1945    // NOTE: We don't call Thread.currentThread().interrupt()
1946    // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
1947    // the InterruptedException. If the master is going down, we will be notified
1948    // and the executor/store will be stopped.
1949    // (The interrupted procedure will be retried on the next run)
1950  }
1951
1952  private void execCompletionCleanup(Procedure<TEnvironment> proc) {
1953    final TEnvironment env = getEnvironment();
1954    if (proc.hasLock()) {
1955      LOG.warn("Usually this should not happen, we will release the lock before if the procedure" +
1956        " is finished, even if the holdLock is true, arrive here means we have some holes where" +
1957        " we do not release the lock. And the releaseLock below may fail since the procedure may" +
1958        " have already been deleted from the procedure store.");
1959      releaseLock(proc, true);
1960    }
1961    try {
1962      proc.completionCleanup(env);
1963    } catch (Throwable e) {
1964      // Catch NullPointerExceptions or similar errors...
1965      LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1966    }
1967  }
1968
1969  private void procedureFinished(Procedure<TEnvironment> proc) {
1970    // call the procedure completion cleanup handler
1971    execCompletionCleanup(proc);
1972
1973    CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
1974
1975    // update the executor internal state maps
1976    if (!proc.shouldWaitClientAck(getEnvironment())) {
1977      retainer.setClientAckTime(0);
1978    }
1979
1980    completed.put(proc.getProcId(), retainer);
1981    rollbackStack.remove(proc.getProcId());
1982    procedures.remove(proc.getProcId());
1983
1984    // call the runnableSet completion cleanup handler
1985    try {
1986      scheduler.completionCleanup(proc);
1987    } catch (Throwable e) {
1988      // Catch NullPointerExceptions or similar errors...
1989      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
1990    }
1991
1992    // Notify the listeners
1993    sendProcedureFinishedNotification(proc.getProcId());
1994  }
1995
1996  RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
1997    return rollbackStack.get(rootProcId);
1998  }
1999
2000  @VisibleForTesting
2001  ProcedureScheduler getProcedureScheduler() {
2002    return scheduler;
2003  }
2004
2005  @VisibleForTesting
2006  int getCompletedSize() {
2007    return completed.size();
2008  }
2009
2010  // ==========================================================================
2011  //  Worker Thread
2012  // ==========================================================================
2013  private class WorkerThread extends StoppableThread {
2014    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
2015    private volatile Procedure<TEnvironment> activeProcedure;
2016    private boolean onlyPollUrgent = false;
2017    public WorkerThread(ThreadGroup group) {
2018      this(group, "PEWorker-");
2019    }
2020
2021    protected WorkerThread(ThreadGroup group, String prefix) {
2022      this(group, prefix, false);
2023    }
2024
2025    protected WorkerThread(ThreadGroup group, String prefix, boolean onlyPollUrgent) {
2026      super(group, prefix + workerId.incrementAndGet());
2027      this.onlyPollUrgent = onlyPollUrgent;
2028      setDaemon(true);
2029    }
2030
2031    @Override
2032    public void sendStopSignal() {
2033      scheduler.signalAll();
2034    }
2035    @Override
2036    public void run() {
2037      long lastUpdate = EnvironmentEdgeManager.currentTime();
2038      try {
2039        while (isRunning() && keepAlive(lastUpdate)) {
2040          Procedure<TEnvironment> proc = scheduler
2041              .poll(onlyPollUrgent, keepAliveTime, TimeUnit.MILLISECONDS);
2042          if (proc == null) {
2043            continue;
2044          }
2045          this.activeProcedure = proc;
2046          int activeCount = activeExecutorCount.incrementAndGet();
2047          int runningCount = store.setRunningProcedureCount(activeCount);
2048          LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
2049            runningCount, activeCount);
2050          executionStartTime.set(EnvironmentEdgeManager.currentTime());
2051          IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
2052          try {
2053            executeProcedure(proc);
2054          } catch (AssertionError e) {
2055            LOG.info("ASSERT pid=" + proc.getProcId(), e);
2056            throw e;
2057          } finally {
2058            procExecutionLock.releaseLockEntry(lockEntry);
2059            activeCount = activeExecutorCount.decrementAndGet();
2060            runningCount = store.setRunningProcedureCount(activeCount);
2061            LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
2062              runningCount, activeCount);
2063            this.activeProcedure = null;
2064            lastUpdate = EnvironmentEdgeManager.currentTime();
2065            executionStartTime.set(Long.MAX_VALUE);
2066          }
2067        }
2068      } catch (Throwable t) {
2069        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
2070      } finally {
2071        LOG.trace("Worker terminated.");
2072      }
2073      if (onlyPollUrgent) {
2074        urgentWorkerThreads.remove(this);
2075      } else {
2076        workerThreads.remove(this);
2077      }
2078    }
2079
2080    @Override
2081    public String toString() {
2082      Procedure<?> p = this.activeProcedure;
2083      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");
2084    }
2085
2086    /**
2087     * @return the time since the current procedure is running
2088     */
2089    public long getCurrentRunTime() {
2090      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
2091    }
2092
2093    // core worker never timeout
2094    protected boolean keepAlive(long lastUpdate) {
2095      return true;
2096    }
2097  }
2098
2099  // A worker thread which can be added when core workers are stuck. Will timeout after
2100  // keepAliveTime if there is no procedure to run.
2101  private final class KeepAliveWorkerThread extends WorkerThread {
2102
2103    public KeepAliveWorkerThread(ThreadGroup group) {
2104      super(group, "KeepAlivePEWorker-");
2105    }
2106
2107    @Override
2108    protected boolean keepAlive(long lastUpdate) {
2109      return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
2110    }
2111  }
2112
2113  // ----------------------------------------------------------------------------
2114  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the
2115  // full set of procedures pending and completed to write a compacted
2116  // version of the log (in case is a log)?
2117  // In theory no, procedures are have a short life, so at some point the store
2118  // will have the tracker saying everything is in the last log.
2119  // ----------------------------------------------------------------------------
2120
2121  private final class WorkerMonitor extends InlineChore {
2122    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
2123        "hbase.procedure.worker.monitor.interval.msec";
2124    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
2125
2126    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
2127        "hbase.procedure.worker.stuck.threshold.msec";
2128    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
2129
2130    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
2131        "hbase.procedure.worker.add.stuck.percentage";
2132    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
2133
2134    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
2135    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
2136    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
2137
2138    public WorkerMonitor() {
2139      refreshConfig();
2140    }
2141
2142    @Override
2143    public void run() {
2144      final int stuckCount = checkForStuckWorkers();
2145      checkThreadCount(stuckCount);
2146
2147      // refresh interval (poor man dynamic conf update)
2148      refreshConfig();
2149    }
2150
2151    private int checkForStuckWorkers() {
2152      // check if any of the worker is stuck
2153      int stuckCount = 0;
2154      for (WorkerThread worker : workerThreads) {
2155        if (worker.getCurrentRunTime() < stuckThreshold) {
2156          continue;
2157        }
2158
2159        // WARN the worker is stuck
2160        stuckCount++;
2161        LOG.warn("Worker stuck {}, run time {}", worker,
2162          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
2163      }
2164      return stuckCount;
2165    }
2166
2167    private void checkThreadCount(final int stuckCount) {
2168      // nothing to do if there are no runnable tasks
2169      if (stuckCount < 1 || !scheduler.hasRunnables()) {
2170        return;
2171      }
2172
2173      // add a new thread if the worker stuck percentage exceed the threshold limit
2174      // and every handler is active.
2175      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
2176      // let's add new worker thread more aggressively, as they will timeout finally if there is no
2177      // work to do.
2178      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
2179        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
2180        workerThreads.add(worker);
2181        worker.start();
2182        LOG.debug("Added new worker thread {}", worker);
2183      }
2184    }
2185
2186    private void refreshConfig() {
2187      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,
2188          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
2189      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,
2190        DEFAULT_WORKER_MONITOR_INTERVAL);
2191      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,
2192        DEFAULT_WORKER_STUCK_THRESHOLD);
2193    }
2194
2195    @Override
2196    public int getTimeoutInterval() {
2197      return timeoutInterval;
2198    }
2199  }
2200}