001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Map;
024import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
025import org.apache.hadoop.hbase.metrics.Counter;
026import org.apache.hadoop.hbase.metrics.Histogram;
027import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
028import org.apache.hadoop.hbase.procedure2.util.StringUtils;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.NonceKey;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
039
040/**
041 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
042 * stack-indexes, etc.
043 * <p/>
044 * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
045 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
046 * be called multiple times in the case of failure or a restart, so code must be idempotent. The
047 * return from an execute call is either: null to indicate we are done; ourself if there is more to
048 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
049 * our execution.
050 * <p/>
051 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
052 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
053 * ProcedureState enum from protos:
054 * <ul>
055 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
056 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
057 * ROLLEDBACK state.</li>
058 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
 * <li>{@link #isFinished()} As a procedure in FAILED state will be retried forever for rollback,
 * the only condition under which the scheduler/executor drops a procedure from further processing
 * is when the procedure state is ROLLEDBACK or isSuccess() returns true. This is a terminal state
 * of the procedure.</li>
062 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
063 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
064 * </ul>
065 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
066 * own state. This can lead to confusion. Try to keep the two distinct.
067 * <p/>
068 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
069 * step is supposed to cleanup the resources created during the execute() step. In case of failure
070 * and restart, rollback() may be called multiple times, so again the code must be idempotent.
071 * <p/>
 * A Procedure can be made to respect a locking regime. It has acquire/release methods as well as
 * {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be
 * locked for the life of a procedure -- not just the calls to execute -- then implementations
 * should say so with the {@link #holdLock(Object)} method.
076 * <p/>
 * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846),
 * the implementation is a bit tricky, so we add some comments here about it.
079 * <ul>
080 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
081 * whether we have the lock. We will set it to {@code true} in
082 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
083 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
084 * more.</li>
085 * <li>Also added a locked field in the proto message. When storing, the field will be set according
086 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
087 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
088 * message is {@code true}.</li>
089 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
090 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
091 * need to wait until master is initialized. So the solution here is that, we introduced a new
092 * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
093 * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
094 * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
095 * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
096 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}; if it is
 * {@code true}, we just set the {@link #locked} field to true and return, without actually
 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
100 * </ul>
101 * <p/>
 * Procedures can be suspended or put in wait state with a callback that gets executed on a
 * Procedure-specified timeout. See {@link #setTimeout(int)} and
 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
 * class for an example usage.
 * <p/>
108 * There are hooks for collecting metrics on submit of the procedure and on finish. See
109 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
110 */
111@InterfaceAudience.Private
112public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
113  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
114  public static final long NO_PROC_ID = -1;
115  protected static final int NO_TIMEOUT = -1;
116
117  public enum LockState {
118    LOCK_ACQUIRED,       // Lock acquired and ready to execute
119    LOCK_YIELD_WAIT,     // Lock not acquired, framework needs to yield
120    LOCK_EVENT_WAIT,     // Lock not acquired, an event will yield the procedure
121  }
122
123  // Unchanged after initialization
124  private NonceKey nonceKey = null;
125  private String owner = null;
126  private long parentProcId = NO_PROC_ID;
127  private long rootProcId = NO_PROC_ID;
128  private long procId = NO_PROC_ID;
129  private long submittedTime;
130
131  // Runtime state, updated every operation
132  private ProcedureState state = ProcedureState.INITIALIZING;
133  private RemoteProcedureException exception = null;
134  private int[] stackIndexes = null;
135  private int childrenLatch = 0;
136
137  private volatile int timeout = NO_TIMEOUT;
138  private volatile long lastUpdate;
139
140  private volatile byte[] result = null;
141
142  private volatile boolean locked = false;
143
144  private boolean lockedWhenLoading = false;
145
146  /**
147   * Used for override complete of the procedure without actually doing any logic in the procedure.
148   * If bypass is set to true, when executing it will return null when
149   * {@link #doExecute(Object)} is called to finish the procedure and release any locks
150   * it may currently hold. The bypass does cleanup around the Procedure as far as the
151   * Procedure framework is concerned. It does not clean any internal state that the
152   * Procedure's themselves may have set. That is for the Procedures to do themselves
153   * when bypass is called. They should override bypass and do their cleanup in the
154   * overridden bypass method (be sure to call the parent bypass to ensure proper
155   * processing).
156   * <p></p>Bypassing a procedure is not like aborting. Aborting a procedure will trigger
157   * a rollback. And since the {@link #abort(Object)} method is overrideable
158   * Some procedures may have chosen to ignore the aborting.
159   */
160  private volatile boolean bypass = false;
161
162  /**
163   * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to
164   * true, and the implementation can all {@link #skipPersistence()} to let the framework skip the
165   * persistence of the procedure.
166   * <p/>
167   * This is useful when the procedure is in error and you want to retry later. The retry interval
168   * and the number of retries are usually not critical so skip the persistence can save some
169   * resources, and also speed up the restart processing.
170   * <p/>
171   * Notice that this value will be reset to true every time before execution. And when rolling back
172   * we do not test this value.
173   */
174  private boolean persist = true;
175
176  public boolean isBypass() {
177    return bypass;
178  }
179
180  /**
181   * Set the bypass to true.
182   * Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now.
183   * DO NOT use this method alone, since we can't just bypass one single procedure. We need to
184   * bypass its ancestor too. If your Procedure has set state, it needs to undo it in here.
185   * @param env Current environment. May be null because of context; e.g. pretty-printing
186   *            procedure WALs where there is no 'environment' (and where Procedures that require
187   *            an 'environment' won't be run.
188   */
189  protected void bypass(TEnvironment env) {
190    this.bypass = true;
191  }
192
193  boolean needPersistence() {
194    return persist;
195  }
196
197  void resetPersistence() {
198    persist = true;
199  }
200
201  protected final void skipPersistence() {
202    persist = false;
203  }
204
205  /**
206   * The main code of the procedure. It must be idempotent since execute()
207   * may be called multiple times in case of machine failure in the middle
208   * of the execution.
209   * @param env the environment passed to the ProcedureExecutor
210   * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
211   *         procedure is done.
212   * @throws ProcedureYieldException the procedure will be added back to the queue and retried
213   *         later.
214   * @throws InterruptedException the procedure will be added back to the queue and retried later.
215   * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
216   *         and has set itself up waiting for an external event to wake it back up again.
217   */
218  protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
219    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
220
221  /**
222   * The code to undo what was done by the execute() code.
223   * It is called when the procedure or one of the sub-procedures failed or an
224   * abort was requested. It should cleanup all the resources created by
225   * the execute() call. The implementation must be idempotent since rollback()
226   * may be called multiple time in case of machine failure in the middle
227   * of the execution.
228   * @param env the environment passed to the ProcedureExecutor
229   * @throws IOException temporary failure, the rollback will retry later
230   * @throws InterruptedException the procedure will be added back to the queue and retried later
231   */
232  protected abstract void rollback(TEnvironment env)
233    throws IOException, InterruptedException;
234
235  /**
236   * The abort() call is asynchronous and each procedure must decide how to deal
237   * with it, if they want to be abortable. The simplest implementation
238   * is to have an AtomicBoolean set in the abort() method and then the execute()
239   * will check if the abort flag is set or not.
240   * abort() may be called multiple times from the client, so the implementation
241   * must be idempotent.
242   *
243   * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification
244   * that allows the procedure implementor abort.
245   */
246  protected abstract boolean abort(TEnvironment env);
247
248  /**
249   * The user-level code of the procedure may have some state to
250   * persist (e.g. input arguments or current position in the processing state) to
251   * be able to resume on failure.
252   * @param serializer stores the serializable state
253   */
254  protected abstract void serializeStateData(ProcedureStateSerializer serializer)
255    throws IOException;
256
257  /**
258   * Called on store load to allow the user to decode the previously serialized
259   * state.
260   * @param serializer contains the serialized state
261   */
262  protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
263    throws IOException;
264
265  /**
266   * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
267   * call us to determine whether we need to wait for initialization, second, it will call
268   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
269   * <p/>
270   * This is because that when master restarts, we need to restore the lock state for all the
271   * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
272   * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part
273   * of the initialization!), so we need to split the code into two steps, and when restore, we just
274   * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.
275   * @return true means we need to wait until the environment has been initialized, otherwise true.
276   */
277  protected boolean waitInitialized(TEnvironment env) {
278    return false;
279  }
280
281  /**
282   * The user should override this method if they need a lock on an Entity. A lock can be anything,
283   * and it is up to the implementor. The Procedure Framework will call this method just before it
284   * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
285   * execute.
286   * <p/>
287   * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
288   * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
289   * <p/>
290   * Example: in our Master we can execute request in parallel for different tables. We can create
291   * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
292   * queued waiting that specific table create to happen.
293   * <p/>
294   * There are 3 LockState:
295   * <ul>
296   * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
297   * execute.</li>
298   * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
299   * take care of readding the procedure back to the runnable set for retry</li>
300   * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
301   * care of readding the procedure back to the runnable set when the lock is available.</li>
302   * </ul>
303   * @return the lock state as described above.
304   */
305  protected LockState acquireLock(TEnvironment env) {
306    return LockState.LOCK_ACQUIRED;
307  }
308
309  /**
310   * The user should override this method, and release lock if necessary.
311   */
312  protected void releaseLock(TEnvironment env) {
313    // no-op
314  }
315
316  /**
317   * Used to keep the procedure lock even when the procedure is yielding or suspended.
318   * @return true if the procedure should hold on the lock until completionCleanup()
319   */
320  protected boolean holdLock(TEnvironment env) {
321    return false;
322  }
323
324  /**
325   * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)}
326   * returns true, the procedure executor will call acquireLock() once and thereafter
327   * not call {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls
328   * release/acquire around each invocation of {@link #execute(Object)}.
329   * @see #holdLock(Object)
330   * @return true if the procedure has the lock, false otherwise.
331   */
332  public final boolean hasLock() {
333    return locked;
334  }
335
336  /**
337   * Called when the procedure is loaded for replay.
338   * The procedure implementor may use this method to perform some quick
339   * operation before replay.
340   * e.g. failing the procedure if the state on replay may be unknown.
341   */
342  protected void beforeReplay(TEnvironment env) {
343    // no-op
344  }
345
346  /**
347   * Called when the procedure is ready to be added to the queue after
348   * the loading/replay operation.
349   */
350  protected void afterReplay(TEnvironment env) {
351    // no-op
352  }
353
354  /**
355   * Called when the procedure is marked as completed (success or rollback).
356   * The procedure implementor may use this method to cleanup in-memory states.
357   * This operation will not be retried on failure. If a procedure took a lock,
358   * it will have been released when this method runs.
359   */
360  protected void completionCleanup(TEnvironment env) {
361    // no-op
362  }
363
364  /**
365   * By default, the procedure framework/executor will try to run procedures start to finish.
366   * Return true to make the executor yield between each execution step to
367   * give other procedures a chance to run.
368   * @param env the environment passed to the ProcedureExecutor
369   * @return Return true if the executor should yield on completion of an execution step.
370   *         Defaults to return false.
371   */
372  protected boolean isYieldAfterExecutionStep(TEnvironment env) {
373    return false;
374  }
375
376  /**
377   * By default, the executor will keep the procedure result around util
378   * the eviction TTL is expired. The client can cut down the waiting time
379   * by requesting that the result is removed from the executor.
380   * In case of system started procedure, we can force the executor to auto-ack.
381   * @param env the environment passed to the ProcedureExecutor
382   * @return true if the executor should wait the client ack for the result.
383   *         Defaults to return true.
384   */
385  protected boolean shouldWaitClientAck(TEnvironment env) {
386    return true;
387  }
388
389  /**
390   * Override this method to provide procedure specific counters for submitted count, failed
391   * count and time histogram.
392   * @param env The environment passed to the procedure executor
393   * @return Container object for procedure related metric
394   */
395  protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
396    return null;
397  }
398
399  /**
400   * This function will be called just when procedure is submitted for execution. Override this
401   * method to update the metrics at the beginning of the procedure. The default implementation
402   * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
403   * {@link ProcedureMetrics}.
404   */
405  protected void updateMetricsOnSubmit(TEnvironment env) {
406    ProcedureMetrics metrics = getProcedureMetrics(env);
407    if (metrics == null) {
408      return;
409    }
410
411    Counter submittedCounter = metrics.getSubmittedCounter();
412    if (submittedCounter != null) {
413      submittedCounter.increment();
414    }
415  }
416
417  /**
418   * This function will be called just after procedure execution is finished. Override this method
419   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
420   * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
421   * time histogram for successfully completed procedures. Increments failed counter for failed
422   * procedures.
423   * <p/>
424   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
425   * successfully finished siblings, this function may get called twice in certain cases for certain
426   * procedures. Explore further if this can be called once.
427   * @param env The environment passed to the procedure executor
428   * @param runtime Runtime of the procedure in milliseconds
429   * @param success true if procedure is completed successfully
430   */
431  protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
432    ProcedureMetrics metrics = getProcedureMetrics(env);
433    if (metrics == null) {
434      return;
435    }
436
437    if (success) {
438      Histogram timeHisto = metrics.getTimeHisto();
439      if (timeHisto != null) {
440        timeHisto.update(runtime);
441      }
442    } else {
443      Counter failedCounter = metrics.getFailedCounter();
444      if (failedCounter != null) {
445        failedCounter.increment();
446      }
447    }
448  }
449
450  @Override
451  public String toString() {
452    // Return the simple String presentation of the procedure.
453    return toStringSimpleSB().toString();
454  }
455
456  /**
457   * Build the StringBuilder for the simple form of procedure string.
458   * @return the StringBuilder
459   */
460  protected StringBuilder toStringSimpleSB() {
461    final StringBuilder sb = new StringBuilder();
462
463    sb.append("pid=");
464    sb.append(getProcId());
465
466    if (hasParent()) {
467      sb.append(", ppid=");
468      sb.append(getParentProcId());
469    }
470
471    /*
472     * TODO
473     * Enable later when this is being used.
474     * Currently owner not used.
475    if (hasOwner()) {
476      sb.append(", owner=");
477      sb.append(getOwner());
478    }*/
479
480    sb.append(", state="); // pState for Procedure State as opposed to any other kind.
481    toStringState(sb);
482
483    sb.append(", hasLock=").append(locked);
484
485    if (bypass) {
486      sb.append(", bypass=").append(bypass);
487    }
488
489    if (hasException()) {
490      sb.append(", exception=" + getException());
491    }
492
493    sb.append("; ");
494    toStringClassDetails(sb);
495
496    return sb;
497  }
498
499  /**
500   * Extend the toString() information with more procedure details
501   */
502  public String toStringDetails() {
503    final StringBuilder sb = toStringSimpleSB();
504
505    sb.append(" submittedTime=");
506    sb.append(getSubmittedTime());
507
508    sb.append(", lastUpdate=");
509    sb.append(getLastUpdate());
510
511    final int[] stackIndices = getStackIndexes();
512    if (stackIndices != null) {
513      sb.append("\n");
514      sb.append("stackIndexes=");
515      sb.append(Arrays.toString(stackIndices));
516    }
517
518    return sb.toString();
519  }
520
521  protected String toStringClass() {
522    StringBuilder sb = new StringBuilder();
523    toStringClassDetails(sb);
524    return sb.toString();
525  }
526
527  /**
528   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
529   * generic Procedure State with Procedure particulars.
530   * @param builder Append current {@link ProcedureState}
531   */
532  protected void toStringState(StringBuilder builder) {
533    builder.append(getState());
534  }
535
536  /**
537   * Extend the toString() information with the procedure details
538   * e.g. className and parameters
539   * @param builder the string builder to use to append the proc specific information
540   */
541  protected void toStringClassDetails(StringBuilder builder) {
542    builder.append(getClass().getName());
543  }
544
545  // ==========================================================================
546  //  Those fields are unchanged after initialization.
547  //
548  //  Each procedure will get created from the user or during
549  //  ProcedureExecutor.start() during the load() phase and then submitted
550  //  to the executor. these fields will never be changed after initialization
551  // ==========================================================================
552  public long getProcId() {
553    return procId;
554  }
555
556  public boolean hasParent() {
557    return parentProcId != NO_PROC_ID;
558  }
559
560  public long getParentProcId() {
561    return parentProcId;
562  }
563
564  public long getRootProcId() {
565    return rootProcId;
566  }
567
568  public String getProcName() {
569    return toStringClass();
570  }
571
572  public NonceKey getNonceKey() {
573    return nonceKey;
574  }
575
576  public long getSubmittedTime() {
577    return submittedTime;
578  }
579
580  public String getOwner() {
581    return owner;
582  }
583
584  public boolean hasOwner() {
585    return owner != null;
586  }
587
588  /**
589   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
590   */
591  @VisibleForTesting
592  protected void setProcId(long procId) {
593    this.procId = procId;
594    this.submittedTime = EnvironmentEdgeManager.currentTime();
595    setState(ProcedureState.RUNNABLE);
596  }
597
598  /**
599   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
600   */
601  protected void setParentProcId(long parentProcId) {
602    this.parentProcId = parentProcId;
603  }
604
605  protected void setRootProcId(long rootProcId) {
606    this.rootProcId = rootProcId;
607  }
608
609  /**
610   * Called by the ProcedureExecutor to set the value to the newly created procedure.
611   */
612  @VisibleForTesting
613  protected void setNonceKey(NonceKey nonceKey) {
614    this.nonceKey = nonceKey;
615  }
616
617  @VisibleForTesting
618  public void setOwner(String owner) {
619    this.owner = StringUtils.isEmpty(owner) ? null : owner;
620  }
621
622  public void setOwner(User owner) {
623    assert owner != null : "expected owner to be not null";
624    setOwner(owner.getShortName());
625  }
626
627  /**
628   * Called on store load to initialize the Procedure internals after
629   * the creation/deserialization.
630   */
631  protected void setSubmittedTime(long submittedTime) {
632    this.submittedTime = submittedTime;
633  }
634
635  // ==========================================================================
636  //  runtime state - timeout related
637  // ==========================================================================
638  /**
639   * @param timeout timeout interval in msec
640   */
641  protected void setTimeout(int timeout) {
642    this.timeout = timeout;
643  }
644
645  public boolean hasTimeout() {
646    return timeout != NO_TIMEOUT;
647  }
648
649  /**
650   * @return the timeout in msec
651   */
652  public int getTimeout() {
653    return timeout;
654  }
655
656  /**
657   * Called on store load to initialize the Procedure internals after
658   * the creation/deserialization.
659   */
660  protected void setLastUpdate(long lastUpdate) {
661    this.lastUpdate = lastUpdate;
662  }
663
664  /**
665   * Called by ProcedureExecutor after each time a procedure step is executed.
666   */
667  protected void updateTimestamp() {
668    this.lastUpdate = EnvironmentEdgeManager.currentTime();
669  }
670
671  public long getLastUpdate() {
672    return lastUpdate;
673  }
674
675  /**
676   * Timeout of the next timeout.
677   * Called by the ProcedureExecutor if the procedure has timeout set and
678   * the procedure is in the waiting queue.
679   * @return the timestamp of the next timeout.
680   */
681  protected long getTimeoutTimestamp() {
682    return getLastUpdate() + getTimeout();
683  }
684
685  // ==========================================================================
686  //  runtime state
687  // ==========================================================================
688  /**
689   * @return the time elapsed between the last update and the start time of the procedure.
690   */
691  public long elapsedTime() {
692    return getLastUpdate() - getSubmittedTime();
693  }
694
695  /**
696   * @return the serialized result if any, otherwise null
697   */
698  public byte[] getResult() {
699    return result;
700  }
701
702  /**
703   * The procedure may leave a "result" on completion.
704   * @param result the serialized result that will be passed to the client
705   */
706  protected void setResult(byte[] result) {
707    this.result = result;
708  }
709
710  /**
711   * Will only be called when loading procedures from procedure store, where we need to record
712   * whether the procedure has already held a lock. Later we will call
713   * {@link #restoreLock(Object)} to actually acquire the lock.
714   */
715  final void lockedWhenLoading() {
716    this.lockedWhenLoading = true;
717  }
718
719  /**
720   * Can only be called when restarting, before the procedure actually being executed, as after we
721   * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
722   * {@link #lockedWhenLoading} to false.
723   * <p/>
724   * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
725   * front of a queue.
726   */
727  public boolean isLockedWhenLoading() {
728    return lockedWhenLoading;
729  }
730
731  // ==============================================================================================
732  //  Runtime state, updated every operation by the ProcedureExecutor
733  //
  //  There is always only one thread at a time operating on the state of the procedure.
  //  The ProcedureExecutor may check and set states, or some Procedure may
  //  update its own state, but there are no concurrent updates. We use synchronized here
  //  just because the procedure can get scheduled on different executor threads on each step.
738  // ==============================================================================================
739
740  /**
741   * @return true if the procedure is in a RUNNABLE state.
742   */
743  public synchronized boolean isRunnable() {
744    return state == ProcedureState.RUNNABLE;
745  }
746
747  public synchronized boolean isInitializing() {
748    return state == ProcedureState.INITIALIZING;
749  }
750
751  /**
752   * @return true if the procedure has failed. It may or may not have rolled back.
753   */
754  public synchronized boolean isFailed() {
755    return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
756  }
757
758  /**
759   * @return true if the procedure is finished successfully.
760   */
761  public synchronized boolean isSuccess() {
762    return state == ProcedureState.SUCCESS && !hasException();
763  }
764
765  /**
766   * @return true if the procedure is finished. The Procedure may be completed successfully or
767   *         rolledback.
768   */
769  public synchronized boolean isFinished() {
770    return isSuccess() || state == ProcedureState.ROLLEDBACK;
771  }
772
773  /**
774   * @return true if the procedure is waiting for a child to finish or for an external event.
775   */
776  public synchronized boolean isWaiting() {
777    switch (state) {
778      case WAITING:
779      case WAITING_TIMEOUT:
780        return true;
781      default:
782        break;
783    }
784    return false;
785  }
786
787  @VisibleForTesting
788  protected synchronized void setState(final ProcedureState state) {
789    this.state = state;
790    updateTimestamp();
791  }
792
793  public synchronized ProcedureState getState() {
794    return state;
795  }
796
797  protected void setFailure(final String source, final Throwable cause) {
798    setFailure(new RemoteProcedureException(source, cause));
799  }
800
  /**
   * Records {@code exception} as this procedure's failure and, unless the procedure is considered
   * finished, moves it to the {@code FAILED} state.
   * <p/>
   * Note the ordering: the exception is stored before {@link #isFinished()} is evaluated. With
   * the isFinished()/isSuccess() implementations in this class, isSuccess() requires that no
   * exception is recorded, so at this point only a {@code ROLLEDBACK} procedure is treated as
   * finished; any other state (including {@code SUCCESS}) is changed to {@code FAILED}.
   * @param exception the failure to record
   */
  protected synchronized void setFailure(final RemoteProcedureException exception) {
    this.exception = exception;
    if (!isFinished()) {
      setState(ProcedureState.FAILED);
    }
  }
807
808  protected void setAbortFailure(final String source, final String msg) {
809    setFailure(source, new ProcedureAbortedException(msg));
810  }
811
812  /**
813   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
814   * <p/>
815   * Another usage for this method is to implement retrying. A procedure can set the state to
816   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
817   * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
818   * call {@link #setTimeout(int)} method to set the timeout. And you should also override this
819   * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
820   * timeout event has been handled.
821   * @return true to let the framework handle the timeout as abort, false in case the procedure
822   *         handled the timeout itself.
823   */
824  protected synchronized boolean setTimeoutFailure(TEnvironment env) {
825    if (state == ProcedureState.WAITING_TIMEOUT) {
826      long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
827      setFailure("ProcedureExecutor",
828        new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
829      return true;
830    }
831    return false;
832  }
833
834  public synchronized boolean hasException() {
835    return exception != null;
836  }
837
838  public synchronized RemoteProcedureException getException() {
839    return exception;
840  }
841
842  /**
843   * Called by the ProcedureExecutor on procedure-load to restore the latch state
844   */
845  protected synchronized void setChildrenLatch(int numChildren) {
846    this.childrenLatch = numChildren;
847    if (LOG.isTraceEnabled()) {
848      LOG.trace("CHILD LATCH INCREMENT SET " +
849          this.childrenLatch, new Throwable(this.toString()));
850    }
851  }
852
853  /**
854   * Called by the ProcedureExecutor on procedure-load to restore the latch state
855   */
856  protected synchronized void incChildrenLatch() {
857    // TODO: can this be inferred from the stack? I think so...
858    this.childrenLatch++;
859    if (LOG.isTraceEnabled()) {
860      LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
861    }
862  }
863
864  /**
865   * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
866   */
867  private synchronized boolean childrenCountDown() {
868    assert childrenLatch > 0: this;
869    boolean b = --childrenLatch == 0;
870    if (LOG.isTraceEnabled()) {
871      LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
872    }
873    return b;
874  }
875
876  /**
877   * Try to set this procedure into RUNNABLE state.
878   * Succeeds if all subprocedures/children are done.
879   * @return True if we were able to move procedure to RUNNABLE state.
880   */
881  synchronized boolean tryRunnable() {
882    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
883    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
884      setState(ProcedureState.RUNNABLE);
885      return true;
886    } else {
887      return false;
888    }
889  }
890
891  protected synchronized boolean hasChildren() {
892    return childrenLatch > 0;
893  }
894
895  protected synchronized int getChildrenLatch() {
896    return childrenLatch;
897  }
898
899  /**
900   * Called by the RootProcedureState on procedure execution.
901   * Each procedure store its stack-index positions.
902   */
903  protected synchronized void addStackIndex(final int index) {
904    if (stackIndexes == null) {
905      stackIndexes = new int[] { index };
906    } else {
907      int count = stackIndexes.length;
908      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
909      stackIndexes[count] = index;
910    }
911  }
912
913  protected synchronized boolean removeStackIndex() {
914    if (stackIndexes != null && stackIndexes.length > 1) {
915      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
916      return false;
917    } else {
918      stackIndexes = null;
919      return true;
920    }
921  }
922
923  /**
924   * Called on store load to initialize the Procedure internals after
925   * the creation/deserialization.
926   */
927  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
928    this.stackIndexes = new int[stackIndexes.size()];
929    for (int i = 0; i < this.stackIndexes.length; ++i) {
930      this.stackIndexes[i] = stackIndexes.get(i);
931    }
932  }
933
934  protected synchronized boolean wasExecuted() {
935    return stackIndexes != null;
936  }
937
938  protected synchronized int[] getStackIndexes() {
939    return stackIndexes;
940  }
941
942  // ==========================================================================
943  //  Internal methods - called by the ProcedureExecutor
944  // ==========================================================================
945
946  /**
947   * Internal method called by the ProcedureExecutor that starts the user-level code execute().
948   * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
949   *           skip out without changing states or releasing any locks held.
950   */
951  protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
952      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
953    try {
954      updateTimestamp();
955      if (bypass) {
956        LOG.info("{} bypassed, returning null to finish it", this);
957        return null;
958      }
959      return execute(env);
960    } finally {
961      updateTimestamp();
962    }
963  }
964
965  /**
966   * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
967   */
968  protected void doRollback(TEnvironment env)
969      throws IOException, InterruptedException {
970    try {
971      updateTimestamp();
972      if (bypass) {
973        LOG.info("{} bypassed, skipping rollback", this);
974        return;
975      }
976      rollback(env);
977    } finally {
978      updateTimestamp();
979    }
980  }
981
982  final void restoreLock(TEnvironment env) {
983    if (!lockedWhenLoading) {
984      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
985      return;
986    }
987
988    if (isFinished()) {
989      LOG.debug("{} is already finished, skip acquiring lock.", this);
990      return;
991    }
992
993    if (isBypass()) {
994      LOG.debug("{} is already bypassed, skip acquiring lock.", this);
995      return;
996    }
997    // this can happen if the parent stores the sub procedures but before it can
998    // release its lock, the master restarts
999    if (getState() == ProcedureState.WAITING && !holdLock(env)) {
1000      LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
1001      lockedWhenLoading = false;
1002      return;
1003    }
1004    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
1005    LockState state = acquireLock(env);
1006    assert state == LockState.LOCK_ACQUIRED;
1007  }
1008
  /**
   * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
   * <p/>
   * Behavior, in order:
   * <ul>
   * <li>If {@code waitInitialized(env)} returns true, returns {@code LOCK_EVENT_WAIT} without
   * touching the lock.</li>
   * <li>If {@link #lockedWhenLoading} is set, clears it, marks the procedure as locked and returns
   * {@code LOCK_ACQUIRED} without calling acquireLock() or updating the store.</li>
   * <li>Otherwise calls acquireLock(). On {@code LOCK_ACQUIRED}, it marks the procedure as locked
   * and persists it via {@code store.update(this)} before returning.</li>
   * </ul>
   * @param env the procedure environment
   * @param store the procedure store used to persist the locked flag
   * @return the resulting lock state
   */
  final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
    if (waitInitialized(env)) {
      return LockState.LOCK_EVENT_WAIT;
    }
    if (lockedWhenLoading) {
      // reset it so we will not consider it anymore
      lockedWhenLoading = false;
      locked = true;
      // Here we return without persist the locked state, as lockedWhenLoading is true means
      // that the locked field of the procedure stored in procedure store is true, so we do not need
      // to store it again.
      return LockState.LOCK_ACQUIRED;
    }
    // NOTE: this local shadows the ProcedureState field named 'state'.
    LockState state = acquireLock(env);
    if (state == LockState.LOCK_ACQUIRED) {
      locked = true;
      // persist that we have held the lock. This must be done before we actually execute the
      // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
      // but it may have already done some changes as we have already executed it, and if another
      // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
      // not expect that another procedure can be executed in the middle.
      store.update(this);
    }
    return state;
  }
1037
  /**
   * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
   * <p/>
   * Clears the {@code locked} flag and persists the procedure to {@code store} (skipped when the
   * state is {@code ROLLEDBACK}). It then calls releaseLock(env). The store update deliberately
   * happens before the actual release.
   * @param env the procedure environment
   * @param store the procedure store used to persist the unlocked flag
   */
  final void doReleaseLock(TEnvironment env, ProcedureStore store) {
    locked = false;
    // persist that we have released the lock. This must be done before we actually release the
    // lock. Another procedure may take this lock immediately after we release the lock, and if we
    // crash before persist the information that we have already released the lock, then when
    // restarting there will be two procedures which both have the lock and cause problems.
    if (getState() != ProcedureState.ROLLEDBACK) {
      // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
      // procedure store, so do not need to log the release operation any more.
      store.update(this);
    }
    releaseLock(env);
  }
1054
1055  @Override
1056  public int compareTo(final Procedure<TEnvironment> other) {
1057    return Long.compare(getProcId(), other.getProcId());
1058  }
1059
1060  // ==========================================================================
1061  //  misc utils
1062  // ==========================================================================
1063
1064  /**
1065   * Get an hashcode for the specified Procedure ID
1066   * @return the hashcode for the specified procId
1067   */
1068  public static long getProcIdHashCode(long procId) {
1069    long h = procId;
1070    h ^= h >> 16;
1071    h *= 0x85ebca6b;
1072    h ^= h >> 13;
1073    h *= 0xc2b2ae35;
1074    h ^= h >> 16;
1075    return h;
1076  }
1077
1078  /**
1079   * Helper to lookup the root Procedure ID given a specified procedure.
1080   */
1081  protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
1082      Procedure<T> proc) {
1083    while (proc.hasParent()) {
1084      proc = procedures.get(proc.getParentProcId());
1085      if (proc == null) {
1086        return null;
1087      }
1088    }
1089    return proc.getProcId();
1090  }
1091
1092  /**
1093   * @param a the first procedure to be compared.
1094   * @param b the second procedure to be compared.
1095   * @return true if the two procedures have the same parent
1096   */
1097  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
1098    return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
1099  }
1100}