001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Map;
024import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
025import org.apache.hadoop.hbase.metrics.Counter;
026import org.apache.hadoop.hbase.metrics.Histogram;
027import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
028import org.apache.hadoop.hbase.procedure2.util.StringUtils;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.NonceKey;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
037
038/**
039 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
040 * stack-indexes, etc.
041 * <p/>
042 * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
043 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
044 * be called multiple times in the case of failure or a restart, so code must be idempotent. The
045 * return from an execute call is either: null to indicate we are done; ourself if there is more to
046 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
047 * our execution.
048 * <p/>
049 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
050 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
051 * ProcedureState enum from protos:
052 * <ul>
053 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
054 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
055 * ROLLEDBACK state.</li>
056 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
 * <li>{@link #isFinished()} As a procedure in FAILED state will be retried forever for rollback,
 * the only condition under which the scheduler/executor drops a procedure from further processing
 * is when the procedure state is ROLLEDBACK or isSuccess() returns true. This is a terminal state
 * of the procedure.</li>
060 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
061 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
062 * </ul>
063 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
064 * own state. This can lead to confusion. Try to keep the two distinct.
065 * <p/>
066 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
067 * step is supposed to cleanup the resources created during the execute() step. In case of failure
068 * and restart, rollback() may be called multiple times, so again the code must be idempotent.
069 * <p/>
 * A Procedure can be made to respect a locking regime. It has acquire/release methods as well as
 * a {@link #hasLock()} method. The lock implementation is up to the implementor. If an entity
 * needs to be
072 * locked for the life of a procedure -- not just the calls to execute -- then implementations
073 * should say so with the {@link #holdLock(Object)} method.
074 * <p/>
 * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846),
 * the implementation is a bit tricky, so we add some comments here about it.
077 * <ul>
078 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
079 * whether we have the lock. We will set it to {@code true} in
080 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
081 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
082 * more.</li>
083 * <li>Also added a locked field in the proto message. When storing, the field will be set according
084 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
085 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
086 * message is {@code true}.</li>
087 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
088 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
089 * need to wait until master is initialized. So the solution here is that, we introduced a new
090 * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
091 * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
092 * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
093 * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
094 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
095 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is
 * {@code true}, then we just set the {@link #locked} field to true and return, without actually
097 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
098 * </ul>
099 * <p/>
 * Procedures can be suspended or put in wait state with a callback that gets executed on
 * Procedure-specified timeout. See {@link #setTimeout(int)}, and
 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
 * class for an example usage.
 * <p/>
106 * There are hooks for collecting metrics on submit of the procedure and on finish. See
107 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
108 */
109@InterfaceAudience.Private
110public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
  // Class-level logger.
  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
  /** Sentinel id meaning "no procedure"; the initial value of procId, parentProcId and rootProcId. */
  public static final long NO_PROC_ID = -1;
  /** Sentinel timeout meaning no timeout has been set; see {@link #hasTimeout()}. */
  protected static final int NO_TIMEOUT = -1;
