001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Map;
024import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
025import org.apache.hadoop.hbase.metrics.Counter;
026import org.apache.hadoop.hbase.metrics.Histogram;
027import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
028import org.apache.hadoop.hbase.procedure2.util.StringUtils;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.NonceKey;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
039
040/**
041 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
042 * stack-indexes, etc.
043 * <p/>
044 * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
045 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
046 * be called multiple times in the case of failure or a restart, so code must be idempotent. The
047 * return from an execute call is either: null to indicate we are done; ourself if there is more to
048 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
049 * our execution.
050 * <p/>
051 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
052 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
053 * ProcedureState enum from protos:
054 * <ul>
055 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
056 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
057 * ROLLEDBACK state.</li>
058 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
059 * <li>{@link #isFinished()} As a procedure in FAILED state will be tried forever for rollback, only
060 * condition when scheduler/ executor will drop procedure from further processing is when procedure
061 * state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li>
062 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
063 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
064 * </ul>
065 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
066 * own state. This can lead to confusion. Try to keep the two distinct.
067 * <p/>
068 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
069 * step is supposed to cleanup the resources created during the execute() step. In case of failure
070 * and restart, rollback() may be called multiple times, so again the code must be idempotent.
071 * <p/>
 * A Procedure can be made to respect a locking regime. It has acquire/release methods as well as
 * a {@link #hasLock()} method. The lock implementation is up to the implementor. If an entity
 * needs to be locked for the life of a procedure -- not just the calls to execute -- then
 * implementations should say so with the {@link #holdLock(Object)} method.
076 * <p/>
 * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846),
 * the implementation is a bit tricky, so we add some comments here about it.
079 * <ul>
080 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
081 * whether we have the lock. We will set it to {@code true} in
082 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
083 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
084 * more.</li>
085 * <li>Also added a locked field in the proto message. When storing, the field will be set according
086 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
087 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
088 * message is {@code true}.</li>
089 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
090 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
091 * need to wait until master is initialized. So the solution here is that, we introduced a new
092 * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
093 * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
094 * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
095 * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
096 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
097 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is
 * {@code true}, then we just set the {@link #locked} field to true and return, without actually
099 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
100 * </ul>
101 * <p/>
 * Procedures can be suspended or put in wait state with a callback that gets executed on
 * Procedure-specified timeout. See {@link #setTimeout(int)}, and
 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
 * class for an example usage.
 * <p/>
108 * There are hooks for collecting metrics on submit of the procedure and on finish. See
109 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
110 */
111@InterfaceAudience.Private
112public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
  /** Logger for this class. */
  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
  /** Sentinel id; the initial value of the procedure, parent and root id fields. */
  public static final long NO_PROC_ID = -1;
  /** Sentinel timeout; {@link #hasTimeout()} returns false while the timeout equals it. */
  protected static final int NO_TIMEOUT = -1;
116
  /**
   * The possible outcomes of a lock acquisition attempt; this is the return type of
   * {@link #acquireLock(Object)}.
   */
  public enum LockState {
    LOCK_ACQUIRED,       // Lock acquired and ready to execute
    LOCK_YIELD_WAIT,     // Lock not acquired, framework needs to yield
    LOCK_EVENT_WAIT,     // Lock not acquired, an event will yield the procedure
  }
122
  // Unchanged after initialization
  // Nonce key of the procedure; exposed through getNonceKey().
  private NonceKey nonceKey = null;
  // Owner of the procedure; null when no (or an empty) owner was set.
  private String owner = null;
  // Id of the parent procedure; NO_PROC_ID means there is no parent (see hasParent()).
  private long parentProcId = NO_PROC_ID;
  // Id of the root procedure; NO_PROC_ID until setRootProcId() is called.
  private long rootProcId = NO_PROC_ID;
  // Id of this procedure; NO_PROC_ID until setProcId() is called.
  private long procId = NO_PROC_ID;
  // Submission timestamp; stamped by setProcId() or restored through setSubmittedTime().
  private long submittedTime;

  // Runtime state, updated every operation
  // Current ProcedureState; a new procedure starts as INITIALIZING.
  private ProcedureState state = ProcedureState.INITIALIZING;
  // Failure recorded by setFailure(); null while no failure has been set.
  private RemoteProcedureException exception = null;
  // Stack indexes of the procedure; null until assigned.
  private int[] stackIndexes = null;
  // NOTE(review): presumably counts the children that are still running -- it is not used in the
  // part of the file reviewed here; confirm.
  private int childrenLatch = 0;

  // Timeout interval in msec; NO_TIMEOUT means no timeout is set (see hasTimeout()).
  private volatile int timeout = NO_TIMEOUT;
  // Timestamp of the last update; refreshed by updateTimestamp().
  private volatile long lastUpdate;

  // Serialized result left by the procedure, if any; null by default.
  private volatile byte[] result = null;

  // Whether we hold the procedure lock; this is the value returned by hasLock().
  private volatile boolean locked = false;

  // Set by lockedWhenLoading() when the procedure loaded from the store had already held a lock.
  private boolean lockedWhenLoading = false;
