001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.UncheckedIOException;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Deque;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.Executor;
033import java.util.concurrent.Executors;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.AtomicLong;
038import java.util.stream.Collectors;
039import java.util.stream.Stream;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
043import org.apache.hadoop.hbase.log.HBaseMarkers;
044import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
045import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
048import org.apache.hadoop.hbase.procedure2.util.StringUtils;
049import org.apache.hadoop.hbase.security.User;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.IdLock;
052import org.apache.hadoop.hbase.util.NonceKey;
053import org.apache.hadoop.hbase.util.Threads;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
061
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
063
064/**
065 * Thread Pool that executes the submitted procedures.
066 * The executor has a ProcedureStore associated.
067 * Each operation is logged and on restart the pending procedures are resumed.
068 *
069 * Unless the Procedure code throws an error (e.g. invalid user input)
070 * the procedure will complete (at some point in time). On restart the pending
071 * procedures are resumed and the ones that failed will be rolled back.
072 *
073 * The user can add procedures to the executor via submitProcedure(proc)
074 * check for the finished state via isFinished(procId)
075 * and get the result via getResult(procId)
076 */
077@InterfaceAudience.Private
078public class ProcedureExecutor<TEnvironment> {
079  private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);
080
081  public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
082  private static final boolean DEFAULT_CHECK_OWNER_SET = false;
083
084  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
085      "hbase.procedure.worker.keep.alive.time.msec";
086  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
087
088  // Enable this flag if you want to upgrade to 2.2+, there are some incompatible changes on how we
089  // assign or unassign a region, so we need to make sure all these procedures have been finished
090  // before we start the master with new code. See HBASE-20881 and HBASE-21075 for more details.
091  public static final String UPGRADE_TO_2_2 = "hbase.procedure.upgrade-to-2-2";
092  private static final boolean DEFAULT_UPGRADE_TO_2_2 = false;
093
094  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
095  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
096
097  public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
098  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
099
100  /**
101   * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
102   * break PE having it fail at various junctures. When non-null, testing is set to an instance of
103   * the below internal {@link Testing} class with flags set for the particular test.
104   */
105  Testing testing = null;
106
107  /**
108   * Class with parameters describing how to fail/die when in testing-context.
109   */
110  public static class Testing {
111    protected boolean killIfSuspended = false;
112
113    /**
114     * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
115     * persisting all the state it needs to recover after a crash.
116     */
117    protected boolean killBeforeStoreUpdate = false;
118    protected boolean toggleKillBeforeStoreUpdate = false;
119
120    /**
121     * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
122     * is about a case where memory-state was being set after store to WAL where a crash could
123     * cause us to get stuck. This flag allows killing at what was a vulnerable time.
124     */
125    protected boolean killAfterStoreUpdate = false;
126    protected boolean toggleKillAfterStoreUpdate = false;
127
128    protected boolean shouldKillBeforeStoreUpdate() {
129      final boolean kill = this.killBeforeStoreUpdate;
130      if (this.toggleKillBeforeStoreUpdate) {
131        this.killBeforeStoreUpdate = !kill;
132        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
133      }
134      return kill;
135    }
136
137    protected boolean shouldKillBeforeStoreUpdate(final boolean isSuspended) {
138      return (isSuspended && !killIfSuspended) ? false : shouldKillBeforeStoreUpdate();
139    }
140
141    protected boolean shouldKillAfterStoreUpdate() {
142      final boolean kill = this.killAfterStoreUpdate;
143      if (this.toggleKillAfterStoreUpdate) {
144        this.killAfterStoreUpdate = !kill;
145        LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
146      }
147      return kill;
148    }
149
150    protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
151      return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
152    }
153  }
154
/**
 * Callbacks through which the executor reports procedure events; every callback receives the
 * id of the procedure concerned.
 */
public interface ProcedureExecutorListener {
  /**
   * @param procId id of the procedure that was loaded
   */
  void procedureLoaded(long procId);

  /**
   * @param procId id of the procedure that was added
   */
  void procedureAdded(long procId);

  /**
   * @param procId id of the procedure that finished
   */
  void procedureFinished(long procId);
}
160
161  /**
162   * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
163   * Once a Root-Procedure completes (success or failure), the result will be added to this map.
164   * The user of ProcedureExecutor should call getResult(procId) to get the result.
165   */
166  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
167    new ConcurrentHashMap<>();
168
169  /**
170   * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
171   * The RootProcedureState contains the execution stack of the Root-Procedure,
172   * It is added to the map by submitProcedure() and removed on procedure completion.
173   */
174  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
175    new ConcurrentHashMap<>();
176
177  /**
178   * Helper map to lookup the live procedures by ID.
179   * This map contains every procedure. root-procedures and subprocedures.
180   */
181  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
182    new ConcurrentHashMap<>();
183
184  /**
185   * Helper map to lookup whether the procedure already issued from the same client. This map
186   * contains every root procedure.
187   */
188  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();
189
190  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
191    new CopyOnWriteArrayList<>();
192
193  private Configuration conf;
194
195  /**
196   * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
197   * resource handling rather than observing in a #join is unexpected).
198   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
199   * (Should be ok).
200   */
201  private ThreadGroup threadGroup;
202
203  /**
204   * Created in the {@link #init(int, boolean)}  method. Terminated in {@link #join()} (FIX! Doing
205   * resource handling rather than observing in a #join is unexpected).
206   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
207   * (Should be ok).
208   */
209  private CopyOnWriteArrayList<WorkerThread> workerThreads;
210
211  /**
212   * Worker thread only for urgent tasks.
213   */
214  private CopyOnWriteArrayList<WorkerThread> urgentWorkerThreads;
215
216  /**
217   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
218   * resource handling rather than observing in a #join is unexpected).
219   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
220   * (Should be ok).
221   */
222  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;
223
224  private int corePoolSize;
225  private int maxPoolSize;
226  private int urgentPoolSize;
227
228  private volatile long keepAliveTime;
229
230  /**
231   * Scheduler/Queue that contains runnable procedures.
232   */
233  private final ProcedureScheduler scheduler;
234
235  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
236    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
237
238  private final AtomicLong lastProcId = new AtomicLong(-1);
239  private final AtomicLong workerId = new AtomicLong(0);
240  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
241  private final AtomicBoolean running = new AtomicBoolean(false);
242  private final TEnvironment environment;
243  private final ProcedureStore store;
244
245  private final boolean checkOwnerSet;
246
247  // To prevent concurrent execution of the same procedure.
248  // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
249  // procedure is woken up before we finish the suspend which causes the same procedures to be
250  // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
251  // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
252  // execution of the same procedure.
253  private final IdLock procExecutionLock = new IdLock();
254
255  private final boolean upgradeTo2_2;
256
/**
 * Creates an executor that uses a freshly created {@link SimpleProcedureScheduler}.
 * @param conf configuration passed on to the four-argument constructor
 * @param environment environment object passed on to the four-argument constructor
 * @param store procedure store passed on to the four-argument constructor
 */
