001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Map;
024import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
025import org.apache.hadoop.hbase.metrics.Counter;
026import org.apache.hadoop.hbase.metrics.Histogram;
027import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
028import org.apache.hadoop.hbase.procedure2.util.StringUtils;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.NonceKey;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
037
038/**
039 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
040 * stack-indexes, etc.
041 * <p/>
042 * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
043 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
044 * be called multiple times in the case of failure or a restart, so code must be idempotent. The
045 * return from an execute call is either: null to indicate we are done; ourself if there is more to
046 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
047 * our execution.
048 * <p/>
049 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
050 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
051 * ProcedureState enum from protos:
052 * <ul>
053 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
054 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
055 * ROLLEDBACK state.</li>
056 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
057 * <li>{@link #isFinished()} As a procedure in FAILED state will be tried forever for rollback, only
058 * condition when scheduler/ executor will drop procedure from further processing is when procedure
059 * state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li>
060 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
061 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
062 * </ul>
063 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
064 * own state. This can lead to confusion. Try to keep the two distinct.
065 * <p/>
066 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
067 * step is supposed to cleanup the resources created during the execute() step. In case of failure
068 * and restart, rollback() may be called multiple times, so again the code must be idempotent.
069 * <p/>
070 * Procedure can be made respect a locking regime. It has acquire/release methods as well as an
071 * {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be
072 * locked for the life of a procedure -- not just the calls to execute -- then implementations
073 * should say so with the {@link #holdLock(Object)} method.
074 * <p/>
075 * And since we need to restore the lock when restarting to keep the logic correct(HBASE-20846), the
076 * implementation is a bit tricky so we add some comments hrre about it.
077 * <ul>
078 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
079 * whether we have the lock. We will set it to {@code true} in
080 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
081 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
082 * more.</li>
083 * <li>Also added a locked field in the proto message. When storing, the field will be set according
084 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
085 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
086 * message is {@code true}.</li>
087 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
088 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
089 * need to wait until master is initialized. So the solution here is that, we introduced a new
090 * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
091 * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
092 * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
093 * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
094 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
095 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is
096 * {@code true}, when we just set the {@link #locked} field to true and return, without actually
097 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
098 * </ul>
099 * <p/>
100 * Procedures can be suspended or put in wait state with a callback that gets executed on
101 * Procedure-specified timeout. See {@link #setTimeout(int)}}, and
102 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
103 * class for an example usage.
104 * </p>
105 * <p/>
106 * There are hooks for collecting metrics on submit of the procedure and on finish. See
107 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
108 */
109@InterfaceAudience.Private
110public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
111  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
112  public static final long NO_PROC_ID = -1;
113  protected static final int NO_TIMEOUT = -1;
114
115  public enum LockState {
116    LOCK_ACQUIRED, // Lock acquired and ready to execute
117    LOCK_YIELD_WAIT, // Lock not acquired, framework needs to yield
118    LOCK_EVENT_WAIT, // Lock not acquired, an event will yield the procedure
119  }
120
121  // Unchanged after initialization
122  private NonceKey nonceKey = null;
123  private String owner = null;
124  private long parentProcId = NO_PROC_ID;
125  private long rootProcId = NO_PROC_ID;
126  private long procId = NO_PROC_ID;
127  private long submittedTime;
128
129  // Runtime state, updated every operation
130  private ProcedureState state = ProcedureState.INITIALIZING;
131  private RemoteProcedureException exception = null;
132  private int[] stackIndexes = null;
133  private int childrenLatch = 0;
134
135  private volatile int timeout = NO_TIMEOUT;
136  private volatile long lastUpdate;
137
138  private volatile byte[] result = null;
139
140  private volatile boolean locked = false;
141
142  private boolean lockedWhenLoading = false;
143
144  /**
145   * Used for override complete of the procedure without actually doing any logic in the procedure.
146   * If bypass is set to true, when executing it will return null when {@link #doExecute(Object)} is
147   * called to finish the procedure and release any locks it may currently hold. The bypass does
148   * cleanup around the Procedure as far as the Procedure framework is concerned. It does not clean
149   * any internal state that the Procedure's themselves may have set. That is for the Procedures to
150   * do themselves when bypass is called. They should override bypass and do their cleanup in the
151   * overridden bypass method (be sure to call the parent bypass to ensure proper processing).
152   * <p>
153   * </p>
154   * Bypassing a procedure is not like aborting. Aborting a procedure will trigger a rollback. And
155   * since the {@link #abort(Object)} method is overrideable Some procedures may have chosen to
156   * ignore the aborting.
157   */
158  private volatile boolean bypass = false;
159
160  /**
161   * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to
162   * true, and the implementation can all {@link #skipPersistence()} to let the framework skip the
163   * persistence of the procedure.
164   * <p/>
165   * This is useful when the procedure is in error and you want to retry later. The retry interval
166   * and the number of retries are usually not critical so skip the persistence can save some
167   * resources, and also speed up the restart processing.
168   * <p/>
169   * Notice that this value will be reset to true every time before execution. And when rolling back
170   * we do not test this value.
171   */
172  private boolean persist = true;
173
174  public boolean isBypass() {
175    return bypass;
176  }
177
178  /**
179   * Set the bypass to true. Only called in
180   * {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now. DO NOT use
181   * this method alone, since we can't just bypass one single procedure. We need to bypass its
182   * ancestor too. If your Procedure has set state, it needs to undo it in here.
183   * @param env Current environment. May be null because of context; e.g. pretty-printing procedure
184   *            WALs where there is no 'environment' (and where Procedures that require an
185   *            'environment' won't be run.
186   */
187  protected void bypass(TEnvironment env) {
188    this.bypass = true;
189  }
190
191  boolean needPersistence() {
192    return persist;
193  }
194
195  void resetPersistence() {
196    persist = true;
197  }
198
199  protected final void skipPersistence() {
200    persist = false;
201  }
202
203  /**
204   * The main code of the procedure. It must be idempotent since execute() may be called multiple
205   * times in case of machine failure in the middle of the execution.
206   * @param env the environment passed to the ProcedureExecutor
207   * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
208   *         procedure is done.
209   * @throws ProcedureYieldException     the procedure will be added back to the queue and retried
210   *                                     later.
211   * @throws InterruptedException        the procedure will be added back to the queue and retried
212   *                                     later.
213   * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
214   *                                     and has set itself up waiting for an external event to wake
215   *                                     it back up again.
216   */
217  protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
218    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
219
220  /**
221   * The code to undo what was done by the execute() code. It is called when the procedure or one of
222   * the sub-procedures failed or an abort was requested. It should cleanup all the resources
223   * created by the execute() call. The implementation must be idempotent since rollback() may be
224   * called multiple time in case of machine failure in the middle of the execution.
225   * @param env the environment passed to the ProcedureExecutor
226   * @throws IOException          temporary failure, the rollback will retry later
227   * @throws InterruptedException the procedure will be added back to the queue and retried later
228   */
229  protected abstract void rollback(TEnvironment env) throws IOException, InterruptedException;
230
231  /**
232   * The abort() call is asynchronous and each procedure must decide how to deal with it, if they
233   * want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
234   * abort() method and then the execute() will check if the abort flag is set or not. abort() may
235   * be called multiple times from the client, so the implementation must be idempotent.
236   * <p>
237   * NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
238   * procedure implementor abort.
239   */
240  protected abstract boolean abort(TEnvironment env);
241
242  /**
243   * The user-level code of the procedure may have some state to persist (e.g. input arguments or
244   * current position in the processing state) to be able to resume on failure.
245   * @param serializer stores the serializable state
246   */
247  protected abstract void serializeStateData(ProcedureStateSerializer serializer)
248    throws IOException;
249
250  /**
251   * Called on store load to allow the user to decode the previously serialized state.
252   * @param serializer contains the serialized state
253   */
254  protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
255    throws IOException;
256
257  /**
258   * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
259   * call us to determine whether we need to wait for initialization, second, it will call
260   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
261   * <p/>
262   * This is because that when master restarts, we need to restore the lock state for all the
263   * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
264   * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part
265   * of the initialization!), so we need to split the code into two steps, and when restore, we just
266   * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.
267   * @return true means we need to wait until the environment has been initialized, otherwise true.
268   */
269  protected boolean waitInitialized(TEnvironment env) {
270    return false;
271  }
272
273  /**
274   * The user should override this method if they need a lock on an Entity. A lock can be anything,
275   * and it is up to the implementor. The Procedure Framework will call this method just before it
276   * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
277   * execute.
278   * <p/>
279   * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
280   * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
281   * <p/>
282   * Example: in our Master we can execute request in parallel for different tables. We can create
283   * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
284   * queued waiting that specific table create to happen.
285   * <p/>
286   * There are 3 LockState:
287   * <ul>
288   * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
289   * execute.</li>
290   * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
291   * take care of readding the procedure back to the runnable set for retry</li>
292   * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
293   * care of readding the procedure back to the runnable set when the lock is available.</li>
294   * </ul>
295   * @return the lock state as described above.
296   */
297  protected LockState acquireLock(TEnvironment env) {
298    return LockState.LOCK_ACQUIRED;
299  }
300
301  /**
302   * The user should override this method, and release lock if necessary.
303   */
304  protected void releaseLock(TEnvironment env) {
305    // no-op
306  }
307
308  /**
309   * Used to keep the procedure lock even when the procedure is yielding or suspended.
310   * @return true if the procedure should hold on the lock until completionCleanup()
311   */
312  protected boolean holdLock(TEnvironment env) {
313    return false;
314  }
315
316  /**
317   * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)}
318   * returns true, the procedure executor will call acquireLock() once and thereafter not call
319   * {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls release/acquire
320   * around each invocation of {@link #execute(Object)}.
321   * @see #holdLock(Object)
322   * @return true if the procedure has the lock, false otherwise.
323   */
324  public final boolean hasLock() {
325    return locked;
326  }
327
328  /**
329   * Called when the procedure is loaded for replay. The procedure implementor may use this method
330   * to perform some quick operation before replay. e.g. failing the procedure if the state on
331   * replay may be unknown.
332   */
333  protected void beforeReplay(TEnvironment env) {
334    // no-op
335  }
336
337  /**
338   * Called when the procedure is ready to be added to the queue after the loading/replay operation.
339   */
340  protected void afterReplay(TEnvironment env) {
341    // no-op
342  }
343
344  /**
345   * Called when the procedure is marked as completed (success or rollback). The procedure
346   * implementor may use this method to cleanup in-memory states. This operation will not be retried
347   * on failure. If a procedure took a lock, it will have been released when this method runs.
348   */
349  protected void completionCleanup(TEnvironment env) {
350    // no-op
351  }
352
353  /**
354   * By default, the procedure framework/executor will try to run procedures start to finish. Return
355   * true to make the executor yield between each execution step to give other procedures a chance
356   * to run.
357   * @param env the environment passed to the ProcedureExecutor
358   * @return Return true if the executor should yield on completion of an execution step. Defaults
359   *         to return false.
360   */
361  protected boolean isYieldAfterExecutionStep(TEnvironment env) {
362    return false;
363  }
364
365  /**
366   * By default, the executor will keep the procedure result around util the eviction TTL is
367   * expired. The client can cut down the waiting time by requesting that the result is removed from
368   * the executor. In case of system started procedure, we can force the executor to auto-ack.
369   * @param env the environment passed to the ProcedureExecutor
370   * @return true if the executor should wait the client ack for the result. Defaults to return
371   *         true.
372   */
373  protected boolean shouldWaitClientAck(TEnvironment env) {
374    return true;
375  }
376
377  /**
378   * Override this method to provide procedure specific counters for submitted count, failed count
379   * and time histogram.
380   * @param env The environment passed to the procedure executor
381   * @return Container object for procedure related metric
382   */
383  protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
384    return null;
385  }
386
387  /**
388   * This function will be called just when procedure is submitted for execution. Override this
389   * method to update the metrics at the beginning of the procedure. The default implementation
390   * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
391   * {@link ProcedureMetrics}.
392   */
393  protected void updateMetricsOnSubmit(TEnvironment env) {
394    ProcedureMetrics metrics = getProcedureMetrics(env);
395    if (metrics == null) {
396      return;
397    }
398
399    Counter submittedCounter = metrics.getSubmittedCounter();
400    if (submittedCounter != null) {
401      submittedCounter.increment();
402    }
403  }
404
405  /**
406   * This function will be called just after procedure execution is finished. Override this method
407   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
408   * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
409   * time histogram for successfully completed procedures. Increments failed counter for failed
410   * procedures.
411   * <p/>
412   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
413   * successfully finished siblings, this function may get called twice in certain cases for certain
414   * procedures. Explore further if this can be called once.
415   * @param env     The environment passed to the procedure executor
416   * @param runtime Runtime of the procedure in milliseconds
417   * @param success true if procedure is completed successfully
418   */
419  protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
420    ProcedureMetrics metrics = getProcedureMetrics(env);
421    if (metrics == null) {
422      return;
423    }
424
425    if (success) {
426      Histogram timeHisto = metrics.getTimeHisto();
427      if (timeHisto != null) {
428        timeHisto.update(runtime);
429      }
430    } else {
431      Counter failedCounter = metrics.getFailedCounter();
432      if (failedCounter != null) {
433        failedCounter.increment();
434      }
435    }
436  }
437
438  @Override
439  public String toString() {
440    // Return the simple String presentation of the procedure.
441    return toStringSimpleSB().toString();
442  }
443
444  /**
445   * Build the StringBuilder for the simple form of procedure string.
446   * @return the StringBuilder
447   */
448  protected StringBuilder toStringSimpleSB() {
449    final StringBuilder sb = new StringBuilder();
450
451    sb.append("pid=");
452    sb.append(getProcId());
453
454    if (hasParent()) {
455      sb.append(", ppid=");
456      sb.append(getParentProcId());
457    }
458
459    /*
460     * TODO Enable later when this is being used. Currently owner not used. if (hasOwner()) {
461     * sb.append(", owner="); sb.append(getOwner()); }
462     */
463
464    sb.append(", state="); // pState for Procedure State as opposed to any other kind.
465    toStringState(sb);
466
467    // Only print out locked if actually locked. Most of the time it is not.
468    if (this.locked) {
469      sb.append(", locked=").append(locked);
470    }
471
472    if (bypass) {
473      sb.append(", bypass=").append(bypass);
474    }
475
476    if (hasException()) {
477      sb.append(", exception=" + getException());
478    }
479
480    sb.append("; ");
481    toStringClassDetails(sb);
482
483    return sb;
484  }
485
486  /**
487   * Extend the toString() information with more procedure details
488   */
489  public String toStringDetails() {
490    final StringBuilder sb = toStringSimpleSB();
491
492    sb.append(" submittedTime=");
493    sb.append(getSubmittedTime());
494
495    sb.append(", lastUpdate=");
496    sb.append(getLastUpdate());
497
498    final int[] stackIndices = getStackIndexes();
499    if (stackIndices != null) {
500      sb.append("\n");
501      sb.append("stackIndexes=");
502      sb.append(Arrays.toString(stackIndices));
503    }
504
505    return sb.toString();
506  }
507
508  protected String toStringClass() {
509    StringBuilder sb = new StringBuilder();
510    toStringClassDetails(sb);
511    return sb.toString();
512  }
513
514  /**
515   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
516   * generic Procedure State with Procedure particulars.
517   * @param builder Append current {@link ProcedureState}
518   */
519  protected void toStringState(StringBuilder builder) {
520    builder.append(getState());
521  }
522
523  /**
524   * Extend the toString() information with the procedure details e.g. className and parameters
525   * @param builder the string builder to use to append the proc specific information
526   */
527  protected void toStringClassDetails(StringBuilder builder) {
528    builder.append(getClass().getName());
529  }
530
531  // ==========================================================================
532  // Those fields are unchanged after initialization.
533  //
534  // Each procedure will get created from the user or during
535  // ProcedureExecutor.start() during the load() phase and then submitted
536  // to the executor. these fields will never be changed after initialization
537  // ==========================================================================
538  public long getProcId() {
539    return procId;
540  }
541
542  public boolean hasParent() {
543    return parentProcId != NO_PROC_ID;
544  }
545
546  public long getParentProcId() {
547    return parentProcId;
548  }
549
550  public long getRootProcId() {
551    return rootProcId;
552  }
553
554  public String getProcName() {
555    return toStringClass();
556  }
557
558  public NonceKey getNonceKey() {
559    return nonceKey;
560  }
561
562  public long getSubmittedTime() {
563    return submittedTime;
564  }
565
566  public String getOwner() {
567    return owner;
568  }
569
570  public boolean hasOwner() {
571    return owner != null;
572  }
573
574  /**
575   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
576   */
577  protected void setProcId(long procId) {
578    this.procId = procId;
579    this.submittedTime = EnvironmentEdgeManager.currentTime();
580    setState(ProcedureState.RUNNABLE);
581  }
582
583  /**
584   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
585   */
586  protected void setParentProcId(long parentProcId) {
587    this.parentProcId = parentProcId;
588  }
589
590  protected void setRootProcId(long rootProcId) {
591    this.rootProcId = rootProcId;
592  }
593
594  /**
595   * Called by the ProcedureExecutor to set the value to the newly created procedure.
596   */
597  protected void setNonceKey(NonceKey nonceKey) {
598    this.nonceKey = nonceKey;
599  }
600
601  public void setOwner(String owner) {
602    this.owner = StringUtils.isEmpty(owner) ? null : owner;
603  }
604
605  public void setOwner(User owner) {
606    assert owner != null : "expected owner to be not null";
607    setOwner(owner.getShortName());
608  }
609
610  /**
611   * Called on store load to initialize the Procedure internals after the creation/deserialization.
612   */
613  protected void setSubmittedTime(long submittedTime) {
614    this.submittedTime = submittedTime;
615  }
616
617  // ==========================================================================
618  // runtime state - timeout related
619  // ==========================================================================
620  /**
621   * @param timeout timeout interval in msec
622   */
623  protected void setTimeout(int timeout) {
624    this.timeout = timeout;
625  }
626
627  public boolean hasTimeout() {
628    return timeout != NO_TIMEOUT;
629  }
630
631  /** Returns the timeout in msec */
632  public int getTimeout() {
633    return timeout;
634  }
635
636  /**
637   * Called on store load to initialize the Procedure internals after the creation/deserialization.
638   */
639  protected void setLastUpdate(long lastUpdate) {
640    this.lastUpdate = lastUpdate;
641  }
642
643  /**
644   * Called by ProcedureExecutor after each time a procedure step is executed.
645   */
646  protected void updateTimestamp() {
647    this.lastUpdate = EnvironmentEdgeManager.currentTime();
648  }
649
650  public long getLastUpdate() {
651    return lastUpdate;
652  }
653
654  /**
655   * Timeout of the next timeout. Called by the ProcedureExecutor if the procedure has timeout set
656   * and the procedure is in the waiting queue.
657   * @return the timestamp of the next timeout.
658   */
659  protected long getTimeoutTimestamp() {
660    return getLastUpdate() + getTimeout();
661  }
662
663  // ==========================================================================
664  // runtime state
665  // ==========================================================================
666  /** Returns the time elapsed between the last update and the start time of the procedure. */
667  public long elapsedTime() {
668    return getLastUpdate() - getSubmittedTime();
669  }
670
671  /** Returns the serialized result if any, otherwise null */
672  public byte[] getResult() {
673    return result;
674  }
675
676  /**
677   * The procedure may leave a "result" on completion.
678   * @param result the serialized result that will be passed to the client
679   */
680  protected void setResult(byte[] result) {
681    this.result = result;
682  }
683
684  /**
685   * Will only be called when loading procedures from procedure store, where we need to record
686   * whether the procedure has already held a lock. Later we will call {@link #restoreLock(Object)}
687   * to actually acquire the lock.
688   */
689  final void lockedWhenLoading() {
690    this.lockedWhenLoading = true;
691  }
692
693  /**
694   * Can only be called when restarting, before the procedure actually being executed, as after we
695   * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
696   * {@link #lockedWhenLoading} to false.
697   * <p/>
698   * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
699   * front of a queue.
700   */
701  public boolean isLockedWhenLoading() {
702    return lockedWhenLoading;
703  }
704
705  // ==============================================================================================
706  // Runtime state, updated every operation by the ProcedureExecutor
707  //
708  // There is always 1 thread at the time operating on the state of the procedure.
709  // The ProcedureExecutor may check and set states, or some Procecedure may
710  // update its own state. but no concurrent updates. we use synchronized here
711  // just because the procedure can get scheduled on different executor threads on each step.
712  // ==============================================================================================
713
714  /** Returns true if the procedure is in a RUNNABLE state. */
715  public synchronized boolean isRunnable() {
716    return state == ProcedureState.RUNNABLE;
717  }
718
719  public synchronized boolean isInitializing() {
720    return state == ProcedureState.INITIALIZING;
721  }
722
723  /** Returns true if the procedure has failed. It may or may not have rolled back. */
724  public synchronized boolean isFailed() {
725    return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
726  }
727
728  /** Returns true if the procedure is finished successfully. */
729  public synchronized boolean isSuccess() {
730    return state == ProcedureState.SUCCESS && !hasException();
731  }
732
733  /**
734   * @return true if the procedure is finished. The Procedure may be completed successfully or
735   *         rolledback.
736   */
737  public synchronized boolean isFinished() {
738    return isSuccess() || state == ProcedureState.ROLLEDBACK;
739  }
740
741  /** Returns true if the procedure is waiting for a child to finish or for an external event. */
742  public synchronized boolean isWaiting() {
743    switch (state) {
744      case WAITING:
745      case WAITING_TIMEOUT:
746        return true;
747      default:
748        break;
749    }
750    return false;
751  }
752
753  protected synchronized void setState(final ProcedureState state) {
754    this.state = state;
755    updateTimestamp();
756  }
757
758  public synchronized ProcedureState getState() {
759    return state;
760  }
761
762  protected void setFailure(final String source, final Throwable cause) {
763    setFailure(new RemoteProcedureException(source, cause));
764  }
765
766  protected synchronized void setFailure(final RemoteProcedureException exception) {
767    this.exception = exception;
768    if (!isFinished()) {
769      setState(ProcedureState.FAILED);
770    }
771  }
772
773  protected void setAbortFailure(final String source, final String msg) {
774    setFailure(source, new ProcedureAbortedException(msg));
775  }
776
777  /**
778   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
779   * <p/>
780   * Another usage for this method is to implement retrying. A procedure can set the state to
781   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
782   * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
783   * call {@link #setTimeout(int)} method to set the timeout. And you should also override this
784   * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
785   * timeout event has been handled.
786   * @return true to let the framework handle the timeout as abort, false in case the procedure
787   *         handled the timeout itself.
788   */
789  protected synchronized boolean setTimeoutFailure(TEnvironment env) {
790    if (state == ProcedureState.WAITING_TIMEOUT) {
791      long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
792      setFailure("ProcedureExecutor",
793        new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
794      return true;
795    }
796    return false;
797  }
798
799  public synchronized boolean hasException() {
800    return exception != null;
801  }
802
803  public synchronized RemoteProcedureException getException() {
804    return exception;
805  }
806
807  /**
808   * Called by the ProcedureExecutor on procedure-load to restore the latch state
809   */
810  protected synchronized void setChildrenLatch(int numChildren) {
811    this.childrenLatch = numChildren;
812    if (LOG.isTraceEnabled()) {
813      LOG.trace("CHILD LATCH INCREMENT SET " + this.childrenLatch, new Throwable(this.toString()));
814    }
815  }
816
817  /**
818   * Called by the ProcedureExecutor on procedure-load to restore the latch state
819   */
820  protected synchronized void incChildrenLatch() {
821    // TODO: can this be inferred from the stack? I think so...
822    this.childrenLatch++;
823    if (LOG.isTraceEnabled()) {
824      LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
825    }
826  }
827
828  /**
829   * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
830   */
831  private synchronized boolean childrenCountDown() {
832    assert childrenLatch > 0 : this;
833    boolean b = --childrenLatch == 0;
834    if (LOG.isTraceEnabled()) {
835      LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
836    }
837    return b;
838  }
839
840  /**
841   * Try to set this procedure into RUNNABLE state. Succeeds if all subprocedures/children are done.
842   * @return True if we were able to move procedure to RUNNABLE state.
843   */
844  synchronized boolean tryRunnable() {
845    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
846    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
847      setState(ProcedureState.RUNNABLE);
848      return true;
849    } else {
850      return false;
851    }
852  }
853
854  protected synchronized boolean hasChildren() {
855    return childrenLatch > 0;
856  }
857
858  protected synchronized int getChildrenLatch() {
859    return childrenLatch;
860  }
861
862  /**
863   * Called by the RootProcedureState on procedure execution. Each procedure store its stack-index
864   * positions.
865   */
866  protected synchronized void addStackIndex(final int index) {
867    if (stackIndexes == null) {
868      stackIndexes = new int[] { index };
869    } else {
870      int count = stackIndexes.length;
871      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
872      stackIndexes[count] = index;
873    }
874  }
875
876  protected synchronized boolean removeStackIndex() {
877    if (stackIndexes != null && stackIndexes.length > 1) {
878      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
879      return false;
880    } else {
881      stackIndexes = null;
882      return true;
883    }
884  }
885
886  /**
887   * Called on store load to initialize the Procedure internals after the creation/deserialization.
888   */
889  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
890    this.stackIndexes = new int[stackIndexes.size()];
891    for (int i = 0; i < this.stackIndexes.length; ++i) {
892      this.stackIndexes[i] = stackIndexes.get(i);
893    }
894  }
895
896  protected synchronized boolean wasExecuted() {
897    return stackIndexes != null;
898  }
899
900  protected synchronized int[] getStackIndexes() {
901    return stackIndexes;
902  }
903
904  // ==========================================================================
905  // Internal methods - called by the ProcedureExecutor
906  // ==========================================================================
907
908  /**
909   * Internal method called by the ProcedureExecutor that starts the user-level code execute().
910   * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
911   *                                     skip out without changing states or releasing any locks
912   *                                     held.
913   */
914  protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
915    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
916    try {
917      updateTimestamp();
918      if (bypass) {
919        LOG.info("{} bypassed, returning null to finish it", this);
920        return null;
921      }
922      return execute(env);
923    } finally {
924      updateTimestamp();
925    }
926  }
927
928  /**
929   * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
930   */
931  protected void doRollback(TEnvironment env) throws IOException, InterruptedException {
932    try {
933      updateTimestamp();
934      if (bypass) {
935        LOG.info("{} bypassed, skipping rollback", this);
936        return;
937      }
938      rollback(env);
939    } finally {
940      updateTimestamp();
941    }
942  }
943
944  final void restoreLock(TEnvironment env) {
945    if (!lockedWhenLoading) {
946      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
947      return;
948    }
949
950    if (isFinished()) {
951      LOG.debug("{} is already finished, skip acquiring lock.", this);
952      return;
953    }
954
955    if (isBypass()) {
956      LOG.debug("{} is already bypassed, skip acquiring lock.", this);
957      return;
958    }
959    // this can happen if the parent stores the sub procedures but before it can
960    // release its lock, the master restarts
961    if (getState() == ProcedureState.WAITING && !holdLock(env)) {
962      LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
963      lockedWhenLoading = false;
964      return;
965    }
966    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
967    LockState state = acquireLock(env);
968    assert state == LockState.LOCK_ACQUIRED;
969  }
970
971  /**
972   * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
973   */
974  final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
975    if (waitInitialized(env)) {
976      return LockState.LOCK_EVENT_WAIT;
977    }
978    if (lockedWhenLoading) {
979      // reset it so we will not consider it anymore
980      lockedWhenLoading = false;
981      locked = true;
982      // Here we return without persist the locked state, as lockedWhenLoading is true means
983      // that the locked field of the procedure stored in procedure store is true, so we do not need
984      // to store it again.
985      return LockState.LOCK_ACQUIRED;
986    }
987    LockState state = acquireLock(env);
988    if (state == LockState.LOCK_ACQUIRED) {
989      locked = true;
990      // persist that we have held the lock. This must be done before we actually execute the
991      // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
992      // but it may have already done some changes as we have already executed it, and if another
993      // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
994      // not expect that another procedure can be executed in the middle.
995      store.update(this);
996    }
997    return state;
998  }
999
1000  /**
1001   * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
1002   */
1003  final void doReleaseLock(TEnvironment env, ProcedureStore store) {
1004    locked = false;
1005    // persist that we have released the lock. This must be done before we actually release the
1006    // lock. Another procedure may take this lock immediately after we release the lock, and if we
1007    // crash before persist the information that we have already released the lock, then when
1008    // restarting there will be two procedures which both have the lock and cause problems.
1009    if (getState() != ProcedureState.ROLLEDBACK) {
1010      // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
1011      // procedure store, so do not need to log the release operation any more.
1012      store.update(this);
1013    }
1014    releaseLock(env);
1015  }
1016
1017  @Override
1018  public int compareTo(final Procedure<TEnvironment> other) {
1019    return Long.compare(getProcId(), other.getProcId());
1020  }
1021
1022  // ==========================================================================
1023  // misc utils
1024  // ==========================================================================
1025
1026  /**
1027   * Get an hashcode for the specified Procedure ID
1028   * @return the hashcode for the specified procId
1029   */
1030  public static long getProcIdHashCode(long procId) {
1031    long h = procId;
1032    h ^= h >> 16;
1033    h *= 0x85ebca6b;
1034    h ^= h >> 13;
1035    h *= 0xc2b2ae35;
1036    h ^= h >> 16;
1037    return h;
1038  }
1039
1040  /**
1041   * Helper to lookup the root Procedure ID given a specified procedure.
1042   */
1043  protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
1044    Procedure<T> proc) {
1045    while (proc.hasParent()) {
1046      proc = procedures.get(proc.getParentProcId());
1047      if (proc == null) {
1048        return null;
1049      }
1050    }
1051    return proc.getProcId();
1052  }
1053
1054  /**
1055   * @param a the first procedure to be compared.
1056   * @param b the second procedure to be compared.
1057   * @return true if the two procedures have the same parent
1058   */
1059  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
1060    return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
1061  }
1062}