145
  /**
   * Used to override the completion of the procedure without actually doing any logic in the
   * procedure. If bypass is set to true, when executing it will return null when
   * {@link #doExecute(Object)} is called to finish the procedure and release any locks
   * it may currently hold. The bypass does cleanup around the Procedure as far as the
   * Procedure framework is concerned. It does not clean any internal state that the
   * Procedures themselves may have set. That is for the Procedures to do themselves
   * when bypass is called. They should override bypass and do their cleanup in the
   * overridden bypass method (be sure to call the parent bypass to ensure proper
   * processing).
   * <p/>
   * Bypassing a procedure is not like aborting. Aborting a procedure will trigger
   * a rollback. And since the {@link #abort(Object)} method is overridable,
   * some procedures may have chosen to ignore the aborting.
   */
  private volatile boolean bypass = false;
161
  /**
   * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to
   * true, and the implementation can call {@link #skipPersistence()} to let the framework skip the
   * persistence of the procedure.
   * <p/>
   * This is useful when the procedure is in error and you want to retry later. The retry interval
   * and the number of retries are usually not critical, so skipping the persistence can save some
   * resources, and also speed up the restart processing.
   * <p/>
   * Notice that this value will be reset to true every time before execution. And when rolling back
   * we do not test this value.
   */
  private boolean persist = true;
175
176  public boolean isBypass() {
177    return bypass;
178  }
179
180  /**
181   * Set the bypass to true.
182   * Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now.
183   * DO NOT use this method alone, since we can't just bypass one single procedure. We need to
184   * bypass its ancestor too. If your Procedure has set state, it needs to undo it in here.
185   * @param env Current environment. May be null because of context; e.g. pretty-printing
186   *            procedure WALs where there is no 'environment' (and where Procedures that require
187   *            an 'environment' won't be run.
188   */
189  protected void bypass(TEnvironment env) {
190    this.bypass = true;
191  }
192
193  boolean needPersistence() {
194    return persist;
195  }
196
197  void resetPersistence() {
198    persist = true;
199  }
200
201  protected final void skipPersistence() {
202    persist = false;
203  }
204
  /**
   * The main code of the procedure. It must be idempotent since execute()
   * may be called multiple times in case of machine failure in the middle
   * of the execution.
   * @param env the environment passed to the ProcedureExecutor
   * @return a set of sub-procedures to run, or ourselves if there is more work to do, or null if
   *         the procedure is done.
   * @throws ProcedureYieldException the procedure will be added back to the queue and retried
   *         later.
   * @throws InterruptedException the procedure will be added back to the queue and retried later.
   * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
   *         and has set itself up waiting for an external event to wake it back up again.
   */
  protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
220
  /**
   * The code to undo what was done by the execute() code.
   * It is called when the procedure or one of the sub-procedures failed or an
   * abort was requested. It should cleanup all the resources created by
   * the execute() call. The implementation must be idempotent since rollback()
   * may be called multiple times in case of machine failure in the middle
   * of the execution.
   * @param env the environment passed to the ProcedureExecutor
   * @throws IOException temporary failure, the rollback will retry later
   * @throws InterruptedException the procedure will be added back to the queue and retried later
   */
  protected abstract void rollback(TEnvironment env)
    throws IOException, InterruptedException;