public ProcedureExecutor(
    final Configuration conf,
    final TEnvironment environment,
    final ProcedureStore store) {
  this(conf, environment, store, new SimpleProcedureScheduler());
}
261
262  private boolean isRootFinished(Procedure<?> proc) {
263    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
264    return rootProc == null || rootProc.isFinished();
265  }
266
267  private void forceUpdateProcedure(long procId) throws IOException {
268    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
269    try {
270      Procedure<TEnvironment> proc = procedures.get(procId);
271      if (proc != null) {
272        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
273          LOG.debug("Procedure {} has already been finished and parent is succeeded," +
274            " skip force updating", proc);
275          return;
276        }
277      } else {
278        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
279        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
280          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
281          return;
282        }
283        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
284        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
285        if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
286          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
287            procId);
288          return;
289        }
290        proc = retainer.getProcedure();
291      }
292      LOG.debug("Force update procedure {}", proc);
293      store.update(proc);
294    } finally {
295      procExecutionLock.releaseLockEntry(lockEntry);
296    }
297  }
298
/**
 * Creates an executor on top of the given store and scheduler.
 * <p/>
 * Besides capturing its arguments, the constructor reads the owner-check and upgrade flags
 * from the configuration, applies the keep-alive setting through
 * {@link #refreshConfiguration(Configuration)}, and registers a store listener that hands
 * every force-update request to the single-threaded force-update executor.
 * @param conf configuration to read the executor settings from
 * @param environment environment object returned by {@link #getEnvironment()}
 * @param store procedure store to persist to and to listen on
 * @param scheduler scheduler holding the runnable procedures
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store, final ProcedureScheduler scheduler) {
  this.environment = environment;
  this.scheduler = scheduler;
  this.store = store;
  this.conf = conf;
  this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
  this.upgradeTo2_2 = conf.getBoolean(UPGRADE_TO_2_2, DEFAULT_UPGRADE_TO_2_2);
  refreshConfiguration(conf);
  store.registerListener(new ProcedureStoreListener() {

    @Override
    public void forceUpdate(long[] procIds) {
      // One task per procedure id, so a failure for one id does not stop the others.
      Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
        try {
          forceUpdateProcedure(procId);
        } catch (IOException e) {
          // Best effort: keep going, but log the cause so the failure can be diagnosed.
          LOG.warn("Failed to force update procedure with pid={}", procId, e);
        }
      }));
    }
  });
}
322
323  private void load(final boolean abortOnCorruption) throws IOException {
324    Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
325    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
326    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
327    Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");
328
329    store.load(new ProcedureStore.ProcedureLoader() {
330      @Override
331      public void setMaxProcId(long maxProcId) {
332        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
333        lastProcId.set(maxProcId);
334      }
335
336      @Override
337      public void load(ProcedureIterator procIter) throws IOException {
338        loadProcedures(procIter, abortOnCorruption);
339      }
340
341      @Override
342      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
343        int corruptedCount = 0;
344        while (procIter.hasNext()) {
345          Procedure<?> proc = procIter.next();
346          LOG.error("Corrupt " + proc);
347          corruptedCount++;
348        }
349        if (abortOnCorruption && corruptedCount > 0) {
350          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
351        }
352      }
353    });
354  }
355
356  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
357    proc.restoreLock(getEnvironment());
358    restored.add(proc.getProcId());
359  }
360
361  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
362    while (!stack.isEmpty()) {
363      restoreLock(stack.pop(), restored);
364    }
365  }
366
367  // Restore the locks for all the procedures.
368  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
369  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
370  // calling the acquireLock method for the parent procedure.
371  // The algorithm is straight-forward:
372  // 1. Use a set to record the procedures which locks have already been restored.
373  // 2. Use a stack to store the hierarchy of the procedures
374  // 3. For all the procedure, we will first try to find its parent and push it into the stack,
375  // unless
376  // a. We have no parent, i.e, we are the root procedure
377  // b. The lock has already been restored(by checking the set introduced in #1)
378  // then we start to pop the stack and call acquireLock for each procedure.
379  // Notice that this should be done for all procedures, not only the ones in runnableList.
380  private void restoreLocks() {
381    Set<Long> restored = new HashSet<>();
382    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
383    procedures.values().forEach(proc -> {
384      for (;;) {
385        if (restored.contains(proc.getProcId())) {
386          restoreLocks(stack, restored);
387          return;
388        }
389        if (!proc.hasParent()) {
390          restoreLock(proc, restored);
391          restoreLocks(stack, restored);
392          return;
393        }
394        stack.push(proc);
395        proc = procedures.get(proc.getParentProcId());
396      }
397    });
398  }
399
400  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
401      throws IOException {
402    // 1. Build the rollback stack
403    int runnableCount = 0;
404    int failedCount = 0;
405    int waitingCount = 0;
406    int waitingTimeoutCount = 0;
407    while (procIter.hasNext()) {
408      boolean finished = procIter.isNextFinished();
409      Procedure<TEnvironment> proc = procIter.next();
410      NonceKey nonceKey = proc.getNonceKey();
411      long procId = proc.getProcId();
412
413      if (finished) {
414        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
415        LOG.debug("Completed {}", proc);
416      } else {
417        if (!proc.hasParent()) {
418          assert !proc.isFinished() : "unexpected finished procedure";
419          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
420        }
421
422        // add the procedure to the map
423        proc.beforeReplay(getEnvironment());
424        procedures.put(proc.getProcId(), proc);
425        switch (proc.getState()) {
426          case RUNNABLE:
427            runnableCount++;
428            break;
429          case FAILED:
430            failedCount++;
431            break;
432          case WAITING:
433            waitingCount++;
434            break;
435          case WAITING_TIMEOUT:
436            waitingTimeoutCount++;
437            break;
438          default:
439            break;
440        }
441      }
442
443      // add the nonce to the map
444      if (nonceKey != null) {
445        nonceKeysToProcIdsMap.put(nonceKey, procId);
446      }
447    }
448
449    // 2. Initialize the stacks
450    // In the old implementation, for procedures in FAILED state, we will push it into the
451    // ProcedureScheduler directly to execute the rollback. But this does not work after we
452    // introduce the restore lock stage.
453    // For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
454    // then when a procedure which has lock access, for example, a sub procedure of the procedure
455    // which has the xlock, is pushed into the scheduler, we will add the queue back to let the
456    // workers poll from it. The assumption here is that, the procedure which has the xlock should
457    // have been polled out already, so when loading we can not add the procedure to scheduler first
458    // and then call acquireLock, since the procedure is still in the queue, and since we will
459    // remove the queue from runQueue, then no one can poll it out, then there is a dead lock
460    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
461    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
462    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
463    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
464    procIter.reset();
465    while (procIter.hasNext()) {
466      if (procIter.isNextFinished()) {
467        procIter.skipNext();
468        continue;
469      }
470
471      Procedure<TEnvironment> proc = procIter.next();
472      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
473
474      LOG.debug("Loading {}", proc);
475
476      Long rootProcId = getRootProcedureId(proc);
477      // The orphan procedures will be passed to handleCorrupted, so add an assert here
478      assert rootProcId != null;
479
480      if (proc.hasParent()) {
481        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
482        if (parent != null && !proc.isFinished()) {
483          parent.incChildrenLatch();
484        }
485      }
486
487      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
488      procStack.loadStack(proc);
489
490      proc.setRootProcId(rootProcId);
491      switch (proc.getState()) {
492        case RUNNABLE:
493          runnableList.add(proc);
494          break;
495        case WAITING:
496          waitingList.add(proc);
497          break;
498        case WAITING_TIMEOUT:
499          waitingTimeoutList.add(proc);
500          break;
501        case FAILED:
502          failedList.add(proc);
503          break;
504        case ROLLEDBACK:
505        case INITIALIZING:
506          String msg = "Unexpected " + proc.getState() + " state for " + proc;
507          LOG.error(msg);
508          throw new UnsupportedOperationException(msg);
509        default:
510          break;
511      }
512    }
513
514    // 3. Check the waiting procedures to see if some of them can be added to runnable.
515    waitingList.forEach(proc -> {
516      if (!proc.hasChildren()) {
517        // Normally, WAITING procedures should be waken by its children.
518        // But, there is a case that, all the children are successful and before
519        // they can wake up their parent procedure, the master was killed.
520        // So, during recovering the procedures from ProcedureWal, its children
521        // are not loaded because of their SUCCESS state.
522        // So we need to continue to run this WAITING procedure. But before
523        // executing, we need to set its state to RUNNABLE, otherwise, a exception
524        // will throw:
525        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
526        // "NOT RUNNABLE! " + procedure.toString());
527        proc.setState(ProcedureState.RUNNABLE);
528        runnableList.add(proc);
529      } else {
530        proc.afterReplay(getEnvironment());
531      }
532    });
533    // 4. restore locks
534    restoreLocks();
535
536    // 5. Push the procedures to the timeout executor
537    waitingTimeoutList.forEach(proc -> {
538      proc.afterReplay(getEnvironment());
539      timeoutExecutor.add(proc);
540    });
541
542    // 6. Push the procedure to the scheduler
543    failedList.forEach(scheduler::addBack);
544    runnableList.forEach(p -> {
545      p.afterReplay(getEnvironment());
546      if (!p.hasParent()) {
547        sendProcedureLoadedNotification(p.getProcId());
548      }
549      scheduler.addBack(p);
550    });
551    // After all procedures put into the queue, signal the worker threads.
552    // Otherwise, there is a race condition. See HBASE-21364.
553    scheduler.signalAll();
554  }
555
556  /**
557   * Initialize the procedure executor, but do not start workers. We will start them later.
558   * <p/>
559   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
560   * ensure a single executor, and start the procedure replay to resume and recover the previous
561   * pending and in-progress procedures.
562   * @param numThreads number of threads available for procedure execution.
563   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
564   *          is found on replay. otherwise false.
565   */
566  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
567    init(numThreads, 0, abortOnCorruption);
568  }
569
570  /**
571   * Initialize the procedure executor, but do not start workers. We will start them later.
572   * <p/>
573   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
574   * ensure a single executor, and start the procedure replay to resume and recover the previous
575   * pending and in-progress procedures.
576   * @param numThreads number of threads available for procedure execution.
577   * @param urgentNumThreads number of threads available for urgent procedure execution.
578   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
579   *          is found on replay. otherwise false.
580   */
581  public void init(int numThreads, int urgentNumThreads,
582      boolean abortOnCorruption) throws IOException {
583    // We have numThreads executor + one timer thread used for timing out
584    // procedures and triggering periodic procedures.
585    this.corePoolSize = numThreads;
586    this.maxPoolSize = 10 * numThreads;
587    this.urgentPoolSize = urgentNumThreads;
588    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker "
589            + "count={}, start {} urgent thread(s)",
590        corePoolSize, maxPoolSize, urgentPoolSize);
591
592    this.threadGroup = new ThreadGroup("PEWorkerGroup");
593    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
594
595    // Create the workers
596    workerId.set(0);
597    workerThreads = new CopyOnWriteArrayList<>();
598    urgentWorkerThreads = new CopyOnWriteArrayList<>();
599    for (int i = 0; i < corePoolSize; ++i) {
600      workerThreads.add(new WorkerThread(threadGroup));
601    }
602    for (int i = 0; i < urgentNumThreads; ++i) {
603      urgentWorkerThreads
604          .add(new WorkerThread(threadGroup, "UrgentPEWorker-", true));
605    }
606
607    long st, et;
608
609    // Acquire the store lease.
610    st = System.nanoTime();
611    store.recoverLease();
612    et = System.nanoTime();
613    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
614      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
615
616    // start the procedure scheduler
617    scheduler.start();
618
619    // TODO: Split in two steps.
620    // TODO: Handle corrupted procedures (currently just a warn)
621    // The first one will make sure that we have the latest id,
622    // so we can start the threads and accept new procedures.
623    // The second step will do the actual load of old procedures.
624    st = System.nanoTime();
625    load(abortOnCorruption);
626    et = System.nanoTime();
627    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
628      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
629  }
630
631  /**
632   * Start the workers.
633   */
634  public void startWorkers() throws IOException {
635    if (!running.compareAndSet(false, true)) {
636      LOG.warn("Already running");
637      return;
638    }
639    // Start the executors. Here we must have the lastProcId set.
640    LOG.debug("Start workers {}, urgent workers {}", workerThreads.size(),
641        urgentWorkerThreads.size());
642    timeoutExecutor.start();
643    for (WorkerThread worker: workerThreads) {
644      worker.start();
645    }
646
647    for (WorkerThread worker: urgentWorkerThreads) {
648      worker.start();
649    }
650
651    // Internal chores
652    timeoutExecutor.add(new WorkerMonitor());
653
654    if (upgradeTo2_2) {
655      timeoutExecutor.add(new InlineChore() {
656
657        @Override
658        public void run() {
659          if (procedures.isEmpty()) {
660            LOG.info("UPGRADE OK: All existed procedures have been finished, quit...");
661            System.exit(0);
662          }
663        }
664
665        @Override
666        public int getTimeoutInterval() {
667          // check every 5 seconds to see if we can quit
668          return 5000;
669        }
670      });
671    }
672
673    // Add completed cleaner chore
674    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
675      nonceKeysToProcIdsMap));
676  }
677
678  public void stop() {
679    if (!running.getAndSet(false)) {
680      return;
681    }
682
683    LOG.info("Stopping");
684    scheduler.stop();
685    timeoutExecutor.sendStopSignal();
686  }
687
688  @VisibleForTesting
689  public void join() {
690    assert !isRunning() : "expected not running";
691
692    // stop the timeout executor
693    timeoutExecutor.awaitTermination();
694
695    // stop the worker threads
696    for (WorkerThread worker: workerThreads) {
697      worker.awaitTermination();
698    }
699
700    // stop the worker threads
701    for (WorkerThread worker: urgentWorkerThreads) {
702      worker.awaitTermination();
703    }
704
705    // Destroy the Thread Group for the executors
706    // TODO: Fix. #join is not place to destroy resources.
707    try {
708      threadGroup.destroy();
709    } catch (IllegalThreadStateException e) {
710      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
711          this.threadGroup, e.getMessage());
712      // This dumps list of threads on STDOUT.
713      this.threadGroup.list();
714    }
715
716    // reset the in-memory state for testing
717    completed.clear();
718    rollbackStack.clear();
719    procedures.clear();
720    nonceKeysToProcIdsMap.clear();
721    scheduler.clear();
722    lastProcId.set(-1);
723  }
724
725  public void refreshConfiguration(final Configuration conf) {
726    this.conf = conf;
727    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
728        DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
729  }
730
731  // ==========================================================================
732  //  Accessors
733  // ==========================================================================
734  public boolean isRunning() {
735    return running.get();
736  }
737
738  /**
739   * @return the current number of worker threads.
740   */
741  public int getWorkerThreadCount() {
742    return workerThreads.size() + urgentWorkerThreads.size();
743  }
744
745  /**
746   * @return the core pool size settings.
747   */
748  public int getCorePoolSize() {
749    return corePoolSize;
750  }
751
752  public int getUrgentPoolSize() {
753    return urgentPoolSize;
754  }
755
756  public int getActiveExecutorCount() {
757    return activeExecutorCount.get();
758  }
759
760  public TEnvironment getEnvironment() {
761    return this.environment;
762  }
763
764  public ProcedureStore getStore() {
765    return this.store;
766  }
767
768  ProcedureScheduler getScheduler() {
769    return scheduler;
770  }
771
772  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
773    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
774    this.scheduler.signalAll();
775  }
776
777  public long getKeepAliveTime(final TimeUnit timeUnit) {
778    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
779  }
780
781  // ==========================================================================
782  //  Submit/Remove Chores
783  // ==========================================================================
784
785  /**
786   * Add a chore procedure to the executor
787   * @param chore the chore to add
788   */
789  public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
790    chore.setState(ProcedureState.WAITING_TIMEOUT);
791    timeoutExecutor.add(chore);
792  }
793
794  /**
795   * Remove a chore procedure from the executor
796   * @param chore the chore to remove
797   * @return whether the chore is removed, or it will be removed later
798   */
799  public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
800    chore.setState(ProcedureState.SUCCESS);
801    return timeoutExecutor.remove(chore);
802  }
803
804  // ==========================================================================
805  //  Nonce Procedure helpers
806  // ==========================================================================
807  /**
808   * Create a NoneKey from the specified nonceGroup and nonce.
809   * @param nonceGroup
810   * @param nonce
811   * @return the generated NonceKey
812   */
813  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
814    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
815  }
816
817  /**
818   * Register a nonce for a procedure that is going to be submitted.
819   * A procId will be reserved and on submitProcedure(),
820   * the procedure with the specified nonce will take the reserved ProcId.
821   * If someone already reserved the nonce, this method will return the procId reserved,
822   * otherwise an invalid procId will be returned. and the caller should procede
823   * and submit the procedure.
824   *
825   * @param nonceKey A unique identifier for this operation from the client or process.
826   * @return the procId associated with the nonce, if any otherwise an invalid procId.
827   */
828  public long registerNonce(final NonceKey nonceKey) {
829    if (nonceKey == null) return -1;
830
831    // check if we have already a Reserved ID for the nonce
832    Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
833    if (oldProcId == null) {
834      // reserve a new Procedure ID, this will be associated with the nonce
835      // and the procedure submitted with the specified nonce will use this ID.
836      final long newProcId = nextProcId();
837      oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
838      if (oldProcId == null) return -1;
839    }
840
841    // we found a registered nonce, but the procedure may not have been submitted yet.
842    // since the client expect the procedure to be submitted, spin here until it is.
843    final boolean traceEnabled = LOG.isTraceEnabled();
844    while (isRunning() &&
845           !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
846           nonceKeysToProcIdsMap.containsKey(nonceKey)) {
847      if (traceEnabled) {
848        LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
849      }
850      Threads.sleep(100);
851    }
852    return oldProcId.longValue();
853  }
854
855  /**
856   * Remove the NonceKey if the procedure was not submitted to the executor.
857   * @param nonceKey A unique identifier for this operation from the client or process.
858   */
859  public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
860    if (nonceKey == null) return;
861
862    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
863    if (procId == null) return;
864
865    // if the procedure was not submitted, remove the nonce
866    if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
867      nonceKeysToProcIdsMap.remove(nonceKey);
868    }
869  }
870
871  /**
872   * If the failure failed before submitting it, we may want to give back the
873   * same error to the requests with the same nonceKey.
874   *
875   * @param nonceKey A unique identifier for this operation from the client or process
876   * @param procName name of the procedure, used to inform the user
877   * @param procOwner name of the owner of the procedure, used to inform the user
878   * @param exception the failure to report to the user
879   */
880  public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
881      IOException exception) {
882    if (nonceKey == null) {
883      return;
884    }
885
886    Long procId = nonceKeysToProcIdsMap.get(nonceKey);
887    if (procId == null || completed.containsKey(procId)) {
888      return;
889    }
890
891    Procedure<TEnvironment> proc =
892      new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
893
894    completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
895  }
896
897  // ==========================================================================
898  //  Submit/Abort Procedure
899  // ==========================================================================
900  /**
901   * Add a new root-procedure to the executor.
902   * @param proc the new procedure to execute.
903   * @return the procedure id, that can be used to monitor the operation
904   */
905  public long submitProcedure(Procedure<TEnvironment> proc) {
906    return submitProcedure(proc, null);
907  }
908
909  /**
910   * Bypass a procedure. If the procedure is set to bypass, all the logic in
911   * execute/rollback will be ignored and it will return success, whatever.
912   * It is used to recover buggy stuck procedures, releasing the lock resources
913   * and letting other procedures run. Bypassing one procedure (and its ancestors will
914   * be bypassed automatically) may leave the cluster in a middle state, e.g. region
915   * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
916   * the operators may have to do some clean up on hdfs or schedule some assign procedures
917   * to let region online. DO AT YOUR OWN RISK.
918   * <p>
919   * A procedure can be bypassed only if
920   * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
921   * or it is a root procedure without any child.
922   * 2. No other worker thread is executing it
923   * 3. No child procedure has been submitted
924   *
925   * <p>
926   * If all the requirements are meet, the procedure and its ancestors will be
927   * bypassed and persisted to WAL.
928   *
929   * <p>
930   * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
931   * TODO: What about WAITING_TIMEOUT?
932   * @param pids the procedure ids
933   * @param lockWait time to wait lock
934   * @param force if force set to true, we will bypass the procedure even if it is executing.
935   *              This is for procedures which can't break out during executing(due to bug, mostly)
936   *              In this case, bypassing the procedure is not enough, since it is already stuck
937   *              there. We need to restart the master after bypassing, and letting the problematic
938   *              procedure to execute wth bypass=true, so in that condition, the procedure can be
939   *              successfully bypassed.
940   * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
941   * @return true if bypass success
942   * @throws IOException IOException
943   */
944  public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
945      boolean recursive)
946      throws IOException {
947    List<Boolean> result = new ArrayList<Boolean>(pids.size());
948    for(long pid: pids) {
949      result.add(bypassProcedure(pid, lockWait, force, recursive));
950    }
951    return result;
952  }
953
954  boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
955      throws IOException {
956    Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
957    final Procedure<TEnvironment> procedure = getProcedure(pid);
958    if (procedure == null) {
959      LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
960      return false;
961    }
962
963    LOG.info("Begin bypass {} with lockWait={}, override={}, recursive={}",
964        procedure, lockWait, override, recursive);
965    IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
966    if (lockEntry == null && !override) {
967      LOG.info("Waited {} ms, but {} is still running, skipping bypass with force={}",
968          lockWait, procedure, override);
969      return false;
970    } else if (lockEntry == null) {
971      LOG.info("Waited {} ms, but {} is still running, begin bypass with force={}",
972          lockWait, procedure, override);
973    }
974    try {
975      // check whether the procedure is already finished
976      if (procedure.isFinished()) {
977        LOG.info("{} is already finished, skipping bypass", procedure);
978        return false;
979      }
980
981      if (procedure.hasChildren()) {
982        if (recursive) {
983          // EXPENSIVE. Checks each live procedure of which there could be many!!!
984          // Is there another way to get children of a procedure?
985          LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
986          this.procedures.forEachValue(1 /*Single-threaded*/,
987            // Transformer
988            v -> {
989              return v.getParentProcId() == procedure.getProcId()? v: null;
990            },
991            // Consumer
992            v -> {
993              boolean result = false;
994              IOException ioe = null;
995              try {
996                result = bypassProcedure(v.getProcId(), lockWait, override, recursive);
997              } catch (IOException e) {
998                LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
999              }
1000            });
1001        } else {
1002          LOG.info("{} has children, skipping bypass", procedure);
1003          return false;
1004        }
1005      }
1006
1007      // If the procedure has no parent or no child, we are safe to bypass it in whatever state
1008      if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
1009          && procedure.getState() != ProcedureState.WAITING
1010          && procedure.getState() != ProcedureState.WAITING_TIMEOUT) {
1011        LOG.info("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
1012                + "(with no parent), {}",
1013            procedure);
1014        // Question: how is the bypass done here?
1015        return false;
1016      }
1017
1018      // Now, the procedure is not finished, and no one can execute it since we take the lock now
1019      // And we can be sure that its ancestor is not running too, since their child has not
1020      // finished yet
1021      Procedure<TEnvironment> current = procedure;
1022      while (current != null) {
1023        LOG.info("Bypassing {}", current);
1024        current.bypass(getEnvironment());
1025        store.update(procedure);
1026        long parentID = current.getParentProcId();
1027        current = getProcedure(parentID);
1028      }
1029
1030      //wake up waiting procedure, already checked there is no child
1031      if (procedure.getState() == ProcedureState.WAITING) {
1032        procedure.setState(ProcedureState.RUNNABLE);
1033        store.update(procedure);
1034      }
1035
1036      // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler.
1037      // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE
1038      if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1039        LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
1040        if (timeoutExecutor.remove(procedure)) {
1041          LOG.debug("removed procedure {} from timeoutExecutor", procedure);
1042          timeoutExecutor.executeTimedoutProcedure(procedure);
1043        }
1044      } else if (lockEntry != null) {
1045        scheduler.addFront(procedure);
1046        LOG.info("Bypassing {} and its ancestors successfully, adding to queue", procedure);
1047      } else {
1048        // If we don't have the lock, we can't re-submit the queue,
1049        // since it is already executing. To get rid of the stuck situation, we
1050        // need to restart the master. With the procedure set to bypass, the procedureExecutor
1051        // will bypass it and won't get stuck again.
1052        LOG.info("Bypassing {} and its ancestors successfully, but since it is already running, "
1053            + "skipping add to queue", procedure);
1054      }
1055      return true;
1056
1057    } finally {
1058      if (lockEntry != null) {
1059        procExecutionLock.releaseLockEntry(lockEntry);
1060      }
1061    }
1062  }
1063
1064  /**
1065   * Add a new root-procedure to the executor.
1066   * @param proc the new procedure to execute.
1067   * @param nonceKey the registered unique identifier for this operation from the client or process.
1068   * @return the procedure id, that can be used to monitor the operation
1069   */
1070  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
1071      justification = "FindBugs is blind to the check-for-null")
1072  public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
1073    Preconditions.checkArgument(lastProcId.get() >= 0);
1074
1075    prepareProcedure(proc);
1076
1077    final Long currentProcId;
1078    if (nonceKey != null) {
1079      currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
1080      Preconditions.checkArgument(currentProcId != null,
1081        "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
1082    } else {
1083      currentProcId = nextProcId();
1084    }
1085
1086    // Initialize the procedure
1087    proc.setNonceKey(nonceKey);
1088    proc.setProcId(currentProcId.longValue());
1089
1090    // Commit the transaction
1091    store.insert(proc, null);
1092    LOG.debug("Stored {}", proc);
1093
1094    // Add the procedure to the executor
1095    return pushProcedure(proc);
1096  }
1097
1098  /**
1099   * Add a set of new root-procedure to the executor.
1100   * @param procs the new procedures to execute.
1101   */
1102  // TODO: Do we need to take nonces here?
1103  public void submitProcedures(Procedure<TEnvironment>[] procs) {
1104    Preconditions.checkArgument(lastProcId.get() >= 0);
1105    if (procs == null || procs.length <= 0) {
1106      return;
1107    }
1108
1109    // Prepare procedure
1110    for (int i = 0; i < procs.length; ++i) {
1111      prepareProcedure(procs[i]).setProcId(nextProcId());
1112    }
1113
1114    // Commit the transaction
1115    store.insert(procs);
1116    if (LOG.isDebugEnabled()) {
1117      LOG.debug("Stored " + Arrays.toString(procs));
1118    }
1119    // Add the procedure to the executor
1120    for (Procedure<TEnvironment> proc : procs) {
1121      pushProcedure(proc);
1122    }
1123  }
1124
1125  private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
1126    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
1127    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
1128    if (this.checkOwnerSet) {
1129      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
1130    }
1131    return proc;
1132  }
1133
1134  private long pushProcedure(Procedure<TEnvironment> proc) {
1135    long currentProcId = proc.getProcId();
1136    // If we are going to upgrade to 2.2+, and this is not a sub procedure, do not push it to
1137    // scheduler. After we finish all the ongoing procedures, the master will quit.
1138    if (upgradeTo2_2 && !proc.hasParent()) {
1139      return currentProcId;
1140    }
1141
1142    // Update metrics on start of a procedure
1143    proc.updateMetricsOnSubmit(getEnvironment());
1144
1145    // Create the rollback stack for the procedure
1146    RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
1147    rollbackStack.put(currentProcId, stack);
1148
1149    // Submit the new subprocedures
1150    assert !procedures.containsKey(currentProcId);
1151    procedures.put(currentProcId, proc);
1152    sendProcedureAddedNotification(currentProcId);
1153    scheduler.addBack(proc);
1154    return proc.getProcId();
1155  }
1156
1157  /**
1158   * Send an abort notification the specified procedure.
1159   * Depending on the procedure implementation the abort can be considered or ignored.
1160   * @param procId the procedure to abort
1161   * @return true if the procedure exists and has received the abort, otherwise false.
1162   */
1163  public boolean abort(long procId) {
1164    return abort(procId, true);
1165  }
1166
1167  /**
1168   * Send an abort notification to the specified procedure.
1169   * Depending on the procedure implementation, the abort can be considered or ignored.
1170   * @param procId the procedure to abort
1171   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
1172   * @return true if the procedure exists and has received the abort, otherwise false.
1173   */
1174  public boolean abort(long procId, boolean mayInterruptIfRunning) {
1175    Procedure<TEnvironment> proc = procedures.get(procId);
1176    if (proc != null) {
1177      if (!mayInterruptIfRunning && proc.wasExecuted()) {
1178        return false;
1179      }
1180      return proc.abort(getEnvironment());
1181    }
1182    return false;
1183  }
1184
1185  // ==========================================================================
1186  //  Executor query helpers
1187  // ==========================================================================
1188  public Procedure<TEnvironment> getProcedure(final long procId) {
1189    return procedures.get(procId);
1190  }
1191
1192  public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
1193    Procedure<TEnvironment> proc = getProcedure(procId);
1194    if (clazz.isInstance(proc)) {
1195      return clazz.cast(proc);
1196    }
1197    return null;
1198  }
1199
1200  public Procedure<TEnvironment> getResult(long procId) {
1201    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1202    if (retainer == null) {
1203      return null;
1204    } else {
1205      return retainer.getProcedure();
1206    }
1207  }
1208
1209  /**
1210   * Return true if the procedure is finished.
1211   * The state may be "completed successfully" or "failed and rolledback".
1212   * Use getResult() to check the state or get the result data.
1213   * @param procId the ID of the procedure to check
1214   * @return true if the procedure execution is finished, otherwise false.
1215   */
1216  public boolean isFinished(final long procId) {
1217    return !procedures.containsKey(procId);
1218  }
1219
1220  /**
1221   * Return true if the procedure is started.
1222   * @param procId the ID of the procedure to check
1223   * @return true if the procedure execution is started, otherwise false.
1224   */
1225  public boolean isStarted(long procId) {
1226    Procedure<?> proc = procedures.get(procId);
1227    if (proc == null) {
1228      return completed.get(procId) != null;
1229    }
1230    return proc.wasExecuted();
1231  }
1232
1233  /**
1234   * Mark the specified completed procedure, as ready to remove.
1235   * @param procId the ID of the procedure to remove
1236   */
1237  public void removeResult(long procId) {
1238    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1239    if (retainer == null) {
1240      assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
1241      LOG.debug("pid={} already removed by the cleaner.", procId);
1242      return;
1243    }
1244
1245    // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
1246    retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
1247  }
1248
1249  public Procedure<TEnvironment> getResultOrProcedure(long procId) {
1250    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1251    if (retainer == null) {
1252      return procedures.get(procId);
1253    } else {
1254      return retainer.getProcedure();
1255    }
1256  }
1257
1258  /**
1259   * Check if the user is this procedure's owner
1260   * @param procId the target procedure
1261   * @param user the user
1262   * @return true if the user is the owner of the procedure,
1263   *   false otherwise or the owner is unknown.
1264   */
1265  public boolean isProcedureOwner(long procId, User user) {
1266    if (user == null) {
1267      return false;
1268    }
1269    final Procedure<TEnvironment> runningProc = procedures.get(procId);
1270    if (runningProc != null) {
1271      return runningProc.getOwner().equals(user.getShortName());
1272    }
1273
1274    final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1275    if (retainer != null) {
1276      return retainer.getProcedure().getOwner().equals(user.getShortName());
1277    }
1278
1279    // Procedure either does not exist or has already completed and got cleaned up.
1280    // At this time, we cannot check the owner of the procedure
1281    return false;
1282  }
1283
1284
1285  /**
1286   * Should only be used when starting up, where the procedure workers have not been started.
1287   * <p/>
1288   * If the procedure works has been started, the return values maybe changed when you are
1289   * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
1290   * it will do a copy, and also include the finished procedures.
1291   */
1292  public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
1293    return procedures.values();
1294  }
1295
1296  /**
1297   * Get procedures.
1298   * @return the procedures in a list
1299   */
1300  public List<Procedure<TEnvironment>> getProcedures() {
1301    List<Procedure<TEnvironment>> procedureList =
1302      new ArrayList<>(procedures.size() + completed.size());
1303    procedureList.addAll(procedures.values());
1304    // Note: The procedure could show up twice in the list with different state, as
1305    // it could complete after we walk through procedures list and insert into
1306    // procedureList - it is ok, as we will use the information in the Procedure
1307    // to figure it out; to prevent this would increase the complexity of the logic.
1308    completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
1309      .forEach(procedureList::add);
1310    return procedureList;
1311  }
1312
1313  // ==========================================================================
1314  //  Listeners helpers
1315  // ==========================================================================
1316  public void registerListener(ProcedureExecutorListener listener) {
1317    this.listeners.add(listener);
1318  }
1319
1320  public boolean unregisterListener(ProcedureExecutorListener listener) {
1321    return this.listeners.remove(listener);
1322  }
1323
1324  private void sendProcedureLoadedNotification(final long procId) {
1325    if (!this.listeners.isEmpty()) {
1326      for (ProcedureExecutorListener listener: this.listeners) {
1327        try {
1328          listener.procedureLoaded(procId);
1329        } catch (Throwable e) {
1330          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1331        }
1332      }
1333    }
1334  }
1335
1336  private void sendProcedureAddedNotification(final long procId) {
1337    if (!this.listeners.isEmpty()) {
1338      for (ProcedureExecutorListener listener: this.listeners) {
1339        try {
1340          listener.procedureAdded(procId);
1341        } catch (Throwable e) {
1342          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1343        }
1344      }
1345    }
1346  }
1347
1348  private void sendProcedureFinishedNotification(final long procId) {
1349    if (!this.listeners.isEmpty()) {
1350      for (ProcedureExecutorListener listener: this.listeners) {
1351        try {
1352          listener.procedureFinished(procId);
1353        } catch (Throwable e) {
1354          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1355        }
1356      }
1357    }
1358  }
1359
1360  // ==========================================================================
1361  //  Procedure IDs helpers
1362  // ==========================================================================
1363  private long nextProcId() {
1364    long procId = lastProcId.incrementAndGet();
1365    if (procId < 0) {
1366      while (!lastProcId.compareAndSet(procId, 0)) {
1367        procId = lastProcId.get();
1368        if (procId >= 0)
1369          break;
1370      }
1371      while (procedures.containsKey(procId)) {
1372        procId = lastProcId.incrementAndGet();
1373      }
1374    }
1375    assert procId >= 0 : "Invalid procId " + procId;
1376    return procId;
1377  }
1378
1379  @VisibleForTesting
1380  protected long getLastProcId() {
1381    return lastProcId.get();
1382  }
1383
1384  @VisibleForTesting
1385  public Set<Long> getActiveProcIds() {
1386    return procedures.keySet();
1387  }
1388
1389  Long getRootProcedureId(Procedure<TEnvironment> proc) {
1390    return Procedure.getRootProcedureId(procedures, proc);
1391  }
1392
1393  // ==========================================================================
1394  //  Executions
1395  // ==========================================================================
1396  private void executeProcedure(Procedure<TEnvironment> proc) {
1397    if (proc.isFinished()) {
1398      LOG.debug("{} is already finished, skipping execution", proc);
1399      return;
1400    }
1401    final Long rootProcId = getRootProcedureId(proc);
1402    if (rootProcId == null) {
1403      // The 'proc' was ready to run but the root procedure was rolledback
1404      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
1405      executeRollback(proc);
1406      return;
1407    }
1408
1409    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
1410    if (procStack == null) {
1411      LOG.warn("RootProcedureState is null for " + proc.getProcId());
1412      return;
1413    }
1414    do {
1415      // Try to acquire the execution
1416      if (!procStack.acquire(proc)) {
1417        if (procStack.setRollback()) {
1418          // we have the 'rollback-lock' we can start rollingback
1419          switch (executeRollback(rootProcId, procStack)) {
1420            case LOCK_ACQUIRED:
1421              break;
1422            case LOCK_YIELD_WAIT:
1423              procStack.unsetRollback();
1424              scheduler.yield(proc);
1425              break;
1426            case LOCK_EVENT_WAIT:
1427              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
1428              procStack.unsetRollback();
1429              break;
1430            default:
1431              throw new UnsupportedOperationException();
1432          }
1433        } else {
1434          // if we can't rollback means that some child is still running.
1435          // the rollback will be executed after all the children are done.
1436          // If the procedure was never executed, remove and mark it as rolledback.
1437          if (!proc.wasExecuted()) {
1438            switch (executeRollback(proc)) {
1439              case LOCK_ACQUIRED:
1440                break;
1441              case LOCK_YIELD_WAIT:
1442                scheduler.yield(proc);
1443                break;
1444              case LOCK_EVENT_WAIT:
1445                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
1446                break;
1447              default:
1448                throw new UnsupportedOperationException();
1449            }
1450          }
1451        }
1452        break;
1453      }
1454
1455      // Execute the procedure
1456      assert proc.getState() == ProcedureState.RUNNABLE : proc;
1457      // Note that lock is NOT about concurrency but rather about ensuring
1458      // ownership of a procedure of an entity such as a region or table
1459      LockState lockState = acquireLock(proc);
1460      switch (lockState) {
1461        case LOCK_ACQUIRED:
1462          execProcedure(procStack, proc);
1463          break;
1464        case LOCK_YIELD_WAIT:
1465          LOG.info(lockState + " " + proc);
1466          scheduler.yield(proc);
1467          break;
1468        case LOCK_EVENT_WAIT:
1469          // Someone will wake us up when the lock is available
1470          LOG.debug(lockState + " " + proc);
1471          break;
1472        default:
1473          throw new UnsupportedOperationException();
1474      }
1475      procStack.release(proc);
1476
1477      if (proc.isSuccess()) {
1478        // update metrics on finishing the procedure.
1479        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
1480        // Print out count of outstanding siblings if this procedure has a parent.
1481        Procedure<TEnvironment> parent = null;
1482        if (proc.hasParent()) {
1483          parent = procedures.get(proc.getParentProcId());
1484        }
1485        LOG.info("Finished {} in {}{}",
1486            proc,
1487            StringUtils.humanTimeDiff(proc.elapsedTime()),
1488            parent != null? (", unfinishedSiblingCount=" + parent.getChildrenLatch()): "");
1489        // Finalize the procedure state
1490        if (proc.getProcId() == rootProcId) {
1491          procedureFinished(proc);
1492        } else {
1493          execCompletionCleanup(proc);
1494        }
1495        break;
1496      }
1497    } while (procStack.isFailed());
1498  }
1499
1500  private LockState acquireLock(Procedure<TEnvironment> proc) {
1501    TEnvironment env = getEnvironment();
1502    // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
1503    // hasLock is true.
1504    if (proc.hasLock()) {
1505      return LockState.LOCK_ACQUIRED;
1506    }
1507    return proc.doAcquireLock(env, store);
1508  }
1509
1510  private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
1511    TEnvironment env = getEnvironment();
1512    // For how the framework works, we know that we will always have the lock
1513    // when we call releaseLock(), so we can avoid calling proc.hasLock()
1514    if (force || !proc.holdLock(env) || proc.isFinished()) {
1515      proc.doReleaseLock(env, store);
1516    }
1517  }
1518
1519  /**
1520   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
1521   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
1522   */
1523  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
1524    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
1525    RemoteProcedureException exception = rootProc.getException();
1526    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
1527    // rolling back because the subprocedure does. Clarify.
1528    if (exception == null) {
1529      exception = procStack.getException();
1530      rootProc.setFailure(exception);
1531      store.update(rootProc);
1532    }
1533
1534    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
1535    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
1536
1537    int stackTail = subprocStack.size();
1538    while (stackTail-- > 0) {
1539      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
1540      IdLock.Entry lockEntry = null;
1541      // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
1542      // this check, as the worker will hold the lock before executing a procedure. This is the only
1543      // place where we may hold two procedure execution locks, and there is a fence in the
1544      // RootProcedureState where we can make sure that only one worker can execute the rollback of
1545      // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
1546      // prevent race between us and the force update thread.
1547      if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
1548        try {
1549          lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
1550        } catch (IOException e) {
1551          // can only happen if interrupted, so not a big deal to propagate it
1552          throw new UncheckedIOException(e);
1553        }
1554      }
1555      try {
1556        // For the sub procedures which are successfully finished, we do not rollback them.
1557        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
1558        // recursively rollback its ancestors. The state changes which are done by sub procedures
1559        // should be handled by parent procedures when rolling back. For example, when rolling back
1560        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
1561        // online, instead of rolling back the original procedures which offlined the regions(in
1562        // fact these procedures can not be rolled back...).
1563        if (proc.isSuccess()) {
1564          // Just do the cleanup work, without actually executing the rollback
1565          subprocStack.remove(stackTail);
1566          cleanupAfterRollbackOneStep(proc);
1567          continue;
1568        }
1569        LockState lockState = acquireLock(proc);
1570        if (lockState != LockState.LOCK_ACQUIRED) {
1571          // can't take a lock on the procedure, add the root-proc back on the
1572          // queue waiting for the lock availability
1573          return lockState;
1574        }
1575
1576        lockState = executeRollback(proc);
1577        releaseLock(proc, false);
1578        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
1579        abortRollback |= !isRunning() || !store.isRunning();
1580
1581        // allows to kill the executor before something is stored to the wal.
1582        // useful to test the procedure recovery.
1583        if (abortRollback) {
1584          return lockState;
1585        }
1586
1587        subprocStack.remove(stackTail);
1588
1589        // if the procedure is kind enough to pass the slot to someone else, yield
1590        // if the proc is already finished, do not yield
1591        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
1592          return LockState.LOCK_YIELD_WAIT;
1593        }
1594
1595        if (proc != rootProc) {
1596          execCompletionCleanup(proc);
1597        }
1598      } finally {
1599        if (lockEntry != null) {
1600          procExecutionLock.releaseLockEntry(lockEntry);
1601        }
1602      }
1603    }
1604
1605    // Finalize the procedure state
1606    LOG.info("Rolled back {} exec-time={}", rootProc,
1607      StringUtils.humanTimeDiff(rootProc.elapsedTime()));
1608    procedureFinished(rootProc);
1609    return LockState.LOCK_ACQUIRED;
1610  }
1611
1612  private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
1613    if (proc.removeStackIndex()) {
1614      if (!proc.isSuccess()) {
1615        proc.setState(ProcedureState.ROLLEDBACK);
1616      }
1617
1618      // update metrics on finishing the procedure (fail)
1619      proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);
1620
1621      if (proc.hasParent()) {
1622        store.delete(proc.getProcId());
1623        procedures.remove(proc.getProcId());
1624      } else {
1625        final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
1626        if (childProcIds != null) {
1627          store.delete(proc, childProcIds);
1628        } else {
1629          store.update(proc);
1630        }
1631      }
1632    } else {
1633      store.update(proc);
1634    }
1635  }
1636
1637  /**
1638   * Execute the rollback of the procedure step.
1639   * It updates the store with the new state (stack index)
1640   * or will remove completly the procedure in case it is a child.
1641   */
1642  private LockState executeRollback(Procedure<TEnvironment> proc) {
1643    try {
1644      proc.doRollback(getEnvironment());
1645    } catch (IOException e) {
1646      LOG.debug("Roll back attempt failed for {}", proc, e);
1647      return LockState.LOCK_YIELD_WAIT;
1648    } catch (InterruptedException e) {
1649      handleInterruptedException(proc, e);
1650      return LockState.LOCK_YIELD_WAIT;
1651    } catch (Throwable e) {
1652      // Catch NullPointerExceptions or similar errors...
1653      LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
1654    }
1655
1656    // allows to kill the executor before something is stored to the wal.
1657    // useful to test the procedure recovery.
1658    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1659      String msg = "TESTING: Kill before store update";
1660      LOG.debug(msg);
1661      stop();
1662      throw new RuntimeException(msg);
1663    }
1664
1665    cleanupAfterRollbackOneStep(proc);
1666
1667    return LockState.LOCK_ACQUIRED;
1668  }
1669
1670  private void yieldProcedure(Procedure<TEnvironment> proc) {
1671    releaseLock(proc, false);
1672    scheduler.yield(proc);
1673  }
1674
1675  /**
1676   * Executes <code>procedure</code>
1677   * <ul>
1678   *  <li>Calls the doExecute() of the procedure
1679   *  <li>If the procedure execution didn't fail (i.e. valid user input)
1680   *  <ul>
1681   *    <li>...and returned subprocedures
1682   *    <ul><li>The subprocedures are initialized.
1683   *      <li>The subprocedures are added to the store
1684   *      <li>The subprocedures are added to the runnable queue
1685   *      <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
1686   *    </ul>
1687   *    </li>
1688   *   <li>...if there are no subprocedure
1689   *    <ul><li>the procedure completed successfully
1690   *      <li>if there is a parent (WAITING)
1691   *      <li>the parent state will be set to RUNNABLE
1692   *    </ul>
1693   *   </li>
1694   *  </ul>
1695   *  </li>
1696   *  <li>In case of failure
1697   *  <ul>
1698   *    <li>The store is updated with the new state</li>
1699   *    <li>The executor (caller of this method) will start the rollback of the procedure</li>
1700   *  </ul>
1701   *  </li>
1702   *  </ul>
1703   */
1704  private void execProcedure(RootProcedureState<TEnvironment> procStack,
1705      Procedure<TEnvironment> procedure) {
1706    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
1707      "NOT RUNNABLE! " + procedure.toString());
1708
1709    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
1710    // The exception is caught below and then we hurry to the exit without disturbing state. The
1711    // idea is that the processing of this procedure will be unsuspended later by an external event
1712    // such the report of a region open.
1713    boolean suspended = false;
1714
1715    // Whether to 're-' -execute; run through the loop again.
1716    boolean reExecute = false;
1717
1718    Procedure<TEnvironment>[] subprocs = null;
1719    do {
1720      reExecute = false;
1721      procedure.resetPersistence();
1722      try {
1723        subprocs = procedure.doExecute(getEnvironment());
1724        if (subprocs != null && subprocs.length == 0) {
1725          subprocs = null;
1726        }
1727      } catch (ProcedureSuspendedException e) {
1728        LOG.trace("Suspend {}", procedure);
1729        suspended = true;
1730      } catch (ProcedureYieldException e) {
1731        LOG.trace("Yield {}", procedure, e);
1732        yieldProcedure(procedure);
1733        return;
1734      } catch (InterruptedException e) {
1735        LOG.trace("Yield interrupt {}", procedure, e);
1736        handleInterruptedException(procedure, e);
1737        yieldProcedure(procedure);
1738        return;
1739      } catch (Throwable e) {
1740        // Catch NullPointerExceptions or similar errors...
1741        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
1742        LOG.error(msg, e);
1743        procedure.setFailure(new RemoteProcedureException(msg, e));
1744      }
1745
1746      if (!procedure.isFailed()) {
1747        if (subprocs != null) {
1748          if (subprocs.length == 1 && subprocs[0] == procedure) {
1749            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
1750            // i.e. we go around this loop again rather than go back out on the scheduler queue.
1751            subprocs = null;
1752            reExecute = true;
1753            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
1754          } else {
1755            // Yield the current procedure, and make the subprocedure runnable
1756            // subprocs may come back 'null'.
1757            subprocs = initializeChildren(procStack, procedure, subprocs);
1758            LOG.info("Initialized subprocedures=" +
1759              (subprocs == null? null:
1760                Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
1761                collect(Collectors.toList()).toString()));
1762          }
1763        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1764          LOG.trace("Added to timeoutExecutor {}", procedure);
1765          timeoutExecutor.add(procedure);
1766        } else if (!suspended) {
1767          // No subtask, so we are done
1768          procedure.setState(ProcedureState.SUCCESS);
1769        }
1770      }
1771
1772      // Add the procedure to the stack
1773      procStack.addRollbackStep(procedure);
1774
1775      // allows to kill the executor before something is stored to the wal.
1776      // useful to test the procedure recovery.
1777      if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) {
1778        kill("TESTING: Kill BEFORE store update: " + procedure);
1779      }
1780
1781      // TODO: The code here doesn't check if store is running before persisting to the store as
1782      // it relies on the method call below to throw RuntimeException to wind up the stack and
1783      // executor thread to stop. The statement following the method call below seems to check if
1784      // store is not running, to prevent scheduling children procedures, re-execution or yield
1785      // of this procedure. This may need more scrutiny and subsequent cleanup in future
1786      //
1787      // Commit the transaction even if a suspend (state may have changed). Note this append
1788      // can take a bunch of time to complete.
1789      if (procedure.needPersistence()) {
1790        updateStoreOnExec(procStack, procedure, subprocs);
1791      }
1792
1793      // if the store is not running we are aborting
1794      if (!store.isRunning()) {
1795        return;
1796      }
1797      // if the procedure is kind enough to pass the slot to someone else, yield
1798      if (procedure.isRunnable() && !suspended &&
1799          procedure.isYieldAfterExecutionStep(getEnvironment())) {
1800        yieldProcedure(procedure);
1801        return;
1802      }
1803
1804      assert (reExecute && subprocs == null) || !reExecute;
1805    } while (reExecute);
1806
1807    // Allows to kill the executor after something is stored to the WAL but before the below
1808    // state settings are done -- in particular the one on the end where we make parent
1809    // RUNNABLE again when its children are done; see countDownChildren.
1810    if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
1811      kill("TESTING: Kill AFTER store update: " + procedure);
1812    }
1813
1814    // Submit the new subprocedures
1815    if (subprocs != null && !procedure.isFailed()) {
1816      submitChildrenProcedures(subprocs);
1817    }
1818
1819    // we need to log the release lock operation before waking up the parent procedure, as there
1820    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
1821    // the sub procedures from store and cause problems...
1822    releaseLock(procedure, false);
1823
1824    // if the procedure is complete and has a parent, count down the children latch.
1825    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
1826    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
1827      countDownChildren(procStack, procedure);
1828    }
1829  }
1830
1831  private void kill(String msg) {
1832    LOG.debug(msg);
1833    stop();
1834    throw new RuntimeException(msg);
1835  }
1836
1837  private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
1838      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1839    assert subprocs != null : "expected subprocedures";
1840    final long rootProcId = getRootProcedureId(procedure);
1841    for (int i = 0; i < subprocs.length; ++i) {
1842      Procedure<TEnvironment> subproc = subprocs[i];
1843      if (subproc == null) {
1844        String msg = "subproc[" + i + "] is null, aborting the procedure";
1845        procedure.setFailure(new RemoteProcedureException(msg,
1846          new IllegalArgumentIOException(msg)));
1847        return null;
1848      }
1849
1850      assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
1851      subproc.setParentProcId(procedure.getProcId());
1852      subproc.setRootProcId(rootProcId);
1853      subproc.setProcId(nextProcId());
1854      procStack.addSubProcedure(subproc);
1855    }
1856
1857    if (!procedure.isFailed()) {
1858      procedure.setChildrenLatch(subprocs.length);
1859      switch (procedure.getState()) {
1860        case RUNNABLE:
1861          procedure.setState(ProcedureState.WAITING);
1862          break;
1863        case WAITING_TIMEOUT:
1864          timeoutExecutor.add(procedure);
1865          break;
1866        default:
1867          break;
1868      }
1869    }
1870    return subprocs;
1871  }
1872
1873  private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
1874    for (int i = 0; i < subprocs.length; ++i) {
1875      Procedure<TEnvironment> subproc = subprocs[i];
1876      subproc.updateMetricsOnSubmit(getEnvironment());
1877      assert !procedures.containsKey(subproc.getProcId());
1878      procedures.put(subproc.getProcId(), subproc);
1879      scheduler.addFront(subproc);
1880    }
1881  }
1882
1883  private void countDownChildren(RootProcedureState<TEnvironment> procStack,
1884      Procedure<TEnvironment> procedure) {
1885    Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
1886    if (parent == null) {
1887      assert procStack.isRollingback();
1888      return;
1889    }
1890
1891    // If this procedure is the last child awake the parent procedure
1892    if (parent.tryRunnable()) {
1893      // If we succeeded in making the parent runnable -- i.e. all of its
1894      // children have completed, move parent to front of the queue.
1895      store.update(parent);
1896      scheduler.addFront(parent);
1897      LOG.info("Finished subprocedure pid={}, resume processing parent {}",
1898          procedure.getProcId(), parent);
1899      return;
1900    }
1901  }
1902
1903  private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
1904      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1905    if (subprocs != null && !procedure.isFailed()) {
1906      if (LOG.isTraceEnabled()) {
1907        LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
1908      }
1909      store.insert(procedure, subprocs);
1910    } else {
1911      LOG.trace("Store update {}", procedure);
1912      if (procedure.isFinished() && !procedure.hasParent()) {
1913        // remove child procedures
1914        final long[] childProcIds = procStack.getSubprocedureIds();
1915        if (childProcIds != null) {
1916          store.delete(procedure, childProcIds);
1917          for (int i = 0; i < childProcIds.length; ++i) {
1918            procedures.remove(childProcIds[i]);
1919          }
1920        } else {
1921          store.update(procedure);
1922        }
1923      } else {
1924        store.update(procedure);
1925      }
1926    }
1927  }
1928
1929  private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
1930    LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
1931    // NOTE: We don't call Thread.currentThread().interrupt()
1932    // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
1933    // the InterruptedException. If the master is going down, we will be notified
1934    // and the executor/store will be stopped.
1935    // (The interrupted procedure will be retried on the next run)
1936  }
1937
1938  private void execCompletionCleanup(Procedure<TEnvironment> proc) {
1939    final TEnvironment env = getEnvironment();
1940    if (proc.hasLock()) {
1941      LOG.warn("Usually this should not happen, we will release the lock before if the procedure" +
1942        " is finished, even if the holdLock is true, arrive here means we have some holes where" +
1943        " we do not release the lock. And the releaseLock below may fail since the procedure may" +
1944        " have already been deleted from the procedure store.");
1945      releaseLock(proc, true);
1946    }
1947    try {
1948      proc.completionCleanup(env);
1949    } catch (Throwable e) {
1950      // Catch NullPointerExceptions or similar errors...
1951      LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1952    }
1953  }
1954
1955  private void procedureFinished(Procedure<TEnvironment> proc) {
1956    // call the procedure completion cleanup handler
1957    execCompletionCleanup(proc);
1958
1959    CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
1960
1961    // update the executor internal state maps
1962    if (!proc.shouldWaitClientAck(getEnvironment())) {
1963      retainer.setClientAckTime(0);
1964    }
1965
1966    completed.put(proc.getProcId(), retainer);
1967    rollbackStack.remove(proc.getProcId());
1968    procedures.remove(proc.getProcId());
1969
1970    // call the runnableSet completion cleanup handler
1971    try {
1972      scheduler.completionCleanup(proc);
1973    } catch (Throwable e) {
1974      // Catch NullPointerExceptions or similar errors...
1975      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
1976    }
1977
1978    // Notify the listeners
1979    sendProcedureFinishedNotification(proc.getProcId());
1980  }
1981
1982  RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
1983    return rollbackStack.get(rootProcId);
1984  }
1985
1986  @VisibleForTesting
1987  ProcedureScheduler getProcedureScheduler() {
1988    return scheduler;
1989  }
1990
1991  @VisibleForTesting
1992  int getCompletedSize() {
1993    return completed.size();
1994  }
1995
1996  // ==========================================================================
1997  //  Worker Thread
1998  // ==========================================================================
1999  private class WorkerThread extends StoppableThread {
2000    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
2001    private volatile Procedure<TEnvironment> activeProcedure;
2002    private boolean onlyPollUrgent = false;
2003    public WorkerThread(ThreadGroup group) {
2004      this(group, "PEWorker-");
2005    }
2006
2007    protected WorkerThread(ThreadGroup group, String prefix) {
2008      this(group, prefix, false);
2009    }
2010
2011    protected WorkerThread(ThreadGroup group, String prefix, boolean onlyPollUrgent) {
2012      super(group, prefix + workerId.incrementAndGet());
2013      this.onlyPollUrgent = onlyPollUrgent;
2014      setDaemon(true);
2015    }
2016
2017    @Override
2018    public void sendStopSignal() {
2019      scheduler.signalAll();
2020    }
2021    @Override
2022    public void run() {
2023      long lastUpdate = EnvironmentEdgeManager.currentTime();
2024      try {
2025        while (isRunning() && keepAlive(lastUpdate)) {
2026          Procedure<TEnvironment> proc = scheduler
2027              .poll(onlyPollUrgent, keepAliveTime, TimeUnit.MILLISECONDS);
2028          if (proc == null) {
2029            continue;
2030          }
2031          this.activeProcedure = proc;
2032          int activeCount = activeExecutorCount.incrementAndGet();
2033          int runningCount = store.setRunningProcedureCount(activeCount);
2034          LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
2035            runningCount, activeCount);
2036          executionStartTime.set(EnvironmentEdgeManager.currentTime());
2037          IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
2038          try {
2039            executeProcedure(proc);
2040          } catch (AssertionError e) {
2041            LOG.info("ASSERT pid=" + proc.getProcId(), e);
2042            throw e;
2043          } finally {
2044            procExecutionLock.releaseLockEntry(lockEntry);
2045            activeCount = activeExecutorCount.decrementAndGet();
2046            runningCount = store.setRunningProcedureCount(activeCount);
2047            LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
2048              runningCount, activeCount);
2049            this.activeProcedure = null;
2050            lastUpdate = EnvironmentEdgeManager.currentTime();
2051            executionStartTime.set(Long.MAX_VALUE);
2052          }
2053        }
2054      } catch (Throwable t) {
2055        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
2056      } finally {
2057        LOG.trace("Worker terminated.");
2058      }
2059      if (onlyPollUrgent) {
2060        urgentWorkerThreads.remove(this);
2061      } else {
2062        workerThreads.remove(this);
2063      }
2064    }
2065
2066    @Override
2067    public String toString() {
2068      Procedure<?> p = this.activeProcedure;
2069      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");
2070    }
2071
2072    /**
2073     * @return the time since the current procedure is running
2074     */
2075    public long getCurrentRunTime() {
2076      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
2077    }
2078
2079    // core worker never timeout
2080    protected boolean keepAlive(long lastUpdate) {
2081      return true;
2082    }
2083  }
2084
2085  // A worker thread which can be added when core workers are stuck. Will timeout after
2086  // keepAliveTime if there is no procedure to run.
2087  private final class KeepAliveWorkerThread extends WorkerThread {
2088
2089    public KeepAliveWorkerThread(ThreadGroup group) {
2090      super(group, "KeepAlivePEWorker-");
2091    }
2092
2093    @Override
2094    protected boolean keepAlive(long lastUpdate) {
2095      return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
2096    }
2097  }
2098
2099  // ----------------------------------------------------------------------------
// TODO-MAYBE: Should we provide an InlineChore to notify the store with the
// full set of procedures pending and completed, so it can write a compacted
// version of the log (in case it is a log)?
// In theory no: procedures have a short life, so at some point the store
// will have the tracker saying everything is in the last log.
2105  // ----------------------------------------------------------------------------
2106
2107  private final class WorkerMonitor extends InlineChore {
2108    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
2109        "hbase.procedure.worker.monitor.interval.msec";
2110    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
2111
2112    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
2113        "hbase.procedure.worker.stuck.threshold.msec";
2114    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
2115
2116    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
2117        "hbase.procedure.worker.add.stuck.percentage";
2118    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
2119
2120    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
2121    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
2122    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
2123
2124    public WorkerMonitor() {
2125      refreshConfig();
2126    }
2127
2128    @Override
2129    public void run() {
2130      final int stuckCount = checkForStuckWorkers();
2131      checkThreadCount(stuckCount);
2132
2133      // refresh interval (poor man dynamic conf update)
2134      refreshConfig();
2135    }
2136
2137    private int checkForStuckWorkers() {
2138      // check if any of the worker is stuck
2139      int stuckCount = 0;
2140      for (WorkerThread worker : workerThreads) {
2141        if (worker.getCurrentRunTime() < stuckThreshold) {
2142          continue;
2143        }
2144
2145        // WARN the worker is stuck
2146        stuckCount++;
2147        LOG.warn("Worker stuck {}, run time {}", worker,
2148          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
2149      }
2150      return stuckCount;
2151    }
2152
2153    private void checkThreadCount(final int stuckCount) {
2154      // nothing to do if there are no runnable tasks
2155      if (stuckCount < 1 || !scheduler.hasRunnables()) {
2156        return;
2157      }
2158
2159      // add a new thread if the worker stuck percentage exceed the threshold limit
2160      // and every handler is active.
2161      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
2162      // let's add new worker thread more aggressively, as they will timeout finally if there is no
2163      // work to do.
2164      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
2165        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
2166        workerThreads.add(worker);
2167        worker.start();
2168        LOG.debug("Added new worker thread {}", worker);
2169      }
2170    }
2171
2172    private void refreshConfig() {
2173      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,
2174          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
2175      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,
2176        DEFAULT_WORKER_MONITOR_INTERVAL);
2177      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,
2178        DEFAULT_WORKER_STUCK_THRESHOLD);
2179    }
2180
2181    @Override
2182    public int getTimeoutInterval() {
2183      return timeoutInterval;
2184    }
2185  }
2186}