001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.UncheckedIOException;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Deque;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.Executor;
033import java.util.concurrent.Executors;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.AtomicLong;
038import java.util.stream.Collectors;
039import java.util.stream.Stream;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
043import org.apache.hadoop.hbase.log.HBaseMarkers;
044import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
045import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
048import org.apache.hadoop.hbase.procedure2.util.StringUtils;
049import org.apache.hadoop.hbase.security.User;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.IdLock;
052import org.apache.hadoop.hbase.util.NonceKey;
053import org.apache.hadoop.hbase.util.Threads;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
061
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
063
064/**
065 * Thread Pool that executes the submitted procedures.
066 * The executor has a ProcedureStore associated.
067 * Each operation is logged and on restart the pending procedures are resumed.
068 *
 * Unless the Procedure code throws an error (e.g. invalid user input),
 * the procedure will complete (at some point in time). On restart the pending
 * procedures are resumed and the ones that failed will be rolled back.
072 *
073 * The user can add procedures to the executor via submitProcedure(proc)
074 * check for the finished state via isFinished(procId)
075 * and get the result via getResult(procId)
076 */
077@InterfaceAudience.Private
078public class ProcedureExecutor<TEnvironment> {
  /** Logger shared by the executor and its nested {@link Testing} helper. */
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);

  /** Configuration key read by the constructor into {@code checkOwnerSet}. */
  public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
  private static final boolean DEFAULT_CHECK_OWNER_SET = false;

  /**
   * Configuration key for the worker keep-alive time in milliseconds; read by
   * {@link #refreshConfiguration(Configuration)}.
   */
  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
      "hbase.procedure.worker.keep.alive.time.msec";
  // Default keep-alive: one minute, in milliseconds.
  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);

  /**
   * Configuration key for the eviction TTL (ms) of retained completed procedures; passed as the
   * {@code evictTtl} argument of the retainer's expiry check.
   */
  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min

  /**
   * Configuration key for the eviction TTL (ms) of completed procedures whose result has
   * presumably been acknowledged ("acked"); passed as the {@code evictAckTtl} argument of the
   * retainer's expiry check.
   */
  public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min

  /**
   * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
   * break PE having it fail at various junctures. When non-null, testing is set to an instance of
   * the below internal {@link Testing} class with flags set for the particular test.
   */
  volatile Testing testing = null;
100
101  /**
102   * Class with parameters describing how to fail/die when in testing-context.
103   */
104  public static class Testing {
105    protected volatile boolean killIfHasParent = true;
106    protected volatile boolean killIfSuspended = false;
107
108    /**
109     * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
110     * persisting all the state it needs to recover after a crash.
111     */
112    protected volatile boolean killBeforeStoreUpdate = false;
113    protected volatile boolean toggleKillBeforeStoreUpdate = false;
114
115    /**
116     * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
117     * is about a case where memory-state was being set after store to WAL where a crash could
118     * cause us to get stuck. This flag allows killing at what was a vulnerable time.
119     */
120    protected volatile boolean killAfterStoreUpdate = false;
121    protected volatile boolean toggleKillAfterStoreUpdate = false;
122
123    protected boolean shouldKillBeforeStoreUpdate() {
124      final boolean kill = this.killBeforeStoreUpdate;
125      if (this.toggleKillBeforeStoreUpdate) {
126        this.killBeforeStoreUpdate = !kill;
127        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
128      }
129      return kill;
130    }
131
132    protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {
133      if (isSuspended && !killIfSuspended) {
134        return false;
135      }
136      if (hasParent && !killIfHasParent) {
137        return false;
138      }
139      return shouldKillBeforeStoreUpdate();
140    }
141
142    protected boolean shouldKillAfterStoreUpdate() {
143      final boolean kill = this.killAfterStoreUpdate;
144      if (this.toggleKillAfterStoreUpdate) {
145        this.killAfterStoreUpdate = !kill;
146        LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
147      }
148      return kill;
149    }
150
151    protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
152      return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
153    }
154  }
155
  /**
   * Observer of procedure lifecycle notifications, identified by procedure id.
   * NOTE(review): the dispatch sites are outside this view; the per-method notes below are
   * inferred from method names and from the replay code in this file.
   */
  public interface ProcedureExecutorListener {
    // Presumably fired via sendProcedureLoadedNotification(), which replay calls for root
    // RUNNABLE procedures only.
    void procedureLoaded(long procId);
    // Presumably fired when a procedure is submitted to the executor.
    void procedureAdded(long procId);
    // Presumably fired when a procedure completes.
    void procedureFinished(long procId);
  }
161
  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
   * Once a Root-Procedure completes (success or failure), the result will be added to this map.
   * The user of ProcedureExecutor should call getResult(procId) to get the result.
   * During replay, every finished procedure read from the store is also placed here.
   */
  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
    new ConcurrentHashMap<>();

  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
   * The RootProcedureState contains the execution stack of the Root-Procedure.
   * It is added to the map by submitProcedure() and removed on procedure completion.
   * During replay, an empty state is created for every unfinished root procedure.
   */
  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up the live procedures by ID.
   * This map contains every procedure: root-procedures and subprocedures.
   */
  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up whether the procedure was already issued by the same client. This map
   * contains every root procedure. registerNonce() also reserves an entry here before the
   * procedure itself is submitted.
   */
  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();

  // Registered lifecycle listeners.
  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
    new CopyOnWriteArrayList<>();

  // Current configuration; replaced by refreshConfiguration().
  private Configuration conf;

  /**
   * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
   * resource handling in #join, rather than just waiting there, is unexpected).
   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
   * (Should be ok).
   */
  private ThreadGroup threadGroup;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling in #join, rather than just waiting there, is unexpected).
   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
   * (Should be ok).
   */
  private CopyOnWriteArrayList<WorkerThread> workerThreads;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling in #join, rather than just waiting there, is unexpected).
   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
   * (Should be ok).
   */
  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

  // Both set in init(): core = numThreads, max (burst) = 10 * numThreads.
  private int corePoolSize;
  private int maxPoolSize;

  // Worker keep-alive in milliseconds; written by setKeepAliveTime().
  private volatile long keepAliveTime;

  /**
   * Scheduler/Queue that contains runnable procedures.
   */
  private final ProcedureScheduler scheduler;

  // Single daemon thread that runs force-update requests coming from the store listener, so the
  // store's calling thread only enqueues work.
  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());

  // -1 until the store reports the max procId during load(); reset to -1 by join().
  private final AtomicLong lastProcId = new AtomicLong(-1);
  // Reset to 0 by init() before the core workers are created.
  private final AtomicLong workerId = new AtomicLong(0);
  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
  // Set by startWorkers(), cleared by stop().
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final TEnvironment environment;
  private final ProcedureStore store;

  // Read from CHECK_OWNER_SET_CONF_KEY in the constructor.
  private final boolean checkOwnerSet;

  // To prevent concurrent execution of the same procedure.
  // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
  // procedure is woken up before we finish the suspend which causes the same procedures to be
  // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
  // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
  // execution of the same procedure. forceUpdateProcedure() also takes this lock.
  private final IdLock procExecutionLock = new IdLock();