234
  /**
   * The abort() call is asynchronous and each procedure must decide how to deal
   * with it, if they want to be abortable. The simplest implementation
   * is to have an AtomicBoolean set in the abort() method and then the execute()
   * will check if the abort flag is set or not.
   * abort() may be called multiple times from the client, so the implementation
   * must be idempotent.
   *
   * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification
   * that allows the procedure implementor to abort.
   * @param env the environment passed to the ProcedureExecutor
   * @return NOTE(review): presumably true when the procedure accepts the abort request -- the
   *         meaning of the return value is not spelled out here; confirm against the caller.
   */
  protected abstract boolean abort(TEnvironment env);
247
  /**
   * The user-level code of the procedure may have some state to
   * persist (e.g. input arguments or current position in the processing state) to
   * be able to resume on failure.
   * @param serializer stores the serializable state
   * @throws IOException may be thrown by implementations (presumably when the state cannot be
   *         written -- confirm against the serializer)
   */
  protected abstract void serializeStateData(ProcedureStateSerializer serializer)
    throws IOException;
256
  /**
   * Called on store load to allow the user to decode the previously serialized
   * state.
   * @param serializer contains the serialized state
   * @throws IOException may be thrown by implementations (presumably when the state cannot be
   *         read -- confirm against the serializer)
   */
  protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
    throws IOException;
264
  /**
   * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
   * call us to determine whether we need to wait for initialization, second, it will call
   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
   * <p/>
   * This is because when master restarts, we need to restore the lock state for all the
   * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
   * {@link ProcedureExecutor} will be started before the master finishes initialization (as it is
   * part of the initialization!), so we need to split the code into two steps, and when restoring,
   * we just restore the lock part and ignore the waitInitialized part. Otherwise there will be a
   * dead lock.
   * @param env the environment passed to the ProcedureExecutor
   * @return true means we need to wait until the environment has been initialized, otherwise
   *         false. The default implementation returns false.
   */
  protected boolean waitInitialized(TEnvironment env) {
    return false;
  }
280
  /**
   * The user should override this method if they need a lock on an Entity. A lock can be anything,
   * and it is up to the implementor. The Procedure Framework will call this method just before it
   * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
   * execute.
   * <p/>
   * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
   * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
   * <p/>
   * Example: in our Master we can execute requests in parallel for different tables. We can create
   * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
   * queued waiting for that specific table create to happen.
   * <p/>
   * There are 3 LockStates:
   * <ul>
   * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
   * execute.</li>
   * <li>LOCK_YIELD_WAIT should be returned when the proc does not have the lock and the framework
   * should take care of re-adding the procedure back to the runnable set for retry.</li>
   * <li>LOCK_EVENT_WAIT should be returned when the proc does not have the lock and someone will
   * take care of re-adding the procedure back to the runnable set when the lock is available.</li>
   * </ul>
   * @param env the environment passed to the ProcedureExecutor
   * @return the lock state as described above. The default implementation always returns
   *         LOCK_ACQUIRED.
   */
  protected LockState acquireLock(TEnvironment env) {
    return LockState.LOCK_ACQUIRED;
  }
308
  /**
   * The user should override this method, and release lock if necessary. The default
   * implementation does nothing.
   * @param env the environment passed to the ProcedureExecutor
   */
  protected void releaseLock(TEnvironment env) {
    // no-op
  }
315
  /**
   * Used to keep the procedure lock even when the procedure is yielding or suspended.
   * @param env the environment passed to the ProcedureExecutor
   * @return true if the procedure should hold on the lock until completionCleanup(). The default
   *         implementation returns false.
   */
  protected boolean holdLock(TEnvironment env) {
    return false;
  }
323
324  /**
325   * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)}
326   * returns true, the procedure executor will call acquireLock() once and thereafter
327   * not call {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls
328   * release/acquire around each invocation of {@link #execute(Object)}.
329   * @see #holdLock(Object)
330   * @return true if the procedure has the lock, false otherwise.
331   */
332  public final boolean hasLock() {
333    return locked;
334  }
335
  /**
   * Called when the procedure is loaded for replay.
   * The procedure implementor may use this method to perform some quick
   * operation before replay,
   * e.g. failing the procedure if the state on replay may be unknown.
   * The default implementation does nothing.
   * @param env the environment passed to the ProcedureExecutor
   */
  protected void beforeReplay(TEnvironment env) {
    // no-op
  }
345
  /**
   * Called when the procedure is ready to be added to the queue after
   * the loading/replay operation. The default implementation does nothing.
   * @param env the environment passed to the ProcedureExecutor
   */
  protected void afterReplay(TEnvironment env) {
    // no-op
  }