114
115  public enum LockState {
116    LOCK_ACQUIRED,       // Lock acquired and ready to execute
117    LOCK_YIELD_WAIT,     // Lock not acquired, framework needs to yield
118    LOCK_EVENT_WAIT,     // Lock not acquired, an event will yield the procedure
119  }
120
  // Unchanged after initialization
  // Nonce key handed in by the ProcedureExecutor through setNonceKey(); null when none was set.
  private NonceKey nonceKey = null;
  // Owner short name; setOwner(String) normalizes an empty string to null.
  private String owner = null;
  // Parent procedure id, or NO_PROC_ID when there is no parent (see hasParent()).
  private long parentProcId = NO_PROC_ID;
  // Root procedure id, or NO_PROC_ID until setRootProcId() is called.
  private long rootProcId = NO_PROC_ID;
  // Id assigned by the ProcedureExecutor through setProcId().
  private long procId = NO_PROC_ID;
  // Set to the current time by setProcId(), or restored through setSubmittedTime() on load.
  private long submittedTime;

  // Runtime state, updated every operation
  // Executor-level state; starts as INITIALIZING and becomes RUNNABLE in setProcId().
  private ProcedureState state = ProcedureState.INITIALIZING;
  // Failure recorded through setFailure(); consulted by isSuccess() via hasException().
  private RemoteProcedureException exception = null;
  // NOTE(review): presumably this procedure's positions in the rollback stack; printed by
  // toStringDetails() via getStackIndexes() -- confirm against the accessors later in this class.
  private int[] stackIndexes = null;
  // NOTE(review): presumably the count of outstanding child procedures -- confirm against usage.
  private int childrenLatch = 0;

  // Timeout in msec, or NO_TIMEOUT when none is set (see setTimeout()/hasTimeout()).
  private volatile int timeout = NO_TIMEOUT;
  // Timestamp of the last update; refreshed by updateTimestamp(), which setState() also calls.
  private volatile long lastUpdate;

  // Serialized result handed to the client, set through setResult().
  private volatile byte[] result = null;

  // Whether this procedure currently holds its lock; exposed read-only through hasLock().
  private volatile boolean locked = false;

  // Set by lockedWhenLoading() when the stored procedure was holding its lock, so that the lock
  // can be restored on restart (see the class javadoc).
  private boolean lockedWhenLoading = false;