249
250  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
251      final ProcedureStore store) {
252    this(conf, environment, store, new SimpleProcedureScheduler());
253  }
254
255  private boolean isRootFinished(Procedure<?> proc) {
256    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
257    return rootProc == null || rootProc.isFinished();
258  }
259
260  private void forceUpdateProcedure(long procId) throws IOException {
261    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
262    try {
263      Procedure<TEnvironment> proc = procedures.get(procId);
264      if (proc != null) {
265        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
266          LOG.debug("Procedure {} has already been finished and parent is succeeded," +
267            " skip force updating", proc);
268          return;
269        }
270      } else {
271        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
272        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
273          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
274          return;
275        }
276        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
277        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
278        if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
279          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
280            procId);
281          return;
282        }
283        proc = retainer.getProcedure();
284      }
285      LOG.debug("Force update procedure {}", proc);
286      store.update(proc);
287    } finally {
288      procExecutionLock.releaseLockEntry(lockEntry);
289    }
290  }
291
292  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
293      final ProcedureStore store, final ProcedureScheduler scheduler) {
294    this.environment = environment;
295    this.scheduler = scheduler;
296    this.store = store;
297    this.conf = conf;
298    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
299    refreshConfiguration(conf);
300    store.registerListener(new ProcedureStoreListener() {
301
302      @Override
303      public void forceUpdate(long[] procIds) {
304        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
305          try {
306            forceUpdateProcedure(procId);
307          } catch (IOException e) {
308            LOG.warn("Failed to force update procedure with pid={}", procId);
309          }
310        }));
311      }
312    });
313  }
314
315  private void load(final boolean abortOnCorruption) throws IOException {
316    Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
317    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
318    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
319    Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");
320
321    store.load(new ProcedureStore.ProcedureLoader() {
322      @Override
323      public void setMaxProcId(long maxProcId) {
324        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
325        lastProcId.set(maxProcId);
326      }
327
328      @Override
329      public void load(ProcedureIterator procIter) throws IOException {
330        loadProcedures(procIter, abortOnCorruption);
331      }
332
333      @Override
334      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
335        int corruptedCount = 0;
336        while (procIter.hasNext()) {
337          Procedure<?> proc = procIter.next();
338          LOG.error("Corrupt " + proc);
339          corruptedCount++;
340        }
341        if (abortOnCorruption && corruptedCount > 0) {
342          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
343        }
344      }
345    });
346  }
347
348  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
349    proc.restoreLock(getEnvironment());
350    restored.add(proc.getProcId());
351  }
352
353  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
354    while (!stack.isEmpty()) {
355      restoreLock(stack.pop(), restored);
356    }
357  }
358
359  // Restore the locks for all the procedures.
360  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
361  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
362  // calling the acquireLock method for the parent procedure.
363  // The algorithm is straight-forward:
364  // 1. Use a set to record the procedures which locks have already been restored.
365  // 2. Use a stack to store the hierarchy of the procedures
366  // 3. For all the procedure, we will first try to find its parent and push it into the stack,
367  // unless
368  // a. We have no parent, i.e, we are the root procedure
369  // b. The lock has already been restored(by checking the set introduced in #1)
370  // then we start to pop the stack and call acquireLock for each procedure.
371  // Notice that this should be done for all procedures, not only the ones in runnableList.
372  private void restoreLocks() {
373    Set<Long> restored = new HashSet<>();
374    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
375    procedures.values().forEach(proc -> {
376      for (;;) {
377        if (restored.contains(proc.getProcId())) {
378          restoreLocks(stack, restored);
379          return;
380        }
381        if (!proc.hasParent()) {
382          restoreLock(proc, restored);
383          restoreLocks(stack, restored);
384          return;
385        }
386        stack.push(proc);
387        proc = procedures.get(proc.getParentProcId());
388      }
389    });
390  }
391
  /**
   * Rebuilds the in-memory executor state from the procedures handed over by the store.
   * <p/>
   * Makes two passes over {@code procIter}.
   * <ol>
   *   <li>The first pass registers every procedure. Finished ones go into {@code completed};
   *   unfinished ones go into {@code procedures}, with a fresh rollback state per unfinished
   *   root. Nonces are recorded along the way.</li>
   *   <li>The second pass runs after {@code procIter.reset()}. It bumps parents' children
   *   latches, loads each procedure into its root's rollback stack, and buckets the unfinished
   *   procedures by state.</li>
   * </ol>
   * Then WAITING procedures without children are promoted to RUNNABLE and locks are restored.
   * Finally, WAITING_TIMEOUT procedures go to the timeout executor, and FAILED/RUNNABLE ones go
   * to the scheduler.
   * @param procIter iterator over the stored procedures; must support {@code reset()}
   * @param abortOnCorruption not used by this method; corruption is handled by the loader's
   *          handleCorrupted callback in load()
   * @throws IOException presumably propagated from reading {@code procIter}
   */
  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
      throws IOException {
    // 1. Build the rollback stack
    // The per-state counts below are only used to presize the lists built in step 2.
    int runnableCount = 0;
    int failedCount = 0;
    int waitingCount = 0;
    int waitingTimeoutCount = 0;
    while (procIter.hasNext()) {
      boolean finished = procIter.isNextFinished();
      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      NonceKey nonceKey = proc.getNonceKey();
      long procId = proc.getProcId();

      if (finished) {
        // Finished procedures are only retained for result lookup.
        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
        LOG.debug("Completed {}", proc);
      } else {
        if (!proc.hasParent()) {
          // Each unfinished root gets an empty rollback state; it is filled in step 2.
          assert !proc.isFinished() : "unexpected finished procedure";
          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
        }

        // add the procedure to the map
        proc.beforeReplay(getEnvironment());
        procedures.put(proc.getProcId(), proc);
        switch (proc.getState()) {
          case RUNNABLE:
            runnableCount++;
            break;
          case FAILED:
            failedCount++;
            break;
          case WAITING:
            waitingCount++;
            break;
          case WAITING_TIMEOUT:
            waitingTimeoutCount++;
            break;
          default:
            break;
        }
      }

      // Nonces are recorded for finished and unfinished procedures alike.
      if (nonceKey != null) {
        nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
      }
    }

    // 2. Initialize the stacks. In the old implementation, procedures in FAILED state were
    // pushed into the ProcedureScheduler directly to execute the rollback. That no longer works
    // since we introduced the restore-lock stage. Now, when we acquire an xlock, we remove the
    // queue from runQueue in the scheduler. When a procedure that has lock access (for example,
    // a sub procedure of the procedure holding the xlock) is pushed into the scheduler, we add
    // the queue back so the workers can poll from it. The assumption is that the procedure which
    // holds the xlock has already been polled out. So when loading, we can not add the procedure
    // to the scheduler first and then call acquireLock: the procedure would still be in the
    // queue, the queue would be removed from runQueue, no one could poll it out, and we would
    // deadlock.
    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
    procIter.reset();
    while (procIter.hasNext()) {
      if (procIter.isNextFinished()) {
        procIter.skipNext();
        continue;
      }

      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
      LOG.debug("Loading {}", proc);
      Long rootProcId = getRootProcedureId(proc);
      // The orphan procedures will be passed to handleCorrupted, so add an assert here
      assert rootProcId != null;

      if (proc.hasParent()) {
        // Only unfinished children count toward the parent's children latch.
        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
        if (parent != null && !proc.isFinished()) {
          parent.incChildrenLatch();
        }
      }

      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
      procStack.loadStack(proc);

      proc.setRootProcId(rootProcId);
      switch (proc.getState()) {
        case RUNNABLE:
          runnableList.add(proc);
          break;
        case WAITING:
          waitingList.add(proc);
          break;
        case WAITING_TIMEOUT:
          waitingTimeoutList.add(proc);
          break;
        case FAILED:
          failedList.add(proc);
          break;
        case ROLLEDBACK:
        case INITIALIZING:
          // These states must never be found in a store being replayed.
          String msg = "Unexpected " + proc.getState() + " state for " + proc;
          LOG.error(msg);
          throw new UnsupportedOperationException(msg);
        default:
          break;
      }
    }

    // 3. Check the waiting procedures to see if some of them can be added to runnable.
    waitingList.forEach(proc -> {
      if (!proc.hasChildren()) {
        // Normally, WAITING procedures are woken by their children. But all the children may
        // have succeeded and the master may have been killed before they could wake up their
        // parent. In that case, when recovering the procedures from ProcedureWal, the children
        // are not loaded because of their SUCCESS state, so we need to continue running this
        // WAITING procedure. Before executing it, we must set its state to RUNNABLE, otherwise
        // an exception will be thrown:
        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
        // "NOT RUNNABLE! " + procedure.toString());
        proc.setState(ProcedureState.RUNNABLE);
        runnableList.add(proc);
      } else {
        // Still has children: only finish the replay; it stays WAITING.
        proc.afterReplay(getEnvironment());
      }
    });
    // 4. restore locks
    // This must happen before anything is handed to the scheduler (see the step 2 note).
    restoreLocks();

    // 5. Push the procedures to the timeout executor
    waitingTimeoutList.forEach(proc -> {
      proc.afterReplay(getEnvironment());
      timeoutExecutor.add(proc);
    });

    // 6. Push the procedure to the scheduler
    // FAILED procedures are queued as-is (no afterReplay call here). RUNNABLE ones get
    // afterReplay first, and roots also trigger a "loaded" notification.
    failedList.forEach(scheduler::addBack);
    runnableList.forEach(p -> {
      p.afterReplay(getEnvironment());
      if (!p.hasParent()) {
        sendProcedureLoadedNotification(p.getProcId());
      }
      scheduler.addBack(p);
    });
    // After all procedures put into the queue, signal the worker threads.
    // Otherwise, there is a race condition. See HBASE-21364.
    scheduler.signalAll();
  }