353
  /**
   * Called when the procedure is marked as completed (success or rollback).
   * The procedure implementor may use this method to cleanup in-memory states.
   * This operation will not be retried on failure. If a procedure took a lock,
   * it will have been released when this method runs.
   * The default implementation does nothing.
   * @param env the environment passed to the ProcedureExecutor
   */
  protected void completionCleanup(TEnvironment env) {
    // no-op
  }
363
  /**
   * By default, the procedure framework/executor will try to run procedures start to finish.
   * Return true to make the executor yield between each execution step to
   * give other procedures a chance to run.
   * @param env the environment passed to the ProcedureExecutor
   * @return true if the executor should yield on completion of an execution step.
   *         Defaults to return false.
   */
  protected boolean isYieldAfterExecutionStep(TEnvironment env) {
    return false;
  }
375
  /**
   * By default, the executor will keep the procedure result around until
   * the eviction TTL is expired. The client can cut down the waiting time
   * by requesting that the result is removed from the executor.
   * In case of system started procedure, we can force the executor to auto-ack.
   * @param env the environment passed to the ProcedureExecutor
   * @return true if the executor should wait for the client ack for the result.
   *         Defaults to return true.
   */
  protected boolean shouldWaitClientAck(TEnvironment env) {
    return true;
  }
388
  /**
   * Override this method to provide procedure specific counters for submitted count, failed
   * count and time histogram.
   * @param env The environment passed to the procedure executor
   * @return Container object for procedure related metric. The default implementation returns
   *         null, in which case the default {@link #updateMetricsOnSubmit(Object)} and
   *         {@link #updateMetricsOnFinish(Object, long, boolean)} do nothing.
   */
  protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
    return null;
  }