143
144  /**
145   * Used for override complete of the procedure without actually doing any logic in the procedure.
146   * If bypass is set to true, when executing it will return null when
147   * {@link #doExecute(Object)} is called to finish the procedure and release any locks
148   * it may currently hold. The bypass does cleanup around the Procedure as far as the
149   * Procedure framework is concerned. It does not clean any internal state that the
150   * Procedure's themselves may have set. That is for the Procedures to do themselves
151   * when bypass is called. They should override bypass and do their cleanup in the
152   * overridden bypass method (be sure to call the parent bypass to ensure proper
153   * processing).
154   * <p></p>Bypassing a procedure is not like aborting. Aborting a procedure will trigger
155   * a rollback. And since the {@link #abort(Object)} method is overrideable
156   * Some procedures may have chosen to ignore the aborting.
157   */
158  private volatile boolean bypass = false;
159
160  /**
161   * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to
162   * true, and the implementation can all {@link #skipPersistence()} to let the framework skip the
163   * persistence of the procedure.
164   * <p/>
165   * This is useful when the procedure is in error and you want to retry later. The retry interval
166   * and the number of retries are usually not critical so skip the persistence can save some
167   * resources, and also speed up the restart processing.
168   * <p/>
169   * Notice that this value will be reset to true every time before execution. And when rolling back
170   * we do not test this value.
171   */
172  private boolean persist = true;
173
174  public boolean isBypass() {
175    return bypass;
176  }
177
178  /**
179   * Set the bypass to true.
180   * Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now.
181   * DO NOT use this method alone, since we can't just bypass one single procedure. We need to
182   * bypass its ancestor too. If your Procedure has set state, it needs to undo it in here.
183   * @param env Current environment. May be null because of context; e.g. pretty-printing
184   *            procedure WALs where there is no 'environment' (and where Procedures that require
185   *            an 'environment' won't be run.
186   */
187  protected void bypass(TEnvironment env) {
188    this.bypass = true;
189  }
190
191  boolean needPersistence() {
192    return persist;
193  }
194
195  void resetPersistence() {
196    persist = true;
197  }
198
199  protected final void skipPersistence() {
200    persist = false;
201  }
202
203  /**
204   * The main code of the procedure. It must be idempotent since execute()
205   * may be called multiple times in case of machine failure in the middle
206   * of the execution.
207   * @param env the environment passed to the ProcedureExecutor
208   * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
209   *         procedure is done.
210   * @throws ProcedureYieldException the procedure will be added back to the queue and retried
211   *         later.
212   * @throws InterruptedException the procedure will be added back to the queue and retried later.
213   * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
214   *         and has set itself up waiting for an external event to wake it back up again.
215   */
216  protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
217    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
218
219  /**
220   * The code to undo what was done by the execute() code.
221   * It is called when the procedure or one of the sub-procedures failed or an
222   * abort was requested. It should cleanup all the resources created by
223   * the execute() call. The implementation must be idempotent since rollback()
224   * may be called multiple time in case of machine failure in the middle
225   * of the execution.
226   * @param env the environment passed to the ProcedureExecutor
227   * @throws IOException temporary failure, the rollback will retry later
228   * @throws InterruptedException the procedure will be added back to the queue and retried later
229   */
230  protected abstract void rollback(TEnvironment env)
231    throws IOException, InterruptedException;
232
233  /**
234   * The abort() call is asynchronous and each procedure must decide how to deal
235   * with it, if they want to be abortable. The simplest implementation
236   * is to have an AtomicBoolean set in the abort() method and then the execute()
237   * will check if the abort flag is set or not.
238   * abort() may be called multiple times from the client, so the implementation
239   * must be idempotent.
240   *
241   * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification
242   * that allows the procedure implementor abort.
243   */
244  protected abstract boolean abort(TEnvironment env);
245
246  /**
247   * The user-level code of the procedure may have some state to
248   * persist (e.g. input arguments or current position in the processing state) to
249   * be able to resume on failure.
250   * @param serializer stores the serializable state
251   */
252  protected abstract void serializeStateData(ProcedureStateSerializer serializer)
253    throws IOException;
254
255  /**
256   * Called on store load to allow the user to decode the previously serialized
257   * state.
258   * @param serializer contains the serialized state
259   */
260  protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
261    throws IOException;
262
263  /**
264   * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
265   * call us to determine whether we need to wait for initialization, second, it will call
266   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
267   * <p/>
268   * This is because that when master restarts, we need to restore the lock state for all the
269   * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
270   * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part
271   * of the initialization!), so we need to split the code into two steps, and when restore, we just
272   * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.
273   * @return true means we need to wait until the environment has been initialized, otherwise true.
274   */
275  protected boolean waitInitialized(TEnvironment env) {
276    return false;
277  }
278
279  /**
280   * The user should override this method if they need a lock on an Entity. A lock can be anything,
281   * and it is up to the implementor. The Procedure Framework will call this method just before it
282   * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
283   * execute.
284   * <p/>
285   * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
286   * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
287   * <p/>
288   * Example: in our Master we can execute request in parallel for different tables. We can create
289   * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
290   * queued waiting that specific table create to happen.
291   * <p/>
292   * There are 3 LockState:
293   * <ul>
294   * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
295   * execute.</li>
296   * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
297   * take care of readding the procedure back to the runnable set for retry</li>
298   * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
299   * care of readding the procedure back to the runnable set when the lock is available.</li>
300   * </ul>
301   * @return the lock state as described above.
302   */
303  protected LockState acquireLock(TEnvironment env) {
304    return LockState.LOCK_ACQUIRED;
305  }
306
307  /**
308   * The user should override this method, and release lock if necessary.
309   */
310  protected void releaseLock(TEnvironment env) {
311    // no-op
312  }
313
314  /**
315   * Used to keep the procedure lock even when the procedure is yielding or suspended.
316   * @return true if the procedure should hold on the lock until completionCleanup()
317   */
318  protected boolean holdLock(TEnvironment env) {
319    return false;
320  }
321
322  /**
323   * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)}
324   * returns true, the procedure executor will call acquireLock() once and thereafter
325   * not call {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls
326   * release/acquire around each invocation of {@link #execute(Object)}.
327   * @see #holdLock(Object)
328   * @return true if the procedure has the lock, false otherwise.
329   */
330  public final boolean hasLock() {
331    return locked;
332  }
333
334  /**
335   * Called when the procedure is loaded for replay.
336   * The procedure implementor may use this method to perform some quick
337   * operation before replay.
338   * e.g. failing the procedure if the state on replay may be unknown.
339   */
340  protected void beforeReplay(TEnvironment env) {
341    // no-op
342  }
343
344  /**
345   * Called when the procedure is ready to be added to the queue after
346   * the loading/replay operation.
347   */
348  protected void afterReplay(TEnvironment env) {
349    // no-op
350  }
351
352  /**
353   * Called when the procedure is marked as completed (success or rollback).
354   * The procedure implementor may use this method to cleanup in-memory states.
355   * This operation will not be retried on failure. If a procedure took a lock,
356   * it will have been released when this method runs.
357   */
358  protected void completionCleanup(TEnvironment env) {
359    // no-op
360  }
361
362  /**
363   * By default, the procedure framework/executor will try to run procedures start to finish.
364   * Return true to make the executor yield between each execution step to
365   * give other procedures a chance to run.
366   * @param env the environment passed to the ProcedureExecutor
367   * @return Return true if the executor should yield on completion of an execution step.
368   *         Defaults to return false.
369   */
370  protected boolean isYieldAfterExecutionStep(TEnvironment env) {
371    return false;
372  }
373
374  /**
375   * By default, the executor will keep the procedure result around util
376   * the eviction TTL is expired. The client can cut down the waiting time
377   * by requesting that the result is removed from the executor.
378   * In case of system started procedure, we can force the executor to auto-ack.
379   * @param env the environment passed to the ProcedureExecutor
380   * @return true if the executor should wait the client ack for the result.
381   *         Defaults to return true.
382   */
383  protected boolean shouldWaitClientAck(TEnvironment env) {
384    return true;
385  }
386
387  /**
388   * Override this method to provide procedure specific counters for submitted count, failed
389   * count and time histogram.
390   * @param env The environment passed to the procedure executor
391   * @return Container object for procedure related metric
392   */
393  protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
394    return null;
395  }
396
397  /**
398   * This function will be called just when procedure is submitted for execution. Override this
399   * method to update the metrics at the beginning of the procedure. The default implementation
400   * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
401   * {@link ProcedureMetrics}.
402   */
403  protected void updateMetricsOnSubmit(TEnvironment env) {
404    ProcedureMetrics metrics = getProcedureMetrics(env);
405    if (metrics == null) {
406      return;
407    }
408
409    Counter submittedCounter = metrics.getSubmittedCounter();
410    if (submittedCounter != null) {
411      submittedCounter.increment();
412    }
413  }
414
415  /**
416   * This function will be called just after procedure execution is finished. Override this method
417   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
418   * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
419   * time histogram for successfully completed procedures. Increments failed counter for failed
420   * procedures.
421   * <p/>
422   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
423   * successfully finished siblings, this function may get called twice in certain cases for certain
424   * procedures. Explore further if this can be called once.
425   * @param env The environment passed to the procedure executor
426   * @param runtime Runtime of the procedure in milliseconds
427   * @param success true if procedure is completed successfully
428   */
429  protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
430    ProcedureMetrics metrics = getProcedureMetrics(env);
431    if (metrics == null) {
432      return;
433    }
434
435    if (success) {
436      Histogram timeHisto = metrics.getTimeHisto();
437      if (timeHisto != null) {
438        timeHisto.update(runtime);
439      }
440    } else {
441      Counter failedCounter = metrics.getFailedCounter();
442      if (failedCounter != null) {
443        failedCounter.increment();
444      }
445    }
446  }
447
448  @Override
449  public String toString() {
450    // Return the simple String presentation of the procedure.
451    return toStringSimpleSB().toString();
452  }
453
454  /**
455   * Build the StringBuilder for the simple form of procedure string.
456   * @return the StringBuilder
457   */
458  protected StringBuilder toStringSimpleSB() {
459    final StringBuilder sb = new StringBuilder();
460
461    sb.append("pid=");
462    sb.append(getProcId());
463
464    if (hasParent()) {
465      sb.append(", ppid=");
466      sb.append(getParentProcId());
467    }
468
469    /*
470     * TODO
471     * Enable later when this is being used.
472     * Currently owner not used.
473    if (hasOwner()) {
474      sb.append(", owner=");
475      sb.append(getOwner());
476    }*/
477
478    sb.append(", state="); // pState for Procedure State as opposed to any other kind.
479    toStringState(sb);
480
481    // Only print out locked if actually locked. Most of the time it is not.
482    if (this.locked) {
483      sb.append(", locked=").append(locked);
484    }
485
486    if (bypass) {
487      sb.append(", bypass=").append(bypass);
488    }
489
490    if (hasException()) {
491      sb.append(", exception=" + getException());
492    }
493
494    sb.append("; ");
495    toStringClassDetails(sb);
496
497    return sb;
498  }
499
500  /**
501   * Extend the toString() information with more procedure details
502   */
503  public String toStringDetails() {
504    final StringBuilder sb = toStringSimpleSB();
505
506    sb.append(" submittedTime=");
507    sb.append(getSubmittedTime());
508
509    sb.append(", lastUpdate=");
510    sb.append(getLastUpdate());
511
512    final int[] stackIndices = getStackIndexes();
513    if (stackIndices != null) {
514      sb.append("\n");
515      sb.append("stackIndexes=");
516      sb.append(Arrays.toString(stackIndices));
517    }
518
519    return sb.toString();
520  }
521
522  protected String toStringClass() {
523    StringBuilder sb = new StringBuilder();
524    toStringClassDetails(sb);
525    return sb.toString();
526  }
527
528  /**
529   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
530   * generic Procedure State with Procedure particulars.
531   * @param builder Append current {@link ProcedureState}
532   */
533  protected void toStringState(StringBuilder builder) {
534    builder.append(getState());
535  }
536
537  /**
538   * Extend the toString() information with the procedure details
539   * e.g. className and parameters
540   * @param builder the string builder to use to append the proc specific information
541   */
542  protected void toStringClassDetails(StringBuilder builder) {
543    builder.append(getClass().getName());
544  }
545
546  // ==========================================================================
  //  These fields are unchanged after initialization.
  //
  //  Each procedure will get created from the user or during
  //  ProcedureExecutor.start() during the load() phase and then submitted
  //  to the executor. These fields will never be changed after initialization.