543
544  /**
545   * Initialize the procedure executor, but do not start workers. We will start them later.
546   * <p/>
547   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
548   * ensure a single executor, and start the procedure replay to resume and recover the previous
549   * pending and in-progress procedures.
550   * @param numThreads number of threads available for procedure execution.
551   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
552   *          is found on replay. otherwise false.
553   */
554  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
555    // We have numThreads executor + one timer thread used for timing out
556    // procedures and triggering periodic procedures.
557    this.corePoolSize = numThreads;
558    this.maxPoolSize = 10 * numThreads;
559    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
560        corePoolSize, maxPoolSize);
561
562    this.threadGroup = new ThreadGroup("PEWorkerGroup");
563    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
564
565    // Create the workers
566    workerId.set(0);
567    workerThreads = new CopyOnWriteArrayList<>();
568    for (int i = 0; i < corePoolSize; ++i) {
569      workerThreads.add(new WorkerThread(threadGroup));
570    }
571
572    long st, et;
573
574    // Acquire the store lease.
575    st = System.nanoTime();
576    store.recoverLease();
577    et = System.nanoTime();
578    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
579      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
580
581    // start the procedure scheduler
582    scheduler.start();
583
584    // TODO: Split in two steps.
585    // TODO: Handle corrupted procedures (currently just a warn)
586    // The first one will make sure that we have the latest id,
587    // so we can start the threads and accept new procedures.
588    // The second step will do the actual load of old procedures.
589    st = System.nanoTime();
590    load(abortOnCorruption);
591    et = System.nanoTime();
592    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
593      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
594  }
595
596  /**
597   * Start the workers.
598   */
599  public void startWorkers() throws IOException {
600    if (!running.compareAndSet(false, true)) {
601      LOG.warn("Already running");
602      return;
603    }
604    // Start the executors. Here we must have the lastProcId set.
605    LOG.trace("Start workers {}", workerThreads.size());
606    timeoutExecutor.start();
607    for (WorkerThread worker: workerThreads) {
608      worker.start();
609    }
610
611    // Internal chores
612    timeoutExecutor.add(new WorkerMonitor());
613
614    // Add completed cleaner chore
615    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
616      nonceKeysToProcIdsMap));
617  }
618
619  public void stop() {
620    if (!running.getAndSet(false)) {
621      return;
622    }
623
624    LOG.info("Stopping");
625    scheduler.stop();
626    timeoutExecutor.sendStopSignal();
627  }
628
629  @VisibleForTesting
630  public void join() {
631    assert !isRunning() : "expected not running";
632
633    // stop the timeout executor
634    timeoutExecutor.awaitTermination();
635
636    // stop the worker threads
637    for (WorkerThread worker: workerThreads) {
638      worker.awaitTermination();
639    }
640
641    // Destroy the Thread Group for the executors
642    // TODO: Fix. #join is not place to destroy resources.
643    try {
644      threadGroup.destroy();
645    } catch (IllegalThreadStateException e) {
646      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
647          this.threadGroup, e.getMessage());
648      // This dumps list of threads on STDOUT.
649      this.threadGroup.list();
650    }
651
652    // reset the in-memory state for testing
653    completed.clear();
654    rollbackStack.clear();
655    procedures.clear();
656    nonceKeysToProcIdsMap.clear();
657    scheduler.clear();
658    lastProcId.set(-1);
659  }
660
661  public void refreshConfiguration(final Configuration conf) {
662    this.conf = conf;
663    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
664        DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
665  }
666
667  // ==========================================================================
668  //  Accessors
669  // ==========================================================================
670  public boolean isRunning() {
671    return running.get();
672  }
673
674  /**
675   * @return the current number of worker threads.
676   */
677  public int getWorkerThreadCount() {
678    return workerThreads.size();
679  }
680
681  /**
682   * @return the core pool size settings.
683   */
684  public int getCorePoolSize() {
685    return corePoolSize;
686  }
687
688  public int getActiveExecutorCount() {
689    return activeExecutorCount.get();
690  }
691
692  public TEnvironment getEnvironment() {
693    return this.environment;
694  }
695
696  public ProcedureStore getStore() {
697    return this.store;
698  }
699
700  ProcedureScheduler getScheduler() {
701    return scheduler;
702  }
703
704  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
705    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
706    this.scheduler.signalAll();
707  }
708
709  public long getKeepAliveTime(final TimeUnit timeUnit) {
710    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
711  }
712
713  // ==========================================================================
714  //  Submit/Remove Chores
715  // ==========================================================================
716
717  /**
718   * Add a chore procedure to the executor
719   * @param chore the chore to add
720   */
721  public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
722    chore.setState(ProcedureState.WAITING_TIMEOUT);
723    timeoutExecutor.add(chore);
724  }
725
726  /**
727   * Remove a chore procedure from the executor
728   * @param chore the chore to remove
729   * @return whether the chore is removed, or it will be removed later
730   */
731  public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
732    chore.setState(ProcedureState.SUCCESS);
733    return timeoutExecutor.remove(chore);
734  }
735
736  // ==========================================================================
737  //  Nonce Procedure helpers
738  // ==========================================================================
739  /**
740   * Create a NonceKey from the specified nonceGroup and nonce.
741   * @param nonceGroup the group to use for the {@link NonceKey}
742   * @param nonce the nonce to use in the {@link NonceKey}
743   * @return the generated NonceKey
744   */
745  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
746    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
747  }
748
749  /**
750   * Register a nonce for a procedure that is going to be submitted.
751   * A procId will be reserved and on submitProcedure(),
752   * the procedure with the specified nonce will take the reserved ProcId.
753   * If someone already reserved the nonce, this method will return the procId reserved,
754   * otherwise an invalid procId will be returned. and the caller should procede
755   * and submit the procedure.
756   *
757   * @param nonceKey A unique identifier for this operation from the client or process.
758   * @return the procId associated with the nonce, if any otherwise an invalid procId.
759   */
760  public long registerNonce(final NonceKey nonceKey) {
761    if (nonceKey == null) {
762      return -1;
763    }
764
765    // check if we have already a Reserved ID for the nonce
766    Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
767    if (oldProcId == null) {
768      // reserve a new Procedure ID, this will be associated with the nonce
769      // and the procedure submitted with the specified nonce will use this ID.
770      final long newProcId = nextProcId();
771      oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
772      if (oldProcId == null) {
773        return -1;
774      }
775    }
776
777    // we found a registered nonce, but the procedure may not have been submitted yet.
778    // since the client expect the procedure to be submitted, spin here until it is.
779    final boolean traceEnabled = LOG.isTraceEnabled();
780    while (isRunning() &&
781           !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
782           nonceKeysToProcIdsMap.containsKey(nonceKey)) {
783      if (traceEnabled) {
784        LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
785      }
786      Threads.sleep(100);
787    }
788    return oldProcId.longValue();
789  }
790
791  /**
792   * Remove the NonceKey if the procedure was not submitted to the executor.
793   * @param nonceKey A unique identifier for this operation from the client or process.
794   */
795  public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
796    if (nonceKey == null) {
797      return;
798    }
799
800    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
801    if (procId == null) {
802      return;
803    }
804
805    // if the procedure was not submitted, remove the nonce
806    if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
807      nonceKeysToProcIdsMap.remove(nonceKey);
808    }
809  }
810
811  /**
812   * If the failure failed before submitting it, we may want to give back the
813   * same error to the requests with the same nonceKey.
814   *
815   * @param nonceKey A unique identifier for this operation from the client or process
816   * @param procName name of the procedure, used to inform the user
817   * @param procOwner name of the owner of the procedure, used to inform the user
818   * @param exception the failure to report to the user
819   */
820  public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
821      IOException exception) {
822    if (nonceKey == null) {
823      return;
824    }
825
826    Long procId = nonceKeysToProcIdsMap.get(nonceKey);
827    if (procId == null || completed.containsKey(procId)) {
828      return;
829    }
830
831    completed.computeIfAbsent(procId, (key) -> {
832      Procedure<TEnvironment> proc = new FailedProcedure<>(procId.longValue(),
833          procName, procOwner, nonceKey, exception);
834
835      return new CompletedProcedureRetainer<>(proc);
836    });
837  }
838
839  // ==========================================================================
840  //  Submit/Abort Procedure
841  // ==========================================================================
842  /**
843   * Add a new root-procedure to the executor.
844   * @param proc the new procedure to execute.
845   * @return the procedure id, that can be used to monitor the operation
846   */
847  public long submitProcedure(Procedure<TEnvironment> proc) {
848    return submitProcedure(proc, null);
849  }
850
851  /**
852   * Bypass a procedure. If the procedure is set to bypass, all the logic in
853   * execute/rollback will be ignored and it will return success, whatever.
854   * It is used to recover buggy stuck procedures, releasing the lock resources
855   * and letting other procedures run. Bypassing one procedure (and its ancestors will
856   * be bypassed automatically) may leave the cluster in a middle state, e.g. region
857   * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
858   * the operators may have to do some clean up on hdfs or schedule some assign procedures
859   * to let region online. DO AT YOUR OWN RISK.
860   * <p>
861   * A procedure can be bypassed only if
862   * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
863   * or it is a root procedure without any child.
864   * 2. No other worker thread is executing it
865   * 3. No child procedure has been submitted
866   *
867   * <p>
868   * If all the requirements are meet, the procedure and its ancestors will be
869   * bypassed and persisted to WAL.
870   *
871   * <p>
872   * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
873   * TODO: What about WAITING_TIMEOUT?
874   * @param pids the procedure id
875   * @param lockWait time to wait lock
876   * @param force if force set to true, we will bypass the procedure even if it is executing.
877   *              This is for procedures which can't break out during executing(due to bug, mostly)
878   *              In this case, bypassing the procedure is not enough, since it is already stuck
879   *              there. We need to restart the master after bypassing, and letting the problematic
880   *              procedure to execute wth bypass=true, so in that condition, the procedure can be
881   *              successfully bypassed.
882   * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
883   * @return true if bypass success
884   * @throws IOException IOException
885   */
886  public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
887      boolean recursive)
888      throws IOException {
889    List<Boolean> result = new ArrayList<Boolean>(pids.size());
890    for(long pid: pids) {
891      result.add(bypassProcedure(pid, lockWait, force, recursive));
892    }
893    return result;
894  }
895
896  boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
897      throws IOException {
898    Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
899    final Procedure<TEnvironment> procedure = getProcedure(pid);
900    if (procedure == null) {
901      LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
902      return false;
903    }
904
905    LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}",
906        procedure, lockWait, override, recursive);
907    IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
908    if (lockEntry == null && !override) {
909      LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
910          lockWait, procedure, override);
911      return false;
912    } else if (lockEntry == null) {
913      LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}",
914          lockWait, procedure, override);
915    }
916    try {
917      // check whether the procedure is already finished
918      if (procedure.isFinished()) {
919        LOG.debug("{} is already finished, skipping bypass", procedure);
920        return false;
921      }
922
923      if (procedure.hasChildren()) {
924        if (recursive) {
925          // EXPENSIVE. Checks each live procedure of which there could be many!!!
926          // Is there another way to get children of a procedure?
927          LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
928          this.procedures.forEachValue(1 /*Single-threaded*/,
929            // Transformer
930            v -> v.getParentProcId() == procedure.getProcId()? v: null,
931            // Consumer
932            v -> {
933              try {
934                bypassProcedure(v.getProcId(), lockWait, override, recursive);
935              } catch (IOException e) {
936                LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
937              }
938            });
939        } else {
940          LOG.debug("{} has children, skipping bypass", procedure);
941          return false;
942        }
943      }
944
945      // If the procedure has no parent or no child, we are safe to bypass it in whatever state
946      if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
947          && procedure.getState() != ProcedureState.WAITING
948          && procedure.getState() != ProcedureState.WAITING_TIMEOUT) {
949        LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
950                + "(with no parent), {}",
951            procedure);
952        // Question: how is the bypass done here?
953        return false;
954      }
955
956      // Now, the procedure is not finished, and no one can execute it since we take the lock now
957      // And we can be sure that its ancestor is not running too, since their child has not
958      // finished yet
959      Procedure<TEnvironment> current = procedure;
960      while (current != null) {
961        LOG.debug("Bypassing {}", current);
962        current.bypass(getEnvironment());
963        store.update(procedure);
964        long parentID = current.getParentProcId();
965        current = getProcedure(parentID);
966      }
967
968      //wake up waiting procedure, already checked there is no child
969      if (procedure.getState() == ProcedureState.WAITING) {
970        procedure.setState(ProcedureState.RUNNABLE);
971        store.update(procedure);
972      }
973
974      // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler.
975      // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE
976      if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
977        LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
978        if (timeoutExecutor.remove(procedure)) {
979          LOG.debug("removed procedure {} from timeoutExecutor", procedure);
980          timeoutExecutor.executeTimedoutProcedure(procedure);
981        }
982      } else if (lockEntry != null) {
983        scheduler.addFront(procedure);
984        LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
985      } else {
986        // If we don't have the lock, we can't re-submit the queue,
987        // since it is already executing. To get rid of the stuck situation, we
988        // need to restart the master. With the procedure set to bypass, the procedureExecutor
989        // will bypass it and won't get stuck again.
990        LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
991            + "skipping add to queue",
992          procedure);
993      }
994      return true;
995
996    } finally {
997      if (lockEntry != null) {
998        procExecutionLock.releaseLockEntry(lockEntry);
999      }
1000    }
1001  }
1002
1003  /**
1004   * Add a new root-procedure to the executor.
1005   * @param proc the new procedure to execute.
1006   * @param nonceKey the registered unique identifier for this operation from the client or process.
1007   * @return the procedure id, that can be used to monitor the operation
1008   */
1009  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
1010      justification = "FindBugs is blind to the check-for-null")
1011  public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
1012    Preconditions.checkArgument(lastProcId.get() >= 0);
1013
1014    prepareProcedure(proc);
1015
1016    final Long currentProcId;
1017    if (nonceKey != null) {
1018      currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
1019      Preconditions.checkArgument(currentProcId != null,
1020        "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
1021    } else {
1022      currentProcId = nextProcId();
1023    }
1024
1025    // Initialize the procedure
1026    proc.setNonceKey(nonceKey);
1027    proc.setProcId(currentProcId.longValue());
1028
1029    // Commit the transaction
1030    store.insert(proc, null);
1031    LOG.debug("Stored {}", proc);
1032
1033    // Add the procedure to the executor
1034    return pushProcedure(proc);
1035  }
1036
1037  /**
1038   * Add a set of new root-procedure to the executor.
1039   * @param procs the new procedures to execute.
1040   */
1041  // TODO: Do we need to take nonces here?
1042  public void submitProcedures(Procedure<TEnvironment>[] procs) {
1043    Preconditions.checkArgument(lastProcId.get() >= 0);
1044    if (procs == null || procs.length <= 0) {
1045      return;
1046    }
1047
1048    // Prepare procedure
1049    for (int i = 0; i < procs.length; ++i) {
1050      prepareProcedure(procs[i]).setProcId(nextProcId());
1051    }
1052
1053    // Commit the transaction
1054    store.insert(procs);
1055    if (LOG.isDebugEnabled()) {
1056      LOG.debug("Stored " + Arrays.toString(procs));
1057    }
1058
1059    // Add the procedure to the executor
1060    for (int i = 0; i < procs.length; ++i) {
1061      pushProcedure(procs[i]);
1062    }
1063  }
1064
1065  private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
1066    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
1067    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
1068    if (this.checkOwnerSet) {
1069      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
1070    }
1071    return proc;
1072  }
1073
1074  private long pushProcedure(Procedure<TEnvironment> proc) {
1075    final long currentProcId = proc.getProcId();
1076
1077    // Update metrics on start of a procedure
1078    proc.updateMetricsOnSubmit(getEnvironment());
1079
1080    // Create the rollback stack for the procedure
1081    RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
1082    rollbackStack.put(currentProcId, stack);
1083
1084    // Submit the new subprocedures
1085    assert !procedures.containsKey(currentProcId);
1086    procedures.put(currentProcId, proc);
1087    sendProcedureAddedNotification(currentProcId);
1088    scheduler.addBack(proc);
1089    return proc.getProcId();
1090  }
1091
1092  /**
1093   * Send an abort notification the specified procedure.
1094   * Depending on the procedure implementation the abort can be considered or ignored.
1095   * @param procId the procedure to abort
1096   * @return true if the procedure exists and has received the abort, otherwise false.
1097   */
1098  public boolean abort(long procId) {
1099    return abort(procId, true);
1100  }
1101
1102  /**
1103   * Send an abort notification to the specified procedure.
1104   * Depending on the procedure implementation, the abort can be considered or ignored.
1105   * @param procId the procedure to abort
1106   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
1107   * @return true if the procedure exists and has received the abort, otherwise false.
1108   */
1109  public boolean abort(long procId, boolean mayInterruptIfRunning) {
1110    Procedure<TEnvironment> proc = procedures.get(procId);
1111    if (proc != null) {
1112      if (!mayInterruptIfRunning && proc.wasExecuted()) {
1113        return false;
1114      }
1115      return proc.abort(getEnvironment());
1116    }
1117    return false;
1118  }
1119
1120  // ==========================================================================
1121  //  Executor query helpers
1122  // ==========================================================================
1123  public Procedure<TEnvironment> getProcedure(final long procId) {
1124    return procedures.get(procId);
1125  }
1126
1127  public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
1128    Procedure<TEnvironment> proc = getProcedure(procId);
1129    if (clazz.isInstance(proc)) {
1130      return clazz.cast(proc);
1131    }
1132    return null;
1133  }
1134
1135  public Procedure<TEnvironment> getResult(long procId) {
1136    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1137    if (retainer == null) {
1138      return null;
1139    } else {
1140      return retainer.getProcedure();
1141    }
1142  }
1143
1144  /**
1145   * Return true if the procedure is finished.
1146   * The state may be "completed successfully" or "failed and rolledback".
1147   * Use getResult() to check the state or get the result data.
1148   * @param procId the ID of the procedure to check
1149   * @return true if the procedure execution is finished, otherwise false.
1150   */
1151  public boolean isFinished(final long procId) {
1152    return !procedures.containsKey(procId);
1153  }
1154
1155  /**
1156   * Return true if the procedure is started.
1157   * @param procId the ID of the procedure to check
1158   * @return true if the procedure execution is started, otherwise false.
1159   */
1160  public boolean isStarted(long procId) {
1161    Procedure<?> proc = procedures.get(procId);
1162    if (proc == null) {
1163      return completed.get(procId) != null;
1164    }
1165    return proc.wasExecuted();
1166  }
1167
1168  /**
1169   * Mark the specified completed procedure, as ready to remove.
1170   * @param procId the ID of the procedure to remove
1171   */
1172  public void removeResult(long procId) {
1173    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1174    if (retainer == null) {
1175      assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
1176      LOG.debug("pid={} already removed by the cleaner.", procId);
1177      return;
1178    }
1179
1180    // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
1181    retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
1182  }
1183
1184  public Procedure<TEnvironment> getResultOrProcedure(long procId) {
1185    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1186    if (retainer == null) {
1187      return procedures.get(procId);
1188    } else {
1189      return retainer.getProcedure();
1190    }
1191  }
1192
1193  /**
1194   * Check if the user is this procedure's owner
1195   * @param procId the target procedure
1196   * @param user the user
1197   * @return true if the user is the owner of the procedure,
1198   *   false otherwise or the owner is unknown.
1199   */
1200  public boolean isProcedureOwner(long procId, User user) {
1201    if (user == null) {
1202      return false;
1203    }
1204    final Procedure<TEnvironment> runningProc = procedures.get(procId);
1205    if (runningProc != null) {
1206      return runningProc.getOwner().equals(user.getShortName());
1207    }
1208
1209    final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1210    if (retainer != null) {
1211      return retainer.getProcedure().getOwner().equals(user.getShortName());
1212    }
1213
1214    // Procedure either does not exist or has already completed and got cleaned up.
1215    // At this time, we cannot check the owner of the procedure
1216    return false;
1217  }
1218
1219  /**
1220   * Should only be used when starting up, where the procedure workers have not been started.
1221   * <p/>
1222   * If the procedure works has been started, the return values maybe changed when you are
1223   * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
1224   * it will do a copy, and also include the finished procedures.
1225   */
1226  public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
1227    return procedures.values();
1228  }
1229
1230  /**
1231   * Get procedures.
1232   * @return the procedures in a list
1233   */
1234  public List<Procedure<TEnvironment>> getProcedures() {
1235    List<Procedure<TEnvironment>> procedureList =
1236      new ArrayList<>(procedures.size() + completed.size());
1237    procedureList.addAll(procedures.values());
1238    // Note: The procedure could show up twice in the list with different state, as
1239    // it could complete after we walk through procedures list and insert into
1240    // procedureList - it is ok, as we will use the information in the Procedure
1241    // to figure it out; to prevent this would increase the complexity of the logic.
1242    completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
1243      .forEach(procedureList::add);
1244    return procedureList;
1245  }
1246
1247  // ==========================================================================
1248  //  Listeners helpers
1249  // ==========================================================================
1250  public void registerListener(ProcedureExecutorListener listener) {
1251    this.listeners.add(listener);
1252  }
1253
1254  public boolean unregisterListener(ProcedureExecutorListener listener) {
1255    return this.listeners.remove(listener);
1256  }
1257
1258  private void sendProcedureLoadedNotification(final long procId) {
1259    if (!this.listeners.isEmpty()) {
1260      for (ProcedureExecutorListener listener: this.listeners) {
1261        try {
1262          listener.procedureLoaded(procId);
1263        } catch (Throwable e) {
1264          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1265        }
1266      }
1267    }
1268  }
1269
1270  private void sendProcedureAddedNotification(final long procId) {
1271    if (!this.listeners.isEmpty()) {
1272      for (ProcedureExecutorListener listener: this.listeners) {
1273        try {
1274          listener.procedureAdded(procId);
1275        } catch (Throwable e) {
1276          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1277        }
1278      }
1279    }
1280  }
1281
1282  private void sendProcedureFinishedNotification(final long procId) {
1283    if (!this.listeners.isEmpty()) {
1284      for (ProcedureExecutorListener listener: this.listeners) {
1285        try {
1286          listener.procedureFinished(procId);
1287        } catch (Throwable e) {
1288          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1289        }
1290      }
1291    }
1292  }
1293
1294  // ==========================================================================
1295  //  Procedure IDs helpers
1296  // ==========================================================================
1297  private long nextProcId() {
1298    long procId = lastProcId.incrementAndGet();
1299    if (procId < 0) {
1300      while (!lastProcId.compareAndSet(procId, 0)) {
1301        procId = lastProcId.get();
1302        if (procId >= 0) {
1303          break;
1304        }
1305      }
1306      while (procedures.containsKey(procId)) {
1307        procId = lastProcId.incrementAndGet();
1308      }
1309    }
1310    assert procId >= 0 : "Invalid procId " + procId;
1311    return procId;
1312  }
1313
1314  @VisibleForTesting
1315  protected long getLastProcId() {
1316    return lastProcId.get();
1317  }
1318
1319  @VisibleForTesting
1320  public Set<Long> getActiveProcIds() {
1321    return procedures.keySet();
1322  }
1323
1324  Long getRootProcedureId(Procedure<TEnvironment> proc) {
1325    return Procedure.getRootProcedureId(procedures, proc);
1326  }
1327
1328  // ==========================================================================
1329  //  Executions
1330  // ==========================================================================
  /**
   * Executes, or rolls back, the given procedure.
   * <p>
   * Flow, as implemented below:
   * <ul>
   * <li>Already-finished procedures are skipped.</li>
   * <li>If no root procedure id can be resolved, the procedure is rolled back on its own.</li>
   * <li>If the root has no {@link RootProcedureState}, a warning is logged and nothing runs.</li>
   * <li>Otherwise execution is acquired through the root state. If that fails and the
   * rollback fence can be taken, the whole root stack is rolled back. If the fence can't be taken
   * and the procedure never executed, only this procedure is rolled back.</li>
   * <li>On successful acquisition the procedure's lock is taken and the procedure is run via
   * {@code execProcedure}. On success, the root is finalized or a child is cleaned up.</li>
   * </ul>
   * The loop repeats while {@code procStack.isFailed()}. Presumably this is so that a failure
   * raised during this step is followed by a rollback attempt within the same call, on the next
   * iteration's failed {@code acquire}. TODO confirm against RootProcedureState.
   * @param proc the procedure to run
   */
  private void executeProcedure(Procedure<TEnvironment> proc) {
    if (proc.isFinished()) {
      LOG.debug("{} is already finished, skipping execution", proc);
      return;
    }
    final Long rootProcId = getRootProcedureId(proc);
    if (rootProcId == null) {
      // The 'proc' was ready to run but the root procedure was rolledback
      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
      executeRollback(proc);
      return;
    }

    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
    if (procStack == null) {
      // No rollback state registered for the root: nothing is executed, we only log and return.
      LOG.warn("RootProcedureState is null for " + proc.getProcId());
      return;
    }
    do {
      // Try to acquire the execution
      // (presumably fails once the root state is failed/rolling back — see RootProcedureState)
      if (!procStack.acquire(proc)) {
        if (procStack.setRollback()) {
          // we have the 'rollback-lock' we can start rollingback
          switch (executeRollback(rootProcId, procStack)) {
            case LOCK_ACQUIRED:
              break;
            case LOCK_YIELD_WAIT:
              // Give up the rollback fence so the rollback can be retried after yielding.
              procStack.unsetRollback();
              scheduler.yield(proc);
              break;
            case LOCK_EVENT_WAIT:
              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
              procStack.unsetRollback();
              break;
            default:
              throw new UnsupportedOperationException();
          }
        } else {
          // if we can't rollback means that some child is still running.
          // the rollback will be executed after all the children are done.
          // If the procedure was never executed, remove and mark it as rolledback.
          if (!proc.wasExecuted()) {
            switch (executeRollback(proc)) {
              case LOCK_ACQUIRED:
                break;
              case LOCK_YIELD_WAIT:
                scheduler.yield(proc);
                break;
              case LOCK_EVENT_WAIT:
                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
                break;
              default:
                throw new UnsupportedOperationException();
            }
          }
        }
        // Either way, nothing more to execute in this call.
        break;
      }

      // Execute the procedure
      assert proc.getState() == ProcedureState.RUNNABLE : proc;
      // Note that lock is NOT about concurrency but rather about ensuring
      // ownership of a procedure of an entity such as a region or table
      LockState lockState = acquireLock(proc);
      switch (lockState) {
        case LOCK_ACQUIRED:
          execProcedure(procStack, proc);
          break;
        case LOCK_YIELD_WAIT:
          LOG.info(lockState + " " + proc);
          scheduler.yield(proc);
          break;
        case LOCK_EVENT_WAIT:
          // Someone will wake us up when the lock is available
          LOG.debug(lockState + " " + proc);
          break;
        default:
          throw new UnsupportedOperationException();
      }
      // Release the execution slot acquired from the root state above (regardless of lockState).
      procStack.release(proc);

      if (proc.isSuccess()) {
        // update metrics on finishing the procedure
        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
        LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
        // Finalize the procedure state
        // (the root completes the whole tree; a child only needs its own completion cleanup)
        if (proc.getProcId() == rootProcId) {
          procedureFinished(proc);
        } else {
          execCompletionCleanup(proc);
        }
        break;
      }
    } while (procStack.isFailed());
  }