398
399  /**
400   * This function will be called just when procedure is submitted for execution. Override this
401   * method to update the metrics at the beginning of the procedure. The default implementation
402   * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
403   * {@link ProcedureMetrics}.
404   */
405  protected void updateMetricsOnSubmit(TEnvironment env) {
406    ProcedureMetrics metrics = getProcedureMetrics(env);
407    if (metrics == null) {
408      return;
409    }
410
411    Counter submittedCounter = metrics.getSubmittedCounter();
412    if (submittedCounter != null) {
413      submittedCounter.increment();
414    }
415  }
416
417  /**
418   * This function will be called just after procedure execution is finished. Override this method
419   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
420   * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
421   * time histogram for successfully completed procedures. Increments failed counter for failed
422   * procedures.
423   * <p/>
424   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
425   * successfully finished siblings, this function may get called twice in certain cases for certain
426   * procedures. Explore further if this can be called once.
427   * @param env The environment passed to the procedure executor
428   * @param runtime Runtime of the procedure in milliseconds
429   * @param success true if procedure is completed successfully
430   */
431  protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
432    ProcedureMetrics metrics = getProcedureMetrics(env);
433    if (metrics == null) {
434      return;
435    }
436
437    if (success) {
438      Histogram timeHisto = metrics.getTimeHisto();
439      if (timeHisto != null) {
440        timeHisto.update(runtime);
441      }
442    } else {
443      Counter failedCounter = metrics.getFailedCounter();
444      if (failedCounter != null) {
445        failedCounter.increment();
446      }
447    }
448  }
449
450  @Override
451  public String toString() {
452    // Return the simple String presentation of the procedure.
453    return toStringSimpleSB().toString();
454  }
455
456  /**
457   * Build the StringBuilder for the simple form of procedure string.
458   * @return the StringBuilder
459   */
460  protected StringBuilder toStringSimpleSB() {
461    final StringBuilder sb = new StringBuilder();
462
463    sb.append("pid=");
464    sb.append(getProcId());
465
466    if (hasParent()) {
467      sb.append(", ppid=");
468      sb.append(getParentProcId());
469    }
470
471    /*
472     * TODO
473     * Enable later when this is being used.
474     * Currently owner not used.
475    if (hasOwner()) {
476      sb.append(", owner=");
477      sb.append(getOwner());
478    }*/
479
480    sb.append(", state="); // pState for Procedure State as opposed to any other kind.
481    toStringState(sb);
482
483    // Only print out locked if actually locked. Most of the time it is not.
484    if (this.locked) {
485      sb.append(", locked=").append(locked);
486    }
487
488    if (bypass) {
489      sb.append(", bypass=").append(bypass);
490    }
491
492    if (hasException()) {
493      sb.append(", exception=" + getException());
494    }
495
496    sb.append("; ");
497    toStringClassDetails(sb);
498
499    return sb;
500  }
501
502  /**
503   * Extend the toString() information with more procedure details
504   */
505  public String toStringDetails() {
506    final StringBuilder sb = toStringSimpleSB();
507
508    sb.append(" submittedTime=");
509    sb.append(getSubmittedTime());
510
511    sb.append(", lastUpdate=");
512    sb.append(getLastUpdate());
513
514    final int[] stackIndices = getStackIndexes();
515    if (stackIndices != null) {
516      sb.append("\n");
517      sb.append("stackIndexes=");
518      sb.append(Arrays.toString(stackIndices));
519    }
520
521    return sb.toString();
522  }
523
524  protected String toStringClass() {
525    StringBuilder sb = new StringBuilder();
526    toStringClassDetails(sb);
527    return sb.toString();
528  }
529
530  /**
531   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
532   * generic Procedure State with Procedure particulars.
533   * @param builder Append current {@link ProcedureState}
534   */
535  protected void toStringState(StringBuilder builder) {
536    builder.append(getState());
537  }
538
539  /**
540   * Extend the toString() information with the procedure details
541   * e.g. className and parameters
542   * @param builder the string builder to use to append the proc specific information
543   */
544  protected void toStringClassDetails(StringBuilder builder) {
545    builder.append(getClass().getName());
546  }
547
548  // ==========================================================================
549  //  Those fields are unchanged after initialization.
550  //
551  //  Each procedure will get created from the user or during
552  //  ProcedureExecutor.start() during the load() phase and then submitted
553  //  to the executor. these fields will never be changed after initialization
554  // ==========================================================================
555  public long getProcId() {
556    return procId;
557  }
558
559  public boolean hasParent() {
560    return parentProcId != NO_PROC_ID;
561  }
562
563  public long getParentProcId() {
564    return parentProcId;
565  }
566
567  public long getRootProcId() {
568    return rootProcId;
569  }
570
571  public String getProcName() {
572    return toStringClass();
573  }
574
575  public NonceKey getNonceKey() {
576    return nonceKey;
577  }
578
579  public long getSubmittedTime() {
580    return submittedTime;
581  }
582
583  public String getOwner() {
584    return owner;
585  }
586
587  public boolean hasOwner() {
588    return owner != null;
589  }
590
591  /**
592   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
593   */
594  @VisibleForTesting
595  protected void setProcId(long procId) {
596    this.procId = procId;
597    this.submittedTime = EnvironmentEdgeManager.currentTime();
598    setState(ProcedureState.RUNNABLE);
599  }
600
  /**
   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
   * @param parentProcId the id of the parent procedure
   */
  protected void setParentProcId(long parentProcId) {
    this.parentProcId = parentProcId;
  }
607
  /**
   * Record the id of the root procedure.
   * @param rootProcId the id of the root procedure
   */
  protected void setRootProcId(long rootProcId) {
    this.rootProcId = rootProcId;
  }
611
  /**
   * Called by the ProcedureExecutor to set the value to the newly created procedure.
   * @param nonceKey the nonce key to associate with this procedure
   */
  @VisibleForTesting
  protected void setNonceKey(NonceKey nonceKey) {
    this.nonceKey = nonceKey;
  }
619
620  @VisibleForTesting
621  public void setOwner(String owner) {
622    this.owner = StringUtils.isEmpty(owner) ? null : owner;
623  }
624
625  public void setOwner(User owner) {
626    assert owner != null : "expected owner to be not null";
627    setOwner(owner.getShortName());
628  }
629
  /**
   * Called on store load to initialize the Procedure internals after
   * the creation/deserialization.
   * @param submittedTime the submitted time to restore
   */
  protected void setSubmittedTime(long submittedTime) {
    this.submittedTime = submittedTime;
  }
637
638  // ==========================================================================
639  //  runtime state - timeout related
640  // ==========================================================================
  /**
   * Set the timeout of this procedure. Passing {@link #NO_TIMEOUT} makes {@link #hasTimeout()}
   * return false.
   * @param timeout timeout interval in msec
   */
  protected void setTimeout(int timeout) {
    this.timeout = timeout;
  }