552  // ==========================================================================
553  public long getProcId() {
554    return procId;
555  }
556
557  public boolean hasParent() {
558    return parentProcId != NO_PROC_ID;
559  }
560
561  public long getParentProcId() {
562    return parentProcId;
563  }
564
565  public long getRootProcId() {
566    return rootProcId;
567  }
568
569  public String getProcName() {
570    return toStringClass();
571  }
572
573  public NonceKey getNonceKey() {
574    return nonceKey;
575  }
576
577  public long getSubmittedTime() {
578    return submittedTime;
579  }
580
581  public String getOwner() {
582    return owner;
583  }
584
585  public boolean hasOwner() {
586    return owner != null;
587  }
588
589  /**
590   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
591   */
592  protected void setProcId(long procId) {
593    this.procId = procId;
594    this.submittedTime = EnvironmentEdgeManager.currentTime();
595    setState(ProcedureState.RUNNABLE);
596  }
597
598  /**
599   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
600   */
601  protected void setParentProcId(long parentProcId) {
602    this.parentProcId = parentProcId;
603  }
604
605  protected void setRootProcId(long rootProcId) {
606    this.rootProcId = rootProcId;
607  }
608
609  /**
610   * Called by the ProcedureExecutor to set the value to the newly created procedure.
611   */
612  protected void setNonceKey(NonceKey nonceKey) {
613    this.nonceKey = nonceKey;
614  }
615
616  public void setOwner(String owner) {
617    this.owner = StringUtils.isEmpty(owner) ? null : owner;
618  }
619
620  public void setOwner(User owner) {
621    assert owner != null : "expected owner to be not null";
622    setOwner(owner.getShortName());
623  }
624
625  /**
626   * Called on store load to initialize the Procedure internals after
627   * the creation/deserialization.
628   */
629  protected void setSubmittedTime(long submittedTime) {
630    this.submittedTime = submittedTime;
631  }
632
633  // ==========================================================================
634  //  runtime state - timeout related
635  // ==========================================================================
636  /**
637   * @param timeout timeout interval in msec
638   */
639  protected void setTimeout(int timeout) {
640    this.timeout = timeout;
641  }
642
643  public boolean hasTimeout() {
644    return timeout != NO_TIMEOUT;
645  }
646
647  /**
648   * @return the timeout in msec
649   */
650  public int getTimeout() {
651    return timeout;
652  }
653
654  /**
655   * Called on store load to initialize the Procedure internals after
656   * the creation/deserialization.
657   */
658  protected void setLastUpdate(long lastUpdate) {
659    this.lastUpdate = lastUpdate;
660  }
661
662  /**
663   * Called by ProcedureExecutor after each time a procedure step is executed.
664   */
665  protected void updateTimestamp() {
666    this.lastUpdate = EnvironmentEdgeManager.currentTime();
667  }
668
669  public long getLastUpdate() {
670    return lastUpdate;
671  }
672
673  /**
674   * Timeout of the next timeout.
675   * Called by the ProcedureExecutor if the procedure has timeout set and
676   * the procedure is in the waiting queue.
677   * @return the timestamp of the next timeout.
678   */
679  protected long getTimeoutTimestamp() {
680    return getLastUpdate() + getTimeout();
681  }
682
683  // ==========================================================================
684  //  runtime state
685  // ==========================================================================
686  /**
687   * @return the time elapsed between the last update and the start time of the procedure.
688   */
689  public long elapsedTime() {
690    return getLastUpdate() - getSubmittedTime();
691  }
692
693  /**
694   * @return the serialized result if any, otherwise null
695   */
696  public byte[] getResult() {
697    return result;
698  }
699
700  /**
701   * The procedure may leave a "result" on completion.
702   * @param result the serialized result that will be passed to the client
703   */
704  protected void setResult(byte[] result) {
705    this.result = result;
706  }
707
708  /**
709   * Will only be called when loading procedures from procedure store, where we need to record
710   * whether the procedure has already held a lock. Later we will call
711   * {@link #restoreLock(Object)} to actually acquire the lock.
712   */
713  final void lockedWhenLoading() {
714    this.lockedWhenLoading = true;
715  }
716
717  /**
718   * Can only be called when restarting, before the procedure actually being executed, as after we
719   * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
720   * {@link #lockedWhenLoading} to false.
721   * <p/>
722   * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
723   * front of a queue.
724   */
725  public boolean isLockedWhenLoading() {
726    return lockedWhenLoading;
727  }
728
729  // ==============================================================================================
730  //  Runtime state, updated every operation by the ProcedureExecutor
731  //
  //  There is always 1 thread at a time operating on the state of the procedure.
  //  The ProcedureExecutor may check and set states, or some Procedure may
  //  update its own state, but there are no concurrent updates. We use synchronized here
  //  just because the procedure can get scheduled on different executor threads on each step.