1426
1427  private LockState acquireLock(Procedure<TEnvironment> proc) {
1428    TEnvironment env = getEnvironment();
1429    // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
1430    // hasLock is true.
1431    if (proc.hasLock()) {
1432      return LockState.LOCK_ACQUIRED;
1433    }
1434    return proc.doAcquireLock(env, store);
1435  }
1436
1437  private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
1438    TEnvironment env = getEnvironment();
1439    // For how the framework works, we know that we will always have the lock
1440    // when we call releaseLock(), so we can avoid calling proc.hasLock()
1441    if (force || !proc.holdLock(env) || proc.isFinished()) {
1442      proc.doReleaseLock(env, store);
1443    }
1444  }
1445
1446  /**
1447   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
1448   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
1449   */
1450  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
1451    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
1452    RemoteProcedureException exception = rootProc.getException();
1453    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
1454    // rolling back because the subprocedure does. Clarify.
1455    if (exception == null) {
1456      exception = procStack.getException();
1457      rootProc.setFailure(exception);
1458      store.update(rootProc);
1459    }
1460
1461    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
1462    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
1463
1464    int stackTail = subprocStack.size();
1465    while (stackTail-- > 0) {
1466      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
1467      IdLock.Entry lockEntry = null;
1468      // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
1469      // this check, as the worker will hold the lock before executing a procedure. This is the only
1470      // place where we may hold two procedure execution locks, and there is a fence in the
1471      // RootProcedureState where we can make sure that only one worker can execute the rollback of
1472      // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
1473      // prevent race between us and the force update thread.
1474      if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
1475        try {
1476          lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
1477        } catch (IOException e) {
1478          // can only happen if interrupted, so not a big deal to propagate it
1479          throw new UncheckedIOException(e);
1480        }
1481      }
1482      try {
1483        // For the sub procedures which are successfully finished, we do not rollback them.
1484        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
1485        // recursively rollback its ancestors. The state changes which are done by sub procedures
1486        // should be handled by parent procedures when rolling back. For example, when rolling back
1487        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
1488        // online, instead of rolling back the original procedures which offlined the regions(in
1489        // fact these procedures can not be rolled back...).
1490        if (proc.isSuccess()) {
1491          // Just do the cleanup work, without actually executing the rollback
1492          subprocStack.remove(stackTail);
1493          cleanupAfterRollbackOneStep(proc);
1494          continue;
1495        }
1496        LockState lockState = acquireLock(proc);
1497        if (lockState != LockState.LOCK_ACQUIRED) {
1498          // can't take a lock on the procedure, add the root-proc back on the
1499          // queue waiting for the lock availability
1500          return lockState;
1501        }
1502
1503        lockState = executeRollback(proc);
1504        releaseLock(proc, false);
1505        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
1506        abortRollback |= !isRunning() || !store.isRunning();
1507
1508        // allows to kill the executor before something is stored to the wal.
1509        // useful to test the procedure recovery.
1510        if (abortRollback) {
1511          return lockState;
1512        }
1513
1514        subprocStack.remove(stackTail);
1515
1516        // if the procedure is kind enough to pass the slot to someone else, yield
1517        // if the proc is already finished, do not yield
1518        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
1519          return LockState.LOCK_YIELD_WAIT;
1520        }
1521
1522        if (proc != rootProc) {
1523          execCompletionCleanup(proc);
1524        }
1525      } finally {
1526        if (lockEntry != null) {
1527          procExecutionLock.releaseLockEntry(lockEntry);
1528        }
1529      }
1530    }
1531
1532    // Finalize the procedure state
1533    LOG.info("Rolled back {} exec-time={}", rootProc,
1534      StringUtils.humanTimeDiff(rootProc.elapsedTime()));
1535    procedureFinished(rootProc);
1536    return LockState.LOCK_ACQUIRED;
1537  }
1538
1539  private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
1540    if (proc.removeStackIndex()) {
1541      if (!proc.isSuccess()) {
1542        proc.setState(ProcedureState.ROLLEDBACK);
1543      }
1544
1545      // update metrics on finishing the procedure (fail)
1546      proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);
1547
1548      if (proc.hasParent()) {
1549        store.delete(proc.getProcId());
1550        procedures.remove(proc.getProcId());
1551      } else {
1552        final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
1553        if (childProcIds != null) {
1554          store.delete(proc, childProcIds);
1555        } else {
1556          store.update(proc);
1557        }
1558      }
1559    } else {
1560      store.update(proc);
1561    }
1562  }
1563
1564  /**
1565   * Execute the rollback of the procedure step.
1566   * It updates the store with the new state (stack index)
1567   * or will remove completly the procedure in case it is a child.
1568   */
1569  private LockState executeRollback(Procedure<TEnvironment> proc) {
1570    try {
1571      proc.doRollback(getEnvironment());
1572    } catch (IOException e) {
1573      LOG.debug("Roll back attempt failed for {}", proc, e);
1574      return LockState.LOCK_YIELD_WAIT;
1575    } catch (InterruptedException e) {
1576      handleInterruptedException(proc, e);
1577      return LockState.LOCK_YIELD_WAIT;
1578    } catch (Throwable e) {
1579      // Catch NullPointerExceptions or similar errors...
1580      LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
1581    }
1582
1583    // allows to kill the executor before something is stored to the wal.
1584    // useful to test the procedure recovery.
1585    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
1586      String msg = "TESTING: Kill before store update";
1587      LOG.debug(msg);
1588      stop();
1589      throw new RuntimeException(msg);
1590    }
1591
1592    cleanupAfterRollbackOneStep(proc);
1593
1594    return LockState.LOCK_ACQUIRED;
1595  }
1596
1597  private void yieldProcedure(Procedure<TEnvironment> proc) {
1598    releaseLock(proc, false);
1599    scheduler.yield(proc);
1600  }
1601
1602  /**
1603   * Executes <code>procedure</code>
1604   * <ul>
1605   *  <li>Calls the doExecute() of the procedure
1606   *  <li>If the procedure execution didn't fail (i.e. valid user input)
1607   *  <ul>
1608   *    <li>...and returned subprocedures
1609   *    <ul><li>The subprocedures are initialized.
1610   *      <li>The subprocedures are added to the store
1611   *      <li>The subprocedures are added to the runnable queue
1612   *      <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
1613   *    </ul>
1614   *    </li>
1615   *   <li>...if there are no subprocedure
1616   *    <ul><li>the procedure completed successfully
1617   *      <li>if there is a parent (WAITING)
1618   *      <li>the parent state will be set to RUNNABLE
1619   *    </ul>
1620   *   </li>
1621   *  </ul>
1622   *  </li>
1623   *  <li>In case of failure
1624   *  <ul>
1625   *    <li>The store is updated with the new state</li>
1626   *    <li>The executor (caller of this method) will start the rollback of the procedure</li>
1627   *  </ul>
1628   *  </li>
1629   *  </ul>
1630   */
1631  private void execProcedure(RootProcedureState<TEnvironment> procStack,
1632      Procedure<TEnvironment> procedure) {
1633    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
1634        "NOT RUNNABLE! " + procedure.toString());
1635
1636    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
1637    // The exception is caught below and then we hurry to the exit without disturbing state. The
1638    // idea is that the processing of this procedure will be unsuspended later by an external event
1639    // such the report of a region open.
1640    boolean suspended = false;
1641
1642    // Whether to 're-' -execute; run through the loop again.
1643    boolean reExecute = false;
1644
1645    Procedure<TEnvironment>[] subprocs = null;
1646    do {
1647      reExecute = false;
1648      procedure.resetPersistence();
1649      try {
1650        subprocs = procedure.doExecute(getEnvironment());
1651        if (subprocs != null && subprocs.length == 0) {
1652          subprocs = null;
1653        }
1654      } catch (ProcedureSuspendedException e) {
1655        LOG.trace("Suspend {}", procedure);
1656        suspended = true;
1657      } catch (ProcedureYieldException e) {
1658        LOG.trace("Yield {}", procedure, e);
1659        yieldProcedure(procedure);
1660        return;
1661      } catch (InterruptedException e) {
1662        LOG.trace("Yield interrupt {}", procedure, e);
1663        handleInterruptedException(procedure, e);
1664        yieldProcedure(procedure);
1665        return;
1666      } catch (Throwable e) {
1667        // Catch NullPointerExceptions or similar errors...
1668        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
1669        LOG.error(msg, e);
1670        procedure.setFailure(new RemoteProcedureException(msg, e));
1671      }
1672
1673      if (!procedure.isFailed()) {
1674        if (subprocs != null) {
1675          if (subprocs.length == 1 && subprocs[0] == procedure) {
1676            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
1677            // i.e. we go around this loop again rather than go back out on the scheduler queue.
1678            subprocs = null;
1679            reExecute = true;
1680            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
1681          } else {
1682            // Yield the current procedure, and make the subprocedure runnable
1683            // subprocs may come back 'null'.
1684            subprocs = initializeChildren(procStack, procedure, subprocs);
1685            LOG.info("Initialized subprocedures=" +
1686              (subprocs == null? null:
1687                Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
1688                collect(Collectors.toList()).toString()));
1689          }
1690        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1691          LOG.trace("Added to timeoutExecutor {}", procedure);
1692          timeoutExecutor.add(procedure);
1693        } else if (!suspended) {
1694          // No subtask, so we are done
1695          procedure.setState(ProcedureState.SUCCESS);
1696        }
1697      }
1698
1699      // Add the procedure to the stack
1700      procStack.addRollbackStep(procedure);
1701
1702      // allows to kill the executor before something is stored to the wal.
1703      // useful to test the procedure recovery.
1704      if (testing != null &&
1705        testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())) {
1706        kill("TESTING: Kill BEFORE store update: " + procedure);
1707      }
1708
1709      // TODO: The code here doesn't check if store is running before persisting to the store as
1710      // it relies on the method call below to throw RuntimeException to wind up the stack and
1711      // executor thread to stop. The statement following the method call below seems to check if
1712      // store is not running, to prevent scheduling children procedures, re-execution or yield
1713      // of this procedure. This may need more scrutiny and subsequent cleanup in future
1714      //
1715      // Commit the transaction even if a suspend (state may have changed). Note this append
1716      // can take a bunch of time to complete.
1717      if (procedure.needPersistence()) {
1718        updateStoreOnExec(procStack, procedure, subprocs);
1719      }
1720
1721      // if the store is not running we are aborting
1722      if (!store.isRunning()) {
1723        return;
1724      }
1725      // if the procedure is kind enough to pass the slot to someone else, yield
1726      if (procedure.isRunnable() && !suspended &&
1727          procedure.isYieldAfterExecutionStep(getEnvironment())) {
1728        yieldProcedure(procedure);
1729        return;
1730      }
1731
1732      assert (reExecute && subprocs == null) || !reExecute;
1733    } while (reExecute);
1734
1735    // Allows to kill the executor after something is stored to the WAL but before the below
1736    // state settings are done -- in particular the one on the end where we make parent
1737    // RUNNABLE again when its children are done; see countDownChildren.
1738    if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
1739      kill("TESTING: Kill AFTER store update: " + procedure);
1740    }
1741
1742    // Submit the new subprocedures
1743    if (subprocs != null && !procedure.isFailed()) {
1744      submitChildrenProcedures(subprocs);
1745    }
1746
1747    // we need to log the release lock operation before waking up the parent procedure, as there
1748    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
1749    // the sub procedures from store and cause problems...
1750    releaseLock(procedure, false);
1751
1752    // if the procedure is complete and has a parent, count down the children latch.
1753    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
1754    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
1755      countDownChildren(procStack, procedure);
1756    }
1757  }
1758
1759  private void kill(String msg) {
1760    LOG.debug(msg);
1761    stop();
1762    throw new RuntimeException(msg);
1763  }
1764
1765  private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
1766      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1767    assert subprocs != null : "expected subprocedures";
1768    final long rootProcId = getRootProcedureId(procedure);
1769    for (int i = 0; i < subprocs.length; ++i) {
1770      Procedure<TEnvironment> subproc = subprocs[i];
1771      if (subproc == null) {
1772        String msg = "subproc[" + i + "] is null, aborting the procedure";
1773        procedure.setFailure(new RemoteProcedureException(msg,
1774          new IllegalArgumentIOException(msg)));
1775        return null;
1776      }
1777
1778      assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
1779      subproc.setParentProcId(procedure.getProcId());
1780      subproc.setRootProcId(rootProcId);
1781      subproc.setProcId(nextProcId());
1782      procStack.addSubProcedure(subproc);
1783    }
1784
1785    if (!procedure.isFailed()) {
1786      procedure.setChildrenLatch(subprocs.length);
1787      switch (procedure.getState()) {
1788        case RUNNABLE:
1789          procedure.setState(ProcedureState.WAITING);
1790          break;
1791        case WAITING_TIMEOUT:
1792          timeoutExecutor.add(procedure);
1793          break;
1794        default:
1795          break;
1796      }
1797    }
1798    return subprocs;
1799  }
1800
1801  private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
1802    for (int i = 0; i < subprocs.length; ++i) {
1803      Procedure<TEnvironment> subproc = subprocs[i];
1804      subproc.updateMetricsOnSubmit(getEnvironment());
1805      assert !procedures.containsKey(subproc.getProcId());
1806      procedures.put(subproc.getProcId(), subproc);
1807      scheduler.addFront(subproc);
1808    }
1809  }
1810
1811  private void countDownChildren(RootProcedureState<TEnvironment> procStack,
1812      Procedure<TEnvironment> procedure) {
1813    Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
1814    if (parent == null) {
1815      assert procStack.isRollingback();
1816      return;
1817    }
1818
1819    // If this procedure is the last child awake the parent procedure
1820    if (parent.tryRunnable()) {
1821      // If we succeeded in making the parent runnable -- i.e. all of its
1822      // children have completed, move parent to front of the queue.
1823      store.update(parent);
1824      scheduler.addFront(parent);
1825      LOG.info("Finished subprocedure(s) of " + parent + "; resume parent processing.");
1826      return;
1827    }
1828  }
1829
1830  private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
1831      Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1832    if (subprocs != null && !procedure.isFailed()) {
1833      if (LOG.isTraceEnabled()) {
1834        LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
1835      }
1836      store.insert(procedure, subprocs);
1837    } else {
1838      LOG.trace("Store update {}", procedure);
1839      if (procedure.isFinished() && !procedure.hasParent()) {
1840        // remove child procedures
1841        final long[] childProcIds = procStack.getSubprocedureIds();
1842        if (childProcIds != null) {
1843          store.delete(procedure, childProcIds);
1844          for (int i = 0; i < childProcIds.length; ++i) {
1845            procedures.remove(childProcIds[i]);
1846          }
1847        } else {
1848          store.update(procedure);
1849        }
1850      } else {
1851        store.update(procedure);
1852      }
1853    }
1854  }
1855
1856  private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
1857    LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
1858    // NOTE: We don't call Thread.currentThread().interrupt()
1859    // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
1860    // the InterruptedException. If the master is going down, we will be notified
1861    // and the executor/store will be stopped.
1862    // (The interrupted procedure will be retried on the next run)
1863  }
1864
1865  private void execCompletionCleanup(Procedure<TEnvironment> proc) {
1866    final TEnvironment env = getEnvironment();
1867    if (proc.hasLock()) {
1868      LOG.warn("Usually this should not happen, we will release the lock before if the procedure" +
1869        " is finished, even if the holdLock is true, arrive here means we have some holes where" +
1870        " we do not release the lock. And the releaseLock below may fail since the procedure may" +
1871        " have already been deleted from the procedure store.");
1872      releaseLock(proc, true);
1873    }
1874    try {
1875      proc.completionCleanup(env);
1876    } catch (Throwable e) {
1877      // Catch NullPointerExceptions or similar errors...
1878      LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
1879    }
1880  }
1881
1882  private void procedureFinished(Procedure<TEnvironment> proc) {
1883    // call the procedure completion cleanup handler
1884    execCompletionCleanup(proc);
1885
1886    CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
1887
1888    // update the executor internal state maps
1889    if (!proc.shouldWaitClientAck(getEnvironment())) {
1890      retainer.setClientAckTime(0);
1891    }
1892
1893    completed.put(proc.getProcId(), retainer);
1894    rollbackStack.remove(proc.getProcId());
1895    procedures.remove(proc.getProcId());
1896
1897    // call the runnableSet completion cleanup handler
1898    try {
1899      scheduler.completionCleanup(proc);
1900    } catch (Throwable e) {
1901      // Catch NullPointerExceptions or similar errors...
1902      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
1903    }
1904
1905    // Notify the listeners
1906    sendProcedureFinishedNotification(proc.getProcId());
1907  }
1908
1909  RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
1910    return rollbackStack.get(rootProcId);
1911  }
1912
1913  @VisibleForTesting
1914  ProcedureScheduler getProcedureScheduler() {
1915    return scheduler;
1916  }
1917
1918  @VisibleForTesting
1919  int getCompletedSize() {
1920    return completed.size();
1921  }
1922
1923  @VisibleForTesting
1924  public IdLock getProcExecutionLock() {
1925    return procExecutionLock;
1926  }
1927
1928  // ==========================================================================
1929  //  Worker Thread
1930  // ==========================================================================
1931  private class WorkerThread extends StoppableThread {
1932    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
1933    private volatile Procedure<TEnvironment> activeProcedure;
1934
1935    public WorkerThread(ThreadGroup group) {
1936      this(group, "PEWorker-");
1937    }
1938
1939    protected WorkerThread(ThreadGroup group, String prefix) {
1940      super(group, prefix + workerId.incrementAndGet());
1941      setDaemon(true);
1942    }
1943
1944    @Override
1945    public void sendStopSignal() {
1946      scheduler.signalAll();
1947    }
1948    @Override
1949    public void run() {
1950      long lastUpdate = EnvironmentEdgeManager.currentTime();
1951      try {
1952        while (isRunning() && keepAlive(lastUpdate)) {
1953          @SuppressWarnings("unchecked")
1954          Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
1955          if (proc == null) {
1956            continue;
1957          }
1958          this.activeProcedure = proc;
1959          int activeCount = activeExecutorCount.incrementAndGet();
1960          int runningCount = store.setRunningProcedureCount(activeCount);
1961          LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
1962            runningCount, activeCount);
1963          executionStartTime.set(EnvironmentEdgeManager.currentTime());
1964          IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
1965          try {
1966            executeProcedure(proc);
1967          } catch (AssertionError e) {
1968            LOG.info("ASSERT pid=" + proc.getProcId(), e);
1969            throw e;
1970          } finally {
1971            procExecutionLock.releaseLockEntry(lockEntry);
1972            activeCount = activeExecutorCount.decrementAndGet();
1973            runningCount = store.setRunningProcedureCount(activeCount);
1974            LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
1975              runningCount, activeCount);
1976            this.activeProcedure = null;
1977            lastUpdate = EnvironmentEdgeManager.currentTime();
1978            executionStartTime.set(Long.MAX_VALUE);
1979          }
1980        }
1981      } catch (Throwable t) {
1982        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
1983      } finally {
1984        LOG.trace("Worker terminated.");
1985      }
1986      workerThreads.remove(this);
1987    }
1988
1989    @Override
1990    public String toString() {
1991      Procedure<?> p = this.activeProcedure;
1992      return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");
1993    }
1994
1995    /**
1996     * @return the time since the current procedure is running
1997     */
1998    public long getCurrentRunTime() {
1999      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
2000    }
2001
2002    // core worker never timeout
2003    protected boolean keepAlive(long lastUpdate) {
2004      return true;
2005    }
2006  }
2007
2008  // A worker thread which can be added when core workers are stuck. Will timeout after
2009  // keepAliveTime if there is no procedure to run.
2010  private final class KeepAliveWorkerThread extends WorkerThread {
2011    public KeepAliveWorkerThread(ThreadGroup group) {
2012      super(group, "KeepAlivePEWorker-");
2013    }
2014
2015    @Override
2016    protected boolean keepAlive(long lastUpdate) {
2017      return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
2018    }
2019  }
2020
2021  // ----------------------------------------------------------------------------
  // TODO-MAYBE: Should we provide an InlineChore to notify the store with the
  // full set of pending and completed procedures, so it can write a compacted
  // version of the log (in case it is a log)?
  // In theory no: procedures have a short life, so at some point the store
  // will have the tracker saying everything is in the last log.
