001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
040
041/**
042 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
043 * stack-indexes, etc.
044 * <p/>
045 * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
046 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
047 * be called multiple times in the case of failure or a restart, so code must be idempotent. The
048 * return from an execute call is either: null to indicate we are done; ourself if there is more to
049 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
050 * our execution.
051 * <p/>
052 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
053 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
054 * ProcedureState enum from protos:
055 * <ul>
056 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
057 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
058 * ROLLEDBACK state.</li>
059 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
 * <li>{@link #isFinished()} As a procedure in FAILED state will be retried forever for rollback,
 * the only condition under which the scheduler/executor drops a procedure from further processing
 * is when the procedure state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of
 * the procedure.</li>
063 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
064 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
065 * </ul>
066 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
067 * own state. This can lead to confusion. Try to keep the two distinct.
068 * <p/>
069 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
070 * step is supposed to cleanup the resources created during the execute() step. In case of failure
071 * and restart, rollback() may be called multiple times, so again the code must be idempotent.
072 * <p/>
 * A Procedure can be made to respect a locking regime. It has acquire/release methods as well as a
 * {@link #hasLock()} method. The lock implementation is up to the implementor. If an entity needs
 * to be locked for the life of a procedure -- not just the calls to execute -- then
 * implementations should say so with the {@link #holdLock(Object)} method.
077 * <p/>
 * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846),
 * the implementation is a bit tricky, so we add some comments here about it.
080 * <ul>
081 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
082 * whether we have the lock. We will set it to {@code true} in
083 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
084 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
085 * more.</li>
086 * <li>Also added a locked field in the proto message. When storing, the field will be set according
087 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
088 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
089 * message is {@code true}.</li>
090 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
091 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
092 * need to wait until master is initialized. So the solution here is that, we introduced a new
093 * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
094 * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
095 * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
096 * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
097 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
098 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is
 * {@code true}, then we just set the {@link #locked} field to true and return, without actually
100 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
101 * </ul>
102 * <p/>
 * Procedures can be suspended or put in wait state with a callback that gets executed on
 * Procedure-specified timeout. See {@link #setTimeout(int)}, and
 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
 * class for an example usage.
 * <p/>
109 * There are hooks for collecting metrics on submit of the procedure and on finish. See
110 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
111 */
112@InterfaceAudience.Private
113public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
  /** Logger for this class. */
  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
  /** Sentinel for "no procedure id"; also the initial value of the parent and root ids. */
  public static final long NO_PROC_ID = -1;
  /** Sentinel meaning no timeout has been set; see {@link #hasTimeout()}. */
  protected static final int NO_TIMEOUT = -1;