736  // ==============================================================================================
737
738  /**
739   * @return true if the procedure is in a RUNNABLE state.
740   */
741  public synchronized boolean isRunnable() {
742    return state == ProcedureState.RUNNABLE;
743  }
744
745  public synchronized boolean isInitializing() {
746    return state == ProcedureState.INITIALIZING;
747  }
748
749  /**
750   * @return true if the procedure has failed. It may or may not have rolled back.
751   */
752  public synchronized boolean isFailed() {
753    return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
754  }
755
756  /**
757   * @return true if the procedure is finished successfully.
758   */
759  public synchronized boolean isSuccess() {
760    return state == ProcedureState.SUCCESS && !hasException();
761  }
762
763  /**
764   * @return true if the procedure is finished. The Procedure may be completed successfully or
765   *         rolledback.
766   */
767  public synchronized boolean isFinished() {
768    return isSuccess() || state == ProcedureState.ROLLEDBACK;
769  }
770
771  /**
772   * @return true if the procedure is waiting for a child to finish or for an external event.
773   */
774  public synchronized boolean isWaiting() {
775    switch (state) {
776      case WAITING:
777      case WAITING_TIMEOUT:
778        return true;
779      default:
780        break;
781    }
782    return false;
783  }
784
785  protected synchronized void setState(final ProcedureState state) {
786    this.state = state;
787    updateTimestamp();
788  }
789
790  public synchronized ProcedureState getState() {
791    return state;
792  }
793
794  protected void setFailure(final String source, final Throwable cause) {
795    setFailure(new RemoteProcedureException(source, cause));
796  }
797
798  protected synchronized void setFailure(final RemoteProcedureException exception) {
799    this.exception = exception;
800    if (!isFinished()) {
801      setState(ProcedureState.FAILED);
802    }
803  }
804
805  protected void setAbortFailure(final String source, final String msg) {
806    setFailure(source, new ProcedureAbortedException(msg));
807  }
808
809  /**
810   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
811   * <p/>
812   * Another usage for this method is to implement retrying. A procedure can set the state to
813   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
814   * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
815   * call {@link #setTimeout(int)} method to set the timeout. And you should also override this
816   * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
817   * timeout event has been handled.
818   * @return true to let the framework handle the timeout as abort, false in case the procedure
819   *         handled the timeout itself.
820   */
821  protected synchronized boolean setTimeoutFailure(TEnvironment env) {
822    if (state == ProcedureState.WAITING_TIMEOUT) {
823      long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
824      setFailure("ProcedureExecutor",
825        new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
826      return true;
827    }
828    return false;
829  }
830
831  public synchronized boolean hasException() {
832    return exception != null;
833  }
834
835  public synchronized RemoteProcedureException getException() {
836    return exception;
837  }
838
839  /**
840   * Called by the ProcedureExecutor on procedure-load to restore the latch state
841   */
842  protected synchronized void setChildrenLatch(int numChildren) {
843    this.childrenLatch = numChildren;
844    if (LOG.isTraceEnabled()) {
845      LOG.trace("CHILD LATCH INCREMENT SET " +
846          this.childrenLatch, new Throwable(this.toString()));
847    }
848  }
849
850  /**
851   * Called by the ProcedureExecutor on procedure-load to restore the latch state
852   */
853  protected synchronized void incChildrenLatch() {
854    // TODO: can this be inferred from the stack? I think so...
855    this.childrenLatch++;
856    if (LOG.isTraceEnabled()) {
857      LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
858    }
859  }
860
861  /**
862   * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
863   */
864  private synchronized boolean childrenCountDown() {
865    assert childrenLatch > 0: this;
866    boolean b = --childrenLatch == 0;
867    if (LOG.isTraceEnabled()) {
868      LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
869    }
870    return b;
871  }
872
873  /**
874   * Try to set this procedure into RUNNABLE state.
875   * Succeeds if all subprocedures/children are done.
876   * @return True if we were able to move procedure to RUNNABLE state.
877   */
878  synchronized boolean tryRunnable() {
879    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
880    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
881      setState(ProcedureState.RUNNABLE);
882      return true;
883    } else {
884      return false;
885    }
886  }
887
888  protected synchronized boolean hasChildren() {
889    return childrenLatch > 0;
890  }
891
892  protected synchronized int getChildrenLatch() {
893    return childrenLatch;
894  }
895
896  /**
897   * Called by the RootProcedureState on procedure execution.
898   * Each procedure store its stack-index positions.
899   */
900  protected synchronized void addStackIndex(final int index) {
901    if (stackIndexes == null) {
902      stackIndexes = new int[] { index };
903    } else {
904      int count = stackIndexes.length;
905      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
906      stackIndexes[count] = index;
907    }
908  }
909
910  protected synchronized boolean removeStackIndex() {
911    if (stackIndexes != null && stackIndexes.length > 1) {
912      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
913      return false;
914    } else {
915      stackIndexes = null;
916      return true;
917    }
918  }
919
920  /**
921   * Called on store load to initialize the Procedure internals after
922   * the creation/deserialization.
923   */
924  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
925    this.stackIndexes = new int[stackIndexes.size()];
926    for (int i = 0; i < this.stackIndexes.length; ++i) {
927      this.stackIndexes[i] = stackIndexes.get(i);
928    }
929  }
930
931  protected synchronized boolean wasExecuted() {
932    return stackIndexes != null;
933  }
934
935  protected synchronized int[] getStackIndexes() {
936    return stackIndexes;
937  }
938
939  // ==========================================================================
940  //  Internal methods - called by the ProcedureExecutor
941  // ==========================================================================
942
943  /**
944   * Internal method called by the ProcedureExecutor that starts the user-level code execute().
945   * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
946   *           skip out without changing states or releasing any locks held.
947   */
948  protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
949      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
950    try {
951      updateTimestamp();
952      if (bypass) {
953        LOG.info("{} bypassed, returning null to finish it", this);
954        return null;
955      }
956      return execute(env);
957    } finally {
958      updateTimestamp();
959    }
960  }
961
962  /**
963   * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
964   */
965  protected void doRollback(TEnvironment env)
966      throws IOException, InterruptedException {
967    try {
968      updateTimestamp();
969      if (bypass) {
970        LOG.info("{} bypassed, skipping rollback", this);
971        return;
972      }
973      rollback(env);
974    } finally {
975      updateTimestamp();
976    }
977  }
978
979  final void restoreLock(TEnvironment env) {
980    if (!lockedWhenLoading) {
981      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
982      return;
983    }
984
985    if (isFinished()) {
986      LOG.debug("{} is already finished, skip acquiring lock.", this);
987      return;
988    }
989
990    if (isBypass()) {
991      LOG.debug("{} is already bypassed, skip acquiring lock.", this);
992      return;
993    }
994    // this can happen if the parent stores the sub procedures but before it can
995    // release its lock, the master restarts
996    if (getState() == ProcedureState.WAITING && !holdLock(env)) {
997      LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
998      lockedWhenLoading = false;
999      return;
1000    }
1001    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
1002    LockState state = acquireLock(env);
1003    assert state == LockState.LOCK_ACQUIRED;
1004  }
1005
1006  /**
1007   * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
1008   */
1009  final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
1010    if (waitInitialized(env)) {
1011      return LockState.LOCK_EVENT_WAIT;
1012    }
1013    if (lockedWhenLoading) {
1014      // reset it so we will not consider it anymore
1015      lockedWhenLoading = false;
1016      locked = true;
1017      // Here we return without persist the locked state, as lockedWhenLoading is true means
1018      // that the locked field of the procedure stored in procedure store is true, so we do not need
1019      // to store it again.
1020      return LockState.LOCK_ACQUIRED;
1021    }
1022    LockState state = acquireLock(env);
1023    if (state == LockState.LOCK_ACQUIRED) {
1024      locked = true;
1025      // persist that we have held the lock. This must be done before we actually execute the
1026      // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
1027      // but it may have already done some changes as we have already executed it, and if another
1028      // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
1029      // not expect that another procedure can be executed in the middle.
1030      store.update(this);
1031    }
1032    return state;
1033  }
1034
1035  /**
1036   * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
1037   */
1038  final void doReleaseLock(TEnvironment env, ProcedureStore store) {
1039    locked = false;
1040    // persist that we have released the lock. This must be done before we actually release the
1041    // lock. Another procedure may take this lock immediately after we release the lock, and if we
1042    // crash before persist the information that we have already released the lock, then when
1043    // restarting there will be two procedures which both have the lock and cause problems.
1044    if (getState() != ProcedureState.ROLLEDBACK) {
1045      // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
1046      // procedure store, so do not need to log the release operation any more.
1047      store.update(this);
1048    }
1049    releaseLock(env);
1050  }
1051
1052  @Override
1053  public int compareTo(final Procedure<TEnvironment> other) {
1054    return Long.compare(getProcId(), other.getProcId());
1055  }
1056
1057  // ==========================================================================
1058  //  misc utils
1059  // ==========================================================================
1060
1061  /**
1062   * Get an hashcode for the specified Procedure ID
1063   * @return the hashcode for the specified procId
1064   */
1065  public static long getProcIdHashCode(long procId) {
1066    long h = procId;
1067    h ^= h >> 16;
1068    h *= 0x85ebca6b;
1069    h ^= h >> 13;
1070    h *= 0xc2b2ae35;
1071    h ^= h >> 16;
1072    return h;
1073  }
1074
1075  /**
1076   * Helper to lookup the root Procedure ID given a specified procedure.
1077   */
1078  protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
1079      Procedure<T> proc) {
1080    while (proc.hasParent()) {
1081      proc = procedures.get(proc.getParentProcId());
1082      if (proc == null) {
1083        return null;
1084      }
1085    }
1086    return proc.getProcId();
1087  }
1088
1089  /**
1090   * @param a the first procedure to be compared.
1091   * @param b the second procedure to be compared.
1092   * @return true if the two procedures have the same parent
1093   */
1094  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
1095    return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
1096  }
1097}