2027  // ----------------------------------------------------------------------------
2028
2029  private final class WorkerMonitor extends InlineChore {
2030    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
2031        "hbase.procedure.worker.monitor.interval.msec";
2032    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
2033
2034    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
2035        "hbase.procedure.worker.stuck.threshold.msec";
2036    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
2037
2038    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
2039        "hbase.procedure.worker.add.stuck.percentage";
2040    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
2041
2042    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
2043    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
2044    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
2045
2046    public WorkerMonitor() {
2047      refreshConfig();
2048    }
2049
2050    @Override
2051    public void run() {
2052      final int stuckCount = checkForStuckWorkers();
2053      checkThreadCount(stuckCount);
2054
2055      // refresh interval (poor man dynamic conf update)
2056      refreshConfig();
2057    }
2058
2059    private int checkForStuckWorkers() {
2060      // check if any of the worker is stuck
2061      int stuckCount = 0;
2062      for (WorkerThread worker : workerThreads) {
2063        if (worker.getCurrentRunTime() < stuckThreshold) {
2064          continue;
2065        }
2066
2067        // WARN the worker is stuck
2068        stuckCount++;
2069        LOG.warn("Worker stuck {}, run time {}", worker,
2070          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
2071      }
2072      return stuckCount;
2073    }
2074
2075    private void checkThreadCount(final int stuckCount) {
2076      // nothing to do if there are no runnable tasks
2077      if (stuckCount < 1 || !scheduler.hasRunnables()) {
2078        return;
2079      }
2080
2081      // add a new thread if the worker stuck percentage exceed the threshold limit
2082      // and every handler is active.
2083      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
2084      // let's add new worker thread more aggressively, as they will timeout finally if there is no
2085      // work to do.
2086      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
2087        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
2088        workerThreads.add(worker);
2089        worker.start();
2090        LOG.debug("Added new worker thread {}", worker);
2091      }
2092    }
2093
2094    private void refreshConfig() {
2095      addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,
2096          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
2097      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,
2098        DEFAULT_WORKER_MONITOR_INTERVAL);
2099      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,
2100        DEFAULT_WORKER_STUCK_THRESHOLD);
2101    }
2102
2103    @Override
2104    public int getTimeoutInterval() {
2105      return timeoutInterval;
2106    }
2107  }
2108}