117
118  public enum LockState {
119    LOCK_ACQUIRED,       // Lock acquired and ready to execute
120    LOCK_YIELD_WAIT,     // Lock not acquired, framework needs to yield
121    LOCK_EVENT_WAIT,     // Lock not acquired, an event will yield the procedure
122  }
123
  // Unchanged after initialization
  /** Nonce key set by the executor via {@link #setNonceKey(NonceKey)}; null when unset. */
  private NonceKey nonceKey = null;
  /** Owner short name; null when unset or set to an empty string (see setOwner(String)). */
  private String owner = null;
  /** Parent procedure id, or {@link #NO_PROC_ID} when this procedure has no parent. */
  private long parentProcId = NO_PROC_ID;
  /** Root procedure id, or {@link #NO_PROC_ID} when not set. */
  private long rootProcId = NO_PROC_ID;
  /** Id assigned via {@link #setProcId(long)}, or {@link #NO_PROC_ID} before assignment. */
  private long procId = NO_PROC_ID;
  /** Time the proc id was assigned (EnvironmentEdgeManager.currentTime()), or the stored value. */
  private long submittedTime;

  // Runtime state, updated every operation
  /** Executor-level state; read and written through the synchronized accessors below. */
  private ProcedureState state = ProcedureState.INITIALIZING;
  /** Failure recorded by setFailure(...); null if none has been recorded. */
  private RemoteProcedureException exception = null;
  // NOTE(review): presumably this procedure's positions in the executor's rollback stack;
  // exposed via getStackIndexes(), which is defined outside this view — confirm there.
  private int[] stackIndexes = null;
  // NOTE(review): presumably the count of outstanding child procedures; its users are outside
  // this view — confirm there.
  private int childrenLatch = 0;

  /** Timeout interval in msec, or {@link #NO_TIMEOUT}. */
  private volatile int timeout = NO_TIMEOUT;
  /** Timestamp of the last update; refreshed by {@link #updateTimestamp()} and setState(). */
  private volatile long lastUpdate;

  /** Serialized result passed back to the client; null when there is none. */
  private volatile byte[] result = null;

  /**
   * Whether the procedure currently holds its lock, exposed via {@link #hasLock()}. Per the class
   * comment it is set to true in doAcquireLock and to false in doReleaseLock, so subclasses do not
   * manage it themselves.
   */
  private volatile boolean locked = false;

  /** Set by {@link #lockedWhenLoading()} when the stored procedure recorded that it held a lock. */
  private boolean lockedWhenLoading = false;

  /**
   * Used to override completion of the procedure without actually doing any logic in the
   * procedure. If bypass is set to true, when executing it will return null when
   * {@link #doExecute(Object)} is called to finish the procedure and release any locks
   * it may currently hold. The bypass does cleanup around the Procedure as far as the
   * Procedure framework is concerned. It does not clean any internal state that the
   * Procedures themselves may have set. That is for the Procedures to do themselves
   * when bypass is called. They should override bypass and do their cleanup in the
   * overridden bypass method (be sure to call the parent bypass to ensure proper
   * processing).
   * <p></p>Bypassing a procedure is not like aborting. Aborting a procedure will trigger
   * a rollback. And since the {@link #abort(Object)} method is overridable, some procedures
   * may have chosen to ignore the abort.
   */
  private volatile boolean bypass = false;

  /**
   * Indicate whether we need to persist the procedure to ProcedureStore after execution. Defaults to
   * true, and the implementation can call {@link #skipPersistence()} to let the framework skip the
   * persistence of the procedure.
   * <p/>
   * This is useful when the procedure is in error and you want to retry later. The retry interval
   * and the number of retries are usually not critical, so skipping the persistence can save some
   * resources, and also speed up the restart processing.
   * <p/>
   * Notice that this value will be reset to true every time before execution. And when rolling back
   * we do not test this value.
   */
  private boolean persist = true;
176
177  public boolean isBypass() {
178    return bypass;
179  }
180
181  /**
182   * Set the bypass to true.
183   * Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for
184   * now.
185   * DO NOT use this method alone, since we can't just bypass one single procedure. We need to
186   * bypass its ancestor too. If your Procedure has set state, it needs to undo it in here.
187   * @param env Current environment. May be null because of context; e.g. pretty-printing
188   *            procedure WALs where there is no 'environment' (and where Procedures that require
189   *            an 'environment' won't be run.
190   */
191  protected void bypass(TEnvironment env) {
192    this.bypass = true;
193  }
194
195  boolean needPersistence() {
196    return persist;
197  }
198
199  void resetPersistence() {
200    persist = true;
201  }
202
203  protected final void skipPersistence() {
204    persist = false;
205  }
206
207  /**
208   * The main code of the procedure. It must be idempotent since execute()
209   * may be called multiple times in case of machine failure in the middle
210   * of the execution.
211   * @param env the environment passed to the ProcedureExecutor
212   * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
213   * procedure is done.
214   * @throws ProcedureYieldException the procedure will be added back to the queue and retried later.
215   * @throws InterruptedException the procedure will be added back to the queue and retried later.
216   * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself and
217   * has set itself up waiting for an external event to wake it back up again.
218   */
219  protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
220    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
221
222  /**
223   * The code to undo what was done by the execute() code.
224   * It is called when the procedure or one of the sub-procedures failed or an
225   * abort was requested. It should cleanup all the resources created by
226   * the execute() call. The implementation must be idempotent since rollback()
227   * may be called multiple time in case of machine failure in the middle
228   * of the execution.
229   * @param env the environment passed to the ProcedureExecutor
230   * @throws IOException temporary failure, the rollback will retry later
231   * @throws InterruptedException the procedure will be added back to the queue and retried later
232   */
233  protected abstract void rollback(TEnvironment env)
234    throws IOException, InterruptedException;
235
236  /**
237   * The abort() call is asynchronous and each procedure must decide how to deal
238   * with it, if they want to be abortable. The simplest implementation
239   * is to have an AtomicBoolean set in the abort() method and then the execute()
240   * will check if the abort flag is set or not.
241   * abort() may be called multiple times from the client, so the implementation
242   * must be idempotent.
243   *
244   * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification
245   * that allows the procedure implementor abort.
246   */
247  protected abstract boolean abort(TEnvironment env);
248
249  /**
250   * The user-level code of the procedure may have some state to
251   * persist (e.g. input arguments or current position in the processing state) to
252   * be able to resume on failure.
253   * @param serializer stores the serializable state
254   */
255  protected abstract void serializeStateData(ProcedureStateSerializer serializer)
256    throws IOException;
257
258  /**
259   * Called on store load to allow the user to decode the previously serialized
260   * state.
261   * @param serializer contains the serialized state
262   */
263  protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
264    throws IOException;
265
266  /**
267   * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
268   * call us to determine whether we need to wait for initialization, second, it will call
269   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
270   * <p/>
271   * This is because that when master restarts, we need to restore the lock state for all the
272   * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
273   * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part
274   * of the initialization!), so we need to split the code into two steps, and when restore, we just
275   * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.
276   * @return true means we need to wait until the environment has been initialized, otherwise true.
277   */
278  protected boolean waitInitialized(TEnvironment env) {
279    return false;
280  }
281
282  /**
283   * The user should override this method if they need a lock on an Entity. A lock can be anything,
284   * and it is up to the implementor. The Procedure Framework will call this method just before it
285   * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
286   * execute.
287   * <p/>
288   * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
289   * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
290   * <p/>
291   * Example: in our Master we can execute request in parallel for different tables. We can create
292   * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
293   * queued waiting that specific table create to happen.
294   * <p/>
295   * There are 3 LockState:
296   * <ul>
297   * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
298   * execute.</li>
299   * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
300   * take care of readding the procedure back to the runnable set for retry</li>
301   * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
302   * care of readding the procedure back to the runnable set when the lock is available.</li>
303   * </ul>
304   * @return the lock state as described above.
305   */
306  protected LockState acquireLock(TEnvironment env) {
307    return LockState.LOCK_ACQUIRED;
308  }
309
310  /**
311   * The user should override this method, and release lock if necessary.
312   */
313  protected void releaseLock(TEnvironment env) {
314    // no-op
315  }
316
317  /**
318   * Used to keep the procedure lock even when the procedure is yielding or suspended.
319   * @return true if the procedure should hold on the lock until completionCleanup()
320   */
321  protected boolean holdLock(TEnvironment env) {
322    return false;
323  }
324
325  /**
326   * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)}
327   * returns true, the procedure executor will call acquireLock() once and thereafter
328   * not call {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls
329   * release/acquire around each invocation of {@link #execute(Object)}.
330   * @see #holdLock(Object)
331   * @return true if the procedure has the lock, false otherwise.
332   */
333  public final boolean hasLock() {
334    return locked;
335  }
336
337  /**
338   * Called when the procedure is loaded for replay.
339   * The procedure implementor may use this method to perform some quick
340   * operation before replay.
341   * e.g. failing the procedure if the state on replay may be unknown.
342   */
343  protected void beforeReplay(TEnvironment env) {
344    // no-op
345  }
346
347  /**
348   * Called when the procedure is ready to be added to the queue after
349   * the loading/replay operation.
350   */
351  protected void afterReplay(TEnvironment env) {
352    // no-op
353  }
354
355  /**
356   * Called when the procedure is marked as completed (success or rollback).
357   * The procedure implementor may use this method to cleanup in-memory states.
358   * This operation will not be retried on failure. If a procedure took a lock,
359   * it will have been released when this method runs.
360   */
361  protected void completionCleanup(TEnvironment env) {
362    // no-op
363  }
364
365  /**
366   * By default, the procedure framework/executor will try to run procedures start to finish.
367   * Return true to make the executor yield between each execution step to
368   * give other procedures a chance to run.
369   * @param env the environment passed to the ProcedureExecutor
370   * @return Return true if the executor should yield on completion of an execution step.
371   *         Defaults to return false.
372   */
373  protected boolean isYieldAfterExecutionStep(TEnvironment env) {
374    return false;
375  }
376
377  /**
378   * By default, the executor will keep the procedure result around util
379   * the eviction TTL is expired. The client can cut down the waiting time
380   * by requesting that the result is removed from the executor.
381   * In case of system started procedure, we can force the executor to auto-ack.
382   * @param env the environment passed to the ProcedureExecutor
383   * @return true if the executor should wait the client ack for the result.
384   *         Defaults to return true.
385   */
386  protected boolean shouldWaitClientAck(TEnvironment env) {
387    return true;
388  }
389
390  /**
391   * Override this method to provide procedure specific counters for submitted count, failed
392   * count and time histogram.
393   * @param env The environment passed to the procedure executor
394   * @return Container object for procedure related metric
395   */
396  protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
397    return null;
398  }
399
400  /**
401   * This function will be called just when procedure is submitted for execution. Override this
402   * method to update the metrics at the beginning of the procedure. The default implementation
403   * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
404   * {@link ProcedureMetrics}.
405   */
406  protected void updateMetricsOnSubmit(TEnvironment env) {
407    ProcedureMetrics metrics = getProcedureMetrics(env);
408    if (metrics == null) {
409      return;
410    }
411
412    Counter submittedCounter = metrics.getSubmittedCounter();
413    if (submittedCounter != null) {
414      submittedCounter.increment();
415    }
416  }
417
418  /**
419   * This function will be called just after procedure execution is finished. Override this method
420   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
421   * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
422   * time histogram for successfully completed procedures. Increments failed counter for failed
423   * procedures.
424   * <p/>
425   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
426   * successfully finished siblings, this function may get called twice in certain cases for certain
427   * procedures. Explore further if this can be called once.
428   * @param env The environment passed to the procedure executor
429   * @param runtime Runtime of the procedure in milliseconds
430   * @param success true if procedure is completed successfully
431   */
432  protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
433    ProcedureMetrics metrics = getProcedureMetrics(env);
434    if (metrics == null) {
435      return;
436    }
437
438    if (success) {
439      Histogram timeHisto = metrics.getTimeHisto();
440      if (timeHisto != null) {
441        timeHisto.update(runtime);
442      }
443    } else {
444      Counter failedCounter = metrics.getFailedCounter();
445      if (failedCounter != null) {
446        failedCounter.increment();
447      }
448    }
449  }
450
451  @Override
452  public String toString() {
453    // Return the simple String presentation of the procedure.
454    return toStringSimpleSB().toString();
455  }
456
457  /**
458   * Build the StringBuilder for the simple form of procedure string.
459   * @return the StringBuilder
460   */
461  protected StringBuilder toStringSimpleSB() {
462    final StringBuilder sb = new StringBuilder();
463
464    sb.append("pid=");
465    sb.append(getProcId());
466
467    if (hasParent()) {
468      sb.append(", ppid=");
469      sb.append(getParentProcId());
470    }
471
472    /**
473     * TODO
474     * Enable later when this is being used.
475     * Currently owner not used.
476    if (hasOwner()) {
477      sb.append(", owner=");
478      sb.append(getOwner());
479    }*/
480
481    sb.append(", state="); // pState for Procedure State as opposed to any other kind.
482    toStringState(sb);
483
484    // Only print out locked if actually locked. Most of the time it is not.
485    if (this.locked) {
486      sb.append(", locked=").append(locked);
487    }
488
489    if (bypass) {
490      sb.append(", bypass=").append(bypass);
491    }
492
493    if (hasException()) {
494      sb.append(", exception=" + getException());
495    }
496
497    sb.append("; ");
498    toStringClassDetails(sb);
499
500    return sb;
501  }
502
503  /**
504   * Extend the toString() information with more procedure details
505   */
506  public String toStringDetails() {
507    final StringBuilder sb = toStringSimpleSB();
508
509    sb.append(" submittedTime=");
510    sb.append(getSubmittedTime());
511
512    sb.append(", lastUpdate=");
513    sb.append(getLastUpdate());
514
515    final int[] stackIndices = getStackIndexes();
516    if (stackIndices != null) {
517      sb.append("\n");
518      sb.append("stackIndexes=");
519      sb.append(Arrays.toString(stackIndices));
520    }
521
522    return sb.toString();
523  }
524
525  protected String toStringClass() {
526    StringBuilder sb = new StringBuilder();
527    toStringClassDetails(sb);
528    return sb.toString();
529  }
530
531  /**
532   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
533   * generic Procedure State with Procedure particulars.
534   * @param builder Append current {@link ProcedureState}
535   */
536  protected void toStringState(StringBuilder builder) {
537    builder.append(getState());
538  }
539
540  /**
541   * Extend the toString() information with the procedure details
542   * e.g. className and parameters
543   * @param builder the string builder to use to append the proc specific information
544   */
545  protected void toStringClassDetails(StringBuilder builder) {
546    builder.append(getClass().getName());
547  }
548
  // ==========================================================================
  //  These fields are unchanged after initialization.
  //
  //  Each procedure will get created from the user or during
  //  ProcedureExecutor.start() during the load() phase and then submitted
  //  to the executor. These fields will never be changed after initialization.
  // ==========================================================================
556  public long getProcId() {
557    return procId;
558  }
559
560  public boolean hasParent() {
561    return parentProcId != NO_PROC_ID;
562  }
563
564  public long getParentProcId() {
565    return parentProcId;
566  }
567
568  public long getRootProcId() {
569    return rootProcId;
570  }
571
572  public String getProcName() {
573    return toStringClass();
574  }
575
576  public NonceKey getNonceKey() {
577    return nonceKey;
578  }
579
580  public long getSubmittedTime() {
581    return submittedTime;
582  }
583
584  public String getOwner() {
585    return owner;
586  }
587
588  public boolean hasOwner() {
589    return owner != null;
590  }
591
592  /**
593   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
594   */
595  @VisibleForTesting
596  protected void setProcId(long procId) {
597    this.procId = procId;
598    this.submittedTime = EnvironmentEdgeManager.currentTime();
599    setState(ProcedureState.RUNNABLE);
600  }
601
602  /**
603   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
604   */
605  protected void setParentProcId(long parentProcId) {
606    this.parentProcId = parentProcId;
607  }
608
609  protected void setRootProcId(long rootProcId) {
610    this.rootProcId = rootProcId;
611  }
612
613  /**
614   * Called by the ProcedureExecutor to set the value to the newly created procedure.
615   */
616  @VisibleForTesting
617  protected void setNonceKey(NonceKey nonceKey) {
618    this.nonceKey = nonceKey;
619  }
620
621  @VisibleForTesting
622  public void setOwner(String owner) {
623    this.owner = StringUtils.isEmpty(owner) ? null : owner;
624  }
625
626  public void setOwner(User owner) {
627    assert owner != null : "expected owner to be not null";
628    setOwner(owner.getShortName());
629  }
630
631  /**
632   * Called on store load to initialize the Procedure internals after
633   * the creation/deserialization.
634   */
635  protected void setSubmittedTime(long submittedTime) {
636    this.submittedTime = submittedTime;
637  }
638
639  // ==========================================================================
640  //  runtime state - timeout related
641  // ==========================================================================
642  /**
643   * @param timeout timeout interval in msec
644   */
645  protected void setTimeout(int timeout) {
646    this.timeout = timeout;
647  }
648
649  public boolean hasTimeout() {
650    return timeout != NO_TIMEOUT;
651  }
652
653  /**
654   * @return the timeout in msec
655   */
656  public int getTimeout() {
657    return timeout;
658  }
659
660  /**
661   * Called on store load to initialize the Procedure internals after
662   * the creation/deserialization.
663   */
664  protected void setLastUpdate(long lastUpdate) {
665    this.lastUpdate = lastUpdate;
666  }
667
668  /**
669   * Called by ProcedureExecutor after each time a procedure step is executed.
670   */
671  protected void updateTimestamp() {
672    this.lastUpdate = EnvironmentEdgeManager.currentTime();
673  }
674
675  public long getLastUpdate() {
676    return lastUpdate;
677  }
678
679  /**
680   * Timeout of the next timeout.
681   * Called by the ProcedureExecutor if the procedure has timeout set and
682   * the procedure is in the waiting queue.
683   * @return the timestamp of the next timeout.
684   */
685  protected long getTimeoutTimestamp() {
686    return getLastUpdate() + getTimeout();
687  }
688
689  // ==========================================================================
690  //  runtime state
691  // ==========================================================================
692  /**
693   * @return the time elapsed between the last update and the start time of the procedure.
694   */
695  public long elapsedTime() {
696    return getLastUpdate() - getSubmittedTime();
697  }
698
699  /**
700   * @return the serialized result if any, otherwise null
701   */
702  public byte[] getResult() {
703    return result;
704  }
705
706  /**
707   * The procedure may leave a "result" on completion.
708   * @param result the serialized result that will be passed to the client
709   */
710  protected void setResult(byte[] result) {
711    this.result = result;
712  }
713
714  /**
715   * Will only be called when loading procedures from procedure store, where we need to record
716   * whether the procedure has already held a lock. Later we will call
717   * {@link #restoreLock(Object, ProcedureStore)} to actually acquire the lock.
718   */
719  final void lockedWhenLoading() {
720    this.lockedWhenLoading = true;
721  }
722
723  /**
724   * Can only be called when restarting, before the procedure actually being executed, as after we
725   * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
726   * {@link #lockedWhenLoading} to false.
727   * <p/>
728   * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
729   * front of a queue.
730   */
731  public boolean isLockedWhenLoading() {
732    return lockedWhenLoading;
733  }
734
  // ==============================================================================================
  //  Runtime state, updated every operation by the ProcedureExecutor
  //
  //  There is always 1 thread at a time operating on the state of the procedure.
  //  The ProcedureExecutor may check and set states, or some Procedure may
  //  update its own state, but there are no concurrent updates. We use synchronized here
  //  just because the procedure can get scheduled on different executor threads on each step.
  // ==============================================================================================
743
744  /**
745   * @return true if the procedure is in a RUNNABLE state.
746   */
747  public synchronized boolean isRunnable() {
748    return state == ProcedureState.RUNNABLE;
749  }
750
751  public synchronized boolean isInitializing() {
752    return state == ProcedureState.INITIALIZING;
753  }
754
755  /**
756   * @return true if the procedure has failed. It may or may not have rolled back.
757   */
758  public synchronized boolean isFailed() {
759    return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
760  }
761
762  /**
763   * @return true if the procedure is finished successfully.
764   */
765  public synchronized boolean isSuccess() {
766    return state == ProcedureState.SUCCESS && !hasException();
767  }
768
769  /**
770   * @return true if the procedure is finished. The Procedure may be completed successfully or
771   * rolledback.
772   */
773  public synchronized boolean isFinished() {
774    return isSuccess() || state == ProcedureState.ROLLEDBACK;
775  }
776
777  /**
778   * @return true if the procedure is waiting for a child to finish or for an external event.
779   */
780  public synchronized boolean isWaiting() {
781    switch (state) {
782      case WAITING:
783      case WAITING_TIMEOUT:
784        return true;
785      default:
786        break;
787    }
788    return false;
789  }
790
791  @VisibleForTesting
792  protected synchronized void setState(final ProcedureState state) {
793    this.state = state;
794    updateTimestamp();
795  }
796
797  public synchronized ProcedureState getState() {
798    return state;
799  }
800
801  protected void setFailure(final String source, final Throwable cause) {
802    setFailure(new RemoteProcedureException(source, cause));
803  }
804
805  protected synchronized void setFailure(final RemoteProcedureException exception) {
806    this.exception = exception;
807    if (!isFinished()) {
808      setState(ProcedureState.FAILED);
809    }
810  }
811
812  protected void setAbortFailure(final String source, final String msg) {
813    setFailure(source, new ProcedureAbortedException(msg));
814  }
815
816  /**
817   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
818   * <p/>
819   * Another usage for this method is to implement retrying. A procedure can set the state to
820   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
821   * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
822   * call {@link #setTimeout(int)} method to set the timeout. And you should also override this
823   * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
824   * timeout event has been handled.
825   * @return true to let the framework handle the timeout as abort, false in case the procedure
826   *         handled the timeout itself.
827   */
828  protected synchronized boolean setTimeoutFailure(TEnvironment env) {
829    if (state == ProcedureState.WAITING_TIMEOUT) {
830      long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
831      setFailure("ProcedureExecutor",
832        new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
833      return true;
834    }
835    return false;
836  }
837
838  public synchronized boolean hasException() {
839    return exception != null;
840  }
841
842  public synchronized RemoteProcedureException getException() {
843    return exception;
844  }
845
846  /**
847   * Called by the ProcedureExecutor on procedure-load to restore the latch state
848   */
849  protected synchronized void setChildrenLatch(int numChildren) {
850    this.childrenLatch = numChildren;
851    if (LOG.isTraceEnabled()) {
852      LOG.trace("CHILD LATCH INCREMENT SET " +
853          this.childrenLatch, new Throwable(this.toString()));
854    }
855  }
856
857  /**
858   * Called by the ProcedureExecutor on procedure-load to restore the latch state
859   */
860  protected synchronized void incChildrenLatch() {
861    // TODO: can this be inferred from the stack? I think so...
862    this.childrenLatch++;
863    if (LOG.isTraceEnabled()) {
864      LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
865    }
866  }
867
868  /**
869   * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
870   */
871  private synchronized boolean childrenCountDown() {
872    assert childrenLatch > 0: this;
873    boolean b = --childrenLatch == 0;
874    if (LOG.isTraceEnabled()) {
875      LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
876    }
877    return b;
878  }
879
880  /**
881   * Try to set this procedure into RUNNABLE state.
882   * Succeeds if all subprocedures/children are done.
883   * @return True if we were able to move procedure to RUNNABLE state.
884   */
885  synchronized boolean tryRunnable() {
886    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
887    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
888      setState(ProcedureState.RUNNABLE);
889      return true;
890    } else {
891      return false;
892    }
893  }
894
895  protected synchronized boolean hasChildren() {
896    return childrenLatch > 0;
897  }
898
899  /**
900   * @return Count of children outstanding (Badly named).
901   */
902  protected synchronized int getChildrenLatch() {
903    return childrenLatch;
904  }
905
906  /**
907   * Called by the RootProcedureState on procedure execution.
908   * Each procedure store its stack-index positions.
909   */
910  protected synchronized void addStackIndex(final int index) {
911    if (stackIndexes == null) {
912      stackIndexes = new int[] { index };
913    } else {
914      int count = stackIndexes.length;
915      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
916      stackIndexes[count] = index;
917    }
918  }
919
920  protected synchronized boolean removeStackIndex() {
921    if (stackIndexes != null && stackIndexes.length > 1) {
922      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
923      return false;
924    } else {
925      stackIndexes = null;
926      return true;
927    }
928  }
929
930  /**
931   * Called on store load to initialize the Procedure internals after
932   * the creation/deserialization.
933   */
934  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
935    this.stackIndexes = new int[stackIndexes.size()];
936    for (int i = 0; i < this.stackIndexes.length; ++i) {
937      this.stackIndexes[i] = stackIndexes.get(i);
938    }
939  }
940
941  protected synchronized boolean wasExecuted() {
942    return stackIndexes != null;
943  }
944
945  protected synchronized int[] getStackIndexes() {
946    return stackIndexes;
947  }
948
949  // ==========================================================================
950  //  Internal methods - called by the ProcedureExecutor
951  // ==========================================================================
952
953  /**
954   * Internal method called by the ProcedureExecutor that starts the user-level code execute().
955   * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
956   *           skip out without changing states or releasing any locks held.
957   */
958  protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
959      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
960    try {
961      updateTimestamp();
962      if (bypass) {
963        LOG.info("{} bypassed, returning null to finish it", this);
964        return null;
965      }
966      return execute(env);
967    } finally {
968      updateTimestamp();
969    }
970  }
971
972  /**
973   * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
974   */
975  protected void doRollback(TEnvironment env)
976      throws IOException, InterruptedException {
977    try {
978      updateTimestamp();
979      if (bypass) {
980        LOG.info("{} bypassed, skipping rollback", this);
981        return;
982      }
983      rollback(env);
984    } finally {
985      updateTimestamp();
986    }
987  }
988
  /**
   * After a restart, re-acquires the lock this procedure held when it was loaded.
   * <p/>
   * Nothing is acquired if the procedure did not hold the lock at load time
   * ({@code lockedWhenLoading} is false), is already finished, or is bypassed. It is also skipped,
   * and {@code lockedWhenLoading} is cleared, when the procedure is in the {@code WAITING} state and
   * {@code holdLock(env)} returns false. Otherwise {@code acquireLock(env)} is called and is
   * expected to succeed.
   * <p/>
   * This method does not set {@code locked}. If {@code lockedWhenLoading} is still true when
   * {@link #doAcquireLock(Object, ProcedureStore)} runs, that method sets {@code locked} without
   * calling {@code acquireLock} again.
   * @param env the procedure environment
   */
  final void restoreLock(TEnvironment env) {
    if (!lockedWhenLoading) {
      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
      return;
    }

    if (isFinished()) {
      LOG.debug("{} is already finished, skip acquiring lock.", this);
      return;
    }

    if (isBypass()) {
      LOG.debug("{} is already bypassed, skip acquiring lock.", this);
      return;
    }
    // this can happen if the parent stores the sub procedures but before it can
    // release its lock, the master restarts
    if (getState() == ProcedureState.WAITING && !holdLock(env)) {
      LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
      lockedWhenLoading = false;
      return;
    }
    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
    LockState state = acquireLock(env);
    // Only checked when assertions are enabled (-ea). Otherwise a failed acquisition is not
    // detected here.
    assert state == LockState.LOCK_ACQUIRED;
  }
1015
1016  /**
1017   * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
1018   */
1019  final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
1020    if (waitInitialized(env)) {
1021      return LockState.LOCK_EVENT_WAIT;
1022    }
1023    if (lockedWhenLoading) {
1024      // reset it so we will not consider it anymore
1025      lockedWhenLoading = false;
1026      locked = true;
1027      // Here we return without persist the locked state, as lockedWhenLoading is true means
1028      // that the locked field of the procedure stored in procedure store is true, so we do not need
1029      // to store it again.
1030      return LockState.LOCK_ACQUIRED;
1031    }
1032    LockState state = acquireLock(env);
1033    if (state == LockState.LOCK_ACQUIRED) {
1034      locked = true;
1035      // persist that we have held the lock. This must be done before we actually execute the
1036      // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
1037      // but it may have already done some changes as we have already executed it, and if another
1038      // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
1039      // not expect that another procedure can be executed in the middle.
1040      store.update(this);
1041    }
1042    return state;
1043  }
1044
1045  /**
1046   * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
1047   */
1048  final void doReleaseLock(TEnvironment env, ProcedureStore store) {
1049    locked = false;
1050    // persist that we have released the lock. This must be done before we actually release the
1051    // lock. Another procedure may take this lock immediately after we release the lock, and if we
1052    // crash before persist the information that we have already released the lock, then when
1053    // restarting there will be two procedures which both have the lock and cause problems.
1054    if (getState() != ProcedureState.ROLLEDBACK) {
1055      // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
1056      // procedure store, so do not need to log the release operation any more.
1057      store.update(this);
1058    }
1059    releaseLock(env);
1060  }
1061
1062  @Override
1063  public int compareTo(final Procedure<TEnvironment> other) {
1064    return Long.compare(getProcId(), other.getProcId());
1065  }
1066
1067  // ==========================================================================
1068  //  misc utils
1069  // ==========================================================================
1070
1071  /**
1072   * Get an hashcode for the specified Procedure ID
1073   * @return the hashcode for the specified procId
1074   */
1075  public static long getProcIdHashCode(long procId) {
1076    long h = procId;
1077    h ^= h >> 16;
1078    h *= 0x85ebca6b;
1079    h ^= h >> 13;
1080    h *= 0xc2b2ae35;
1081    h ^= h >> 16;
1082    return h;
1083  }
1084
1085  /**
1086   * Helper to lookup the root Procedure ID given a specified procedure.
1087   */
1088  protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
1089      Procedure<T> proc) {
1090    while (proc.hasParent()) {
1091      proc = procedures.get(proc.getParentProcId());
1092      if (proc == null) {
1093        return null;
1094      }
1095    }
1096    return proc.getProcId();
1097  }
1098
1099  /**
1100   * @param a the first procedure to be compared.
1101   * @param b the second procedure to be compared.
1102   * @return true if the two procedures have the same parent
1103   */
1104  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
1105    return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
1106  }
1107}