647
648  public boolean hasTimeout() {
649    return timeout != NO_TIMEOUT;
650  }
651
652  /**
653   * @return the timeout in msec
654   */
655  public int getTimeout() {
656    return timeout;
657  }
658
  /**
   * Called on store load to initialize the Procedure internals after
   * the creation/deserialization.
   * @param lastUpdate the last update timestamp to restore
   */
  protected void setLastUpdate(long lastUpdate) {
    this.lastUpdate = lastUpdate;
  }
666
667  /**
668   * Called by ProcedureExecutor after each time a procedure step is executed.
669   */
670  protected void updateTimestamp() {
671    this.lastUpdate = EnvironmentEdgeManager.currentTime();
672  }
673
674  public long getLastUpdate() {
675    return lastUpdate;
676  }
677
678  /**
679   * Timeout of the next timeout.
680   * Called by the ProcedureExecutor if the procedure has timeout set and
681   * the procedure is in the waiting queue.
682   * @return the timestamp of the next timeout.
683   */
684  protected long getTimeoutTimestamp() {
685    return getLastUpdate() + getTimeout();
686  }
687
688  // ==========================================================================
689  //  runtime state
690  // ==========================================================================
691  /**
692   * @return the time elapsed between the last update and the start time of the procedure.
693   */
694  public long elapsedTime() {
695    return getLastUpdate() - getSubmittedTime();
696  }
697
698  /**
699   * @return the serialized result if any, otherwise null
700   */
701  public byte[] getResult() {
702    return result;
703  }
704
  /**
   * The procedure may leave a "result" on completion.
   * @param result the serialized result that will be passed to the client. The array is stored as
   *          is, not copied.
   */
  protected void setResult(byte[] result) {
    this.result = result;
  }
712
713  /**
714   * Will only be called when loading procedures from procedure store, where we need to record
715   * whether the procedure has already held a lock. Later we will call
716   * {@link #restoreLock(Object)} to actually acquire the lock.
717   */
718  final void lockedWhenLoading() {
719    this.lockedWhenLoading = true;
720  }
721
722  /**
723   * Can only be called when restarting, before the procedure actually being executed, as after we
724   * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
725   * {@link #lockedWhenLoading} to false.
726   * <p/>
727   * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
728   * front of a queue.
729   */
730  public boolean isLockedWhenLoading() {
731    return lockedWhenLoading;
732  }
733
734  // ==============================================================================================
735  //  Runtime state, updated every operation by the ProcedureExecutor
736  //
  //  There is always 1 thread at a time operating on the state of the procedure.
  //  The ProcedureExecutor may check and set states, or some Procedure may
  //  update its own state, but there are no concurrent updates. We use synchronized here
  //  just because the procedure can get scheduled on different executor threads on each step.
741  // ==============================================================================================
742
743  /**
744   * @return true if the procedure is in a RUNNABLE state.
745   */
746  public synchronized boolean isRunnable() {
747    return state == ProcedureState.RUNNABLE;
748  }
749
750  public synchronized boolean isInitializing() {
751    return state == ProcedureState.INITIALIZING;
752  }
753
754  /**
755   * @return true if the procedure has failed. It may or may not have rolled back.
756   */
757  public synchronized boolean isFailed() {
758    return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
759  }
760
761  /**
762   * @return true if the procedure is finished successfully.
763   */
764  public synchronized boolean isSuccess() {
765    return state == ProcedureState.SUCCESS && !hasException();
766  }
767
768  /**
769   * @return true if the procedure is finished. The Procedure may be completed successfully or
770   *         rolledback.
771   */
772  public synchronized boolean isFinished() {
773    return isSuccess() || state == ProcedureState.ROLLEDBACK;
774  }
775
776  /**
777   * @return true if the procedure is waiting for a child to finish or for an external event.
778   */
779  public synchronized boolean isWaiting() {
780    switch (state) {
781      case WAITING:
782      case WAITING_TIMEOUT:
783        return true;
784      default:
785        break;
786    }
787    return false;
788  }
789
790  @VisibleForTesting
791  protected synchronized void setState(final ProcedureState state) {
792    this.state = state;
793    updateTimestamp();
794  }
795
796  public synchronized ProcedureState getState() {
797    return state;
798  }
799
800  protected void setFailure(final String source, final Throwable cause) {
801    setFailure(new RemoteProcedureException(source, cause));
802  }
803
804  protected synchronized void setFailure(final RemoteProcedureException exception) {
805    this.exception = exception;
806    if (!isFinished()) {
807      setState(ProcedureState.FAILED);
808    }
809  }
810
811  protected void setAbortFailure(final String source, final String msg) {
812    setFailure(source, new ProcedureAbortedException(msg));
813  }
814
815  /**
816   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
817   * <p/>
818   * Another usage for this method is to implement retrying. A procedure can set the state to
819   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
820   * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
821   * call {@link #setTimeout(int)} method to set the timeout. And you should also override this
822   * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
823   * timeout event has been handled.
824   * @return true to let the framework handle the timeout as abort, false in case the procedure
825   *         handled the timeout itself.
826   */
827  protected synchronized boolean setTimeoutFailure(TEnvironment env) {
828    if (state == ProcedureState.WAITING_TIMEOUT) {
829      long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
830      setFailure("ProcedureExecutor",
831        new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
832      return true;
833    }
834    return false;
835  }
836
837  public synchronized boolean hasException() {
838    return exception != null;
839  }
840
841  public synchronized RemoteProcedureException getException() {
842    return exception;
843  }
844
845  /**
846   * Called by the ProcedureExecutor on procedure-load to restore the latch state
847   */
848  protected synchronized void setChildrenLatch(int numChildren) {
849    this.childrenLatch = numChildren;
850    if (LOG.isTraceEnabled()) {
851      LOG.trace("CHILD LATCH INCREMENT SET " +
852          this.childrenLatch, new Throwable(this.toString()));
853    }
854  }
855
856  /**
857   * Called by the ProcedureExecutor on procedure-load to restore the latch state
858   */
859  protected synchronized void incChildrenLatch() {
860    // TODO: can this be inferred from the stack? I think so...
861    this.childrenLatch++;
862    if (LOG.isTraceEnabled()) {
863      LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
864    }
865  }
866
867  /**
868   * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
869   */
870  private synchronized boolean childrenCountDown() {
871    assert childrenLatch > 0: this;
872    boolean b = --childrenLatch == 0;
873    if (LOG.isTraceEnabled()) {
874      LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
875    }
876    return b;
877  }
878
879  /**
880   * Try to set this procedure into RUNNABLE state.
881   * Succeeds if all subprocedures/children are done.
882   * @return True if we were able to move procedure to RUNNABLE state.
883   */
884  synchronized boolean tryRunnable() {
885    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
886    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
887      setState(ProcedureState.RUNNABLE);
888      return true;
889    } else {
890      return false;
891    }
892  }
893
894  protected synchronized boolean hasChildren() {
895    return childrenLatch > 0;
896  }
897
898  protected synchronized int getChildrenLatch() {
899    return childrenLatch;
900  }
901
902  /**
903   * Called by the RootProcedureState on procedure execution.
904   * Each procedure store its stack-index positions.
905   */
906  protected synchronized void addStackIndex(final int index) {
907    if (stackIndexes == null) {
908      stackIndexes = new int[] { index };
909    } else {
910      int count = stackIndexes.length;
911      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
912      stackIndexes[count] = index;
913    }
914  }
915
916  protected synchronized boolean removeStackIndex() {
917    if (stackIndexes != null && stackIndexes.length > 1) {
918      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
919      return false;
920    } else {
921      stackIndexes = null;
922      return true;
923    }
924  }
925
926  /**
927   * Called on store load to initialize the Procedure internals after
928   * the creation/deserialization.
929   */
930  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
931    this.stackIndexes = new int[stackIndexes.size()];
932    for (int i = 0; i < this.stackIndexes.length; ++i) {
933      this.stackIndexes[i] = stackIndexes.get(i);
934    }
935  }
936
937  protected synchronized boolean wasExecuted() {
938    return stackIndexes != null;
939  }
940
941  protected synchronized int[] getStackIndexes() {
942    return stackIndexes;
943  }
944
945  // ==========================================================================
946  //  Internal methods - called by the ProcedureExecutor
947  // ==========================================================================
948
949  /**
950   * Internal method called by the ProcedureExecutor that starts the user-level code execute().
951   * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
952   *           skip out without changing states or releasing any locks held.
953   */
954  protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
955      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
956    try {
957      updateTimestamp();
958      if (bypass) {
959        LOG.info("{} bypassed, returning null to finish it", this);
960        return null;
961      }
962      return execute(env);
963    } finally {
964      updateTimestamp();
965    }
966  }
967
968  /**
969   * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
970   */
971  protected void doRollback(TEnvironment env)
972      throws IOException, InterruptedException {
973    try {
974      updateTimestamp();
975      if (bypass) {
976        LOG.info("{} bypassed, skipping rollback", this);
977        return;
978      }
979      rollback(env);
980    } finally {
981      updateTimestamp();
982    }
983  }
984
985  final void restoreLock(TEnvironment env) {
986    if (!lockedWhenLoading) {
987      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
988      return;
989    }
990
991    if (isFinished()) {
992      LOG.debug("{} is already finished, skip acquiring lock.", this);
993      return;
994    }
995
996    if (isBypass()) {
997      LOG.debug("{} is already bypassed, skip acquiring lock.", this);
998      return;
999    }
1000    // this can happen if the parent stores the sub procedures but before it can
1001    // release its lock, the master restarts
1002    if (getState() == ProcedureState.WAITING && !holdLock(env)) {
1003      LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
1004      lockedWhenLoading = false;
1005      return;
1006    }
1007    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
1008    LockState state = acquireLock(env);
1009    assert state == LockState.LOCK_ACQUIRED;
1010  }
1011
1012  /**
1013   * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
1014   */
1015  final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
1016    if (waitInitialized(env)) {
1017      return LockState.LOCK_EVENT_WAIT;
1018    }
1019    if (lockedWhenLoading) {
1020      // reset it so we will not consider it anymore
1021      lockedWhenLoading = false;
1022      locked = true;
1023      // Here we return without persist the locked state, as lockedWhenLoading is true means
1024      // that the locked field of the procedure stored in procedure store is true, so we do not need
1025      // to store it again.
1026      return LockState.LOCK_ACQUIRED;
1027    }
1028    LockState state = acquireLock(env);
1029    if (state == LockState.LOCK_ACQUIRED) {
1030      locked = true;
1031      // persist that we have held the lock. This must be done before we actually execute the
1032      // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
1033      // but it may have already done some changes as we have already executed it, and if another
1034      // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
1035      // not expect that another procedure can be executed in the middle.
1036      store.update(this);
1037    }
1038    return state;
1039  }
1040
1041  /**
1042   * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
1043   */
1044  final void doReleaseLock(TEnvironment env, ProcedureStore store) {
1045    locked = false;
1046    // persist that we have released the lock. This must be done before we actually release the
1047    // lock. Another procedure may take this lock immediately after we release the lock, and if we
1048    // crash before persist the information that we have already released the lock, then when
1049    // restarting there will be two procedures which both have the lock and cause problems.
1050    if (getState() != ProcedureState.ROLLEDBACK) {
1051      // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
1052      // procedure store, so do not need to log the release operation any more.
1053      store.update(this);
1054    }
1055    releaseLock(env);
1056  }
1057
1058  @Override
1059  public int compareTo(final Procedure<TEnvironment> other) {
1060    return Long.compare(getProcId(), other.getProcId());
1061  }
1062
1063  // ==========================================================================
1064  //  misc utils
1065  // ==========================================================================
1066
1067  /**
1068   * Get an hashcode for the specified Procedure ID
1069   * @return the hashcode for the specified procId
1070   */
1071  public static long getProcIdHashCode(long procId) {
1072    long h = procId;
1073    h ^= h >> 16;
1074    h *= 0x85ebca6b;
1075    h ^= h >> 13;
1076    h *= 0xc2b2ae35;
1077    h ^= h >> 16;
1078    return h;
1079  }
1080
1081  /**
1082   * Helper to lookup the root Procedure ID given a specified procedure.
1083   */
1084  protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
1085      Procedure<T> proc) {
1086    while (proc.hasParent()) {
1087      proc = procedures.get(proc.getParentProcId());
1088      if (proc == null) {
1089        return null;
1090      }
1091    }
1092    return proc.getProcId();
1093  }
1094
1095  /**
1096   * @param a the first procedure to be compared.
1097   * @param b the second procedure to be compared.
1098   * @return true if the two procedures have the same parent
1099   */
1100  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
1101    return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
1102  }
1103}