001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Map;
024import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
025import org.apache.hadoop.hbase.metrics.Counter;
026import org.apache.hadoop.hbase.metrics.Histogram;
027import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
028import org.apache.hadoop.hbase.procedure2.util.StringUtils;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.NonceKey;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
037
038/**
039 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
040 * stack-indexes, etc.
041 * <p/>
042 * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
043 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
044 * be called multiple times in the case of failure or a restart, so code must be idempotent. The
045 * return from an execute call is either: null to indicate we are done; ourself if there is more to
046 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
047 * our execution.
048 * <p/>
049 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
050 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
051 * ProcedureState enum from protos:
052 * <ul>
053 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
054 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
055 * ROLLEDBACK state.</li>
056 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
 * <li>{@link #isFinished()} Since a procedure in FAILED state will be retried forever for rollback,
 * the only condition under which the scheduler/executor drops a procedure from further processing
 * is when the procedure state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of
 * the procedure.</li>
060 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
061 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
062 * </ul>
063 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
064 * own state. This can lead to confusion. Try to keep the two distinct.
065 * <p/>
066 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
067 * step is supposed to cleanup the resources created during the execute() step. In case of failure
068 * and restart, rollback() may be called multiple times, so again the code must be idempotent.
069 * <p/>
 * A Procedure can be made to respect a locking regime. It has acquire/release methods as well as a
 * {@link #hasLock()} method. The lock implementation is up to the implementor. If an entity needs
 * to be locked for the life of a procedure -- not just the calls to execute -- then
 * implementations should say so with the {@link #holdLock(Object)} method.
074 * <p/>
 * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846),
 * the implementation is a bit tricky, so we add some comments here about it.
077 * <ul>
078 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
079 * whether we have the lock. We will set it to {@code true} in
080 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
081 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
082 * more.</li>
083 * <li>Also added a locked field in the proto message. When storing, the field will be set according
084 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
085 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
086 * message is {@code true}.</li>
 * <li>The reason why we cannot set the {@link #locked} field directly to {@code true} by calling
 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
 * need to wait until the master is initialized. So the solution here is that we introduced a new
 * method called {@link #waitInitialized(Object)} in Procedure, and moved the code that waits for
 * master initialization from {@link #acquireLock(Object)} to this method. We also added a
 * restoreLock method to Procedure: if {@link #lockedWhenLoading} is {@code true}, it calls
 * {@link #acquireLock(Object)} to get the lock, but does not set {@link #locked} to true. Later,
 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
 * {@link #waitInitialized(Object)} check, we test {@link #lockedWhenLoading}; if it is
 * {@code true}, we just set the {@link #locked} field to true and return, without actually
 * calling the {@link #acquireLock(Object)} method, since we have already called it once.</li>
098 * </ul>
099 * <p/>
 * Procedures can be suspended or put in a wait state with a callback that gets executed on a
 * Procedure-specified timeout. See {@link #setTimeout(int)} and
 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
 * class for an example usage.
105 * <p/>
106 * There are hooks for collecting metrics on submit of the procedure and on finish. See
107 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
108 */
109@InterfaceAudience.Private
110public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
  // Logger for this class.
  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
  /**
   * Sentinel id meaning "no procedure". The id fields start out with this value, and hasParent()
   * compares parentProcId against it.
   */
  public static final long NO_PROC_ID = -1;
  // Sentinel for "no timeout set"; hasTimeout() compares the timeout field against it.
  protected static final int NO_TIMEOUT = -1;
114
115  public enum LockState {
116    LOCK_ACQUIRED, // Lock acquired and ready to execute
117    LOCK_YIELD_WAIT, // Lock not acquired, framework needs to yield
118    LOCK_EVENT_WAIT, // Lock not acquired, an event will yield the procedure
119  }
120
  // Unchanged after initialization
  private NonceKey nonceKey = null;
  // Null when no owner is set; setOwner(String) maps an empty owner to null.
  private String owner = null;
  // NO_PROC_ID when this procedure has no parent (see hasParent()).
  private long parentProcId = NO_PROC_ID;
  private long rootProcId = NO_PROC_ID;
  // Assigned by setProcId(long), which also stamps submittedTime.
  private long procId = NO_PROC_ID;
  // Set from EnvironmentEdgeManager.currentTime() in setProcId(long), or restored on store load
  // through setSubmittedTime(long).
  private long submittedTime;

  // Runtime state, updated every operation
  // Executor-level state: starts as INITIALIZING and is changed through setState(...).
  private ProcedureState state = ProcedureState.INITIALIZING;
  // Failure recorded by setFailure(...); null until a failure is set.
  private RemoteProcedureException exception = null;
  // NOTE(review): maintained by code outside this view (see getStackIndexes()) — confirm semantics.
  private int[] stackIndexes = null;
  // NOTE(review): not touched in the visible code; presumably counts outstanding children — confirm.
  private int childrenLatch = 0;

  // Timeout interval in msec (see setTimeout(int)); NO_TIMEOUT means no timeout is set.
  private volatile int timeout = NO_TIMEOUT;
  // Refreshed by updateTimestamp() on every setState(...); restored by setLastUpdate(long) on load.
  private volatile long lastUpdate;

  // Serialized result to be passed to the client, if any (see setResult(byte[])).
  private volatile byte[] result = null;

  // Whether the procedure currently holds its lock; exposed through hasLock().
  private volatile boolean locked = false;

  // Set by lockedWhenLoading() when the stored procedure was recorded as holding a lock.
  private boolean lockedWhenLoading = false;
143
144  /**
145   * Used for override complete of the procedure without actually doing any logic in the procedure.
146   * If bypass is set to true, when executing it will return null when {@link #doExecute(Object)} is
147   * called to finish the procedure and release any locks it may currently hold. The bypass does
148   * cleanup around the Procedure as far as the Procedure framework is concerned. It does not clean
149   * any internal state that the Procedure's themselves may have set. That is for the Procedures to
150   * do themselves when bypass is called. They should override bypass and do their cleanup in the
151   * overridden bypass method (be sure to call the parent bypass to ensure proper processing).
152   * <p>
153   * </p>
154   * Bypassing a procedure is not like aborting. Aborting a procedure will trigger a rollback. And
155   * since the {@link #abort(Object)} method is overrideable Some procedures may have chosen to
156   * ignore the aborting.
157   */
158  private volatile boolean bypass = false;
159
160  /**
161   * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to
162   * true, and the implementation can all {@link #skipPersistence()} to let the framework skip the
163   * persistence of the procedure.
164   * <p/>
165   * This is useful when the procedure is in error and you want to retry later. The retry interval
166   * and the number of retries are usually not critical so skip the persistence can save some
167   * resources, and also speed up the restart processing.
168   * <p/>
169   * Notice that this value will be reset to true every time before execution. And when rolling back
170   * we do not test this value.
171   */
172  private boolean persist = true;
173
174  public boolean isBypass() {
175    return bypass;
176  }
177
178  /**
179   * Set the bypass to true. Only called in
180   * {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now. DO NOT use
181   * this method alone, since we can't just bypass one single procedure. We need to bypass its
182   * ancestor too. If your Procedure has set state, it needs to undo it in here.
183   * @param env Current environment. May be null because of context; e.g. pretty-printing procedure
184   *            WALs where there is no 'environment' (and where Procedures that require an
185   *            'environment' won't be run.
186   */
187  protected void bypass(TEnvironment env) {
188    this.bypass = true;
189  }
190
191  boolean needPersistence() {
192    return persist;
193  }
194
195  void resetPersistence() {
196    persist = true;
197  }
198
199  protected final void skipPersistence() {
200    persist = false;
201  }
202
203  /**
204   * The main code of the procedure. It must be idempotent since execute() may be called multiple
205   * times in case of machine failure in the middle of the execution.
206   * @param env the environment passed to the ProcedureExecutor
207   * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
208   *         procedure is done.
209   * @throws ProcedureYieldException     the procedure will be added back to the queue and retried
210   *                                     later.
211   * @throws InterruptedException        the procedure will be added back to the queue and retried
212   *                                     later.
213   * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
214   *                                     and has set itself up waiting for an external event to wake
215   *                                     it back up again.
216   */
217  protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
218    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
219
220  /**
221   * The code to undo what was done by the execute() code. It is called when the procedure or one of
222   * the sub-procedures failed or an abort was requested. It should cleanup all the resources
223   * created by the execute() call. The implementation must be idempotent since rollback() may be
224   * called multiple time in case of machine failure in the middle of the execution.
225   * @param env the environment passed to the ProcedureExecutor
226   * @throws IOException          temporary failure, the rollback will retry later
227   * @throws InterruptedException the procedure will be added back to the queue and retried later
228   */
229  protected abstract void rollback(TEnvironment env) throws IOException, InterruptedException;
230
231  /**
232   * The abort() call is asynchronous and each procedure must decide how to deal with it, if they
233   * want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
234   * abort() method and then the execute() will check if the abort flag is set or not. abort() may
235   * be called multiple times from the client, so the implementation must be idempotent.
236   * <p>
237   * NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
238   * procedure implementor abort.
239   */
240  protected abstract boolean abort(TEnvironment env);
241
242  /**
243   * The user-level code of the procedure may have some state to persist (e.g. input arguments or
244   * current position in the processing state) to be able to resume on failure.
245   * @param serializer stores the serializable state
246   */
247  protected abstract void serializeStateData(ProcedureStateSerializer serializer)
248    throws IOException;
249
250  /**
251   * Called on store load to allow the user to decode the previously serialized state.
252   * @param serializer contains the serialized state
253   */
254  protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
255    throws IOException;
256
257  /**
258   * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
259   * call us to determine whether we need to wait for initialization, second, it will call
260   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
261   * <p/>
262   * This is because that when master restarts, we need to restore the lock state for all the
263   * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
264   * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part
265   * of the initialization!), so we need to split the code into two steps, and when restore, we just
266   * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.
267   * @return true means we need to wait until the environment has been initialized, otherwise true.
268   */
269  protected boolean waitInitialized(TEnvironment env) {
270    return false;
271  }
272
273  /**
274   * The user should override this method if they need a lock on an Entity. A lock can be anything,
275   * and it is up to the implementor. The Procedure Framework will call this method just before it
276   * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
277   * execute.
278   * <p/>
279   * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
280   * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
281   * <p/>
282   * Example: in our Master we can execute request in parallel for different tables. We can create
283   * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
284   * queued waiting that specific table create to happen.
285   * <p/>
286   * There are 3 LockState:
287   * <ul>
288   * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
289   * execute.</li>
290   * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
291   * take care of readding the procedure back to the runnable set for retry</li>
292   * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
293   * care of readding the procedure back to the runnable set when the lock is available.</li>
294   * </ul>
295   * @return the lock state as described above.
296   */
297  protected LockState acquireLock(TEnvironment env) {
298    return LockState.LOCK_ACQUIRED;
299  }
300
301  /**
302   * The user should override this method, and release lock if necessary.
303   */
304  protected void releaseLock(TEnvironment env) {
305    // no-op
306  }
307
308  /**
309   * Used to keep the procedure lock even when the procedure is yielding or suspended.
310   * @return true if the procedure should hold on the lock until completionCleanup()
311   */
312  protected boolean holdLock(TEnvironment env) {
313    return false;
314  }
315
316  /**
317   * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)}
318   * returns true, the procedure executor will call acquireLock() once and thereafter not call
319   * {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls release/acquire
320   * around each invocation of {@link #execute(Object)}.
321   * @see #holdLock(Object)
322   * @return true if the procedure has the lock, false otherwise.
323   */
324  public final boolean hasLock() {
325    return locked;
326  }
327
328  /**
329   * Called when the procedure is loaded for replay. The procedure implementor may use this method
330   * to perform some quick operation before replay. e.g. failing the procedure if the state on
331   * replay may be unknown.
332   */
333  protected void beforeReplay(TEnvironment env) {
334    // no-op
335  }
336
337  /**
338   * Called when the procedure is ready to be added to the queue after the loading/replay operation.
339   */
340  protected void afterReplay(TEnvironment env) {
341    // no-op
342  }
343
344  /**
345   * Called when the procedure is marked as completed (success or rollback). The procedure
346   * implementor may use this method to cleanup in-memory states. This operation will not be retried
347   * on failure. If a procedure took a lock, it will have been released when this method runs.
348   */
349  protected void completionCleanup(TEnvironment env) {
350    // no-op
351  }
352
353  /**
354   * By default, the procedure framework/executor will try to run procedures start to finish. Return
355   * true to make the executor yield between each execution step to give other procedures a chance
356   * to run.
357   * @param env the environment passed to the ProcedureExecutor
358   * @return Return true if the executor should yield on completion of an execution step. Defaults
359   *         to return false.
360   */
361  protected boolean isYieldAfterExecutionStep(TEnvironment env) {
362    return false;
363  }
364
365  /**
366   * By default, the executor will keep the procedure result around util the eviction TTL is
367   * expired. The client can cut down the waiting time by requesting that the result is removed from
368   * the executor. In case of system started procedure, we can force the executor to auto-ack.
369   * @param env the environment passed to the ProcedureExecutor
370   * @return true if the executor should wait the client ack for the result. Defaults to return
371   *         true.
372   */
373  protected boolean shouldWaitClientAck(TEnvironment env) {
374    return true;
375  }
376
377  /**
378   * Override this method to provide procedure specific counters for submitted count, failed count
379   * and time histogram.
380   * @param env The environment passed to the procedure executor
381   * @return Container object for procedure related metric
382   */
383  protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
384    return null;
385  }
386
387  /**
388   * This function will be called just when procedure is submitted for execution. Override this
389   * method to update the metrics at the beginning of the procedure. The default implementation
390   * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
391   * {@link ProcedureMetrics}.
392   */
393  protected void updateMetricsOnSubmit(TEnvironment env) {
394    ProcedureMetrics metrics = getProcedureMetrics(env);
395    if (metrics == null) {
396      return;
397    }
398
399    Counter submittedCounter = metrics.getSubmittedCounter();
400    if (submittedCounter != null) {
401      submittedCounter.increment();
402    }
403  }
404
405  /**
406   * This function will be called just after procedure execution is finished. Override this method
407   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
408   * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
409   * time histogram for successfully completed procedures. Increments failed counter for failed
410   * procedures.
411   * <p/>
412   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
413   * successfully finished siblings, this function may get called twice in certain cases for certain
414   * procedures. Explore further if this can be called once.
415   * @param env     The environment passed to the procedure executor
416   * @param runtime Runtime of the procedure in milliseconds
417   * @param success true if procedure is completed successfully
418   */
419  protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
420    ProcedureMetrics metrics = getProcedureMetrics(env);
421    if (metrics == null) {
422      return;
423    }
424
425    if (success) {
426      Histogram timeHisto = metrics.getTimeHisto();
427      if (timeHisto != null) {
428        timeHisto.update(runtime);
429      }
430    } else {
431      Counter failedCounter = metrics.getFailedCounter();
432      if (failedCounter != null) {
433        failedCounter.increment();
434      }
435    }
436  }
437
438  @Override
439  public String toString() {
440    // Return the simple String presentation of the procedure.
441    return toStringSimpleSB().toString();
442  }
443
444  /**
445   * Build the StringBuilder for the simple form of procedure string.
446   * @return the StringBuilder
447   */
448  protected StringBuilder toStringSimpleSB() {
449    final StringBuilder sb = new StringBuilder();
450
451    sb.append("pid=");
452    sb.append(getProcId());
453
454    if (hasParent()) {
455      sb.append(", ppid=");
456      sb.append(getParentProcId());
457    }
458
459    /*
460     * TODO Enable later when this is being used. Currently owner not used. if (hasOwner()) {
461     * sb.append(", owner="); sb.append(getOwner()); }
462     */
463
464    sb.append(", state="); // pState for Procedure State as opposed to any other kind.
465    toStringState(sb);
466
467    sb.append(", hasLock=").append(locked);
468
469    if (bypass) {
470      sb.append(", bypass=").append(bypass);
471    }
472
473    if (hasException()) {
474      sb.append(", exception=" + getException());
475    }
476
477    sb.append("; ");
478    toStringClassDetails(sb);
479
480    return sb;
481  }
482
483  /**
484   * Extend the toString() information with more procedure details
485   */
486  public String toStringDetails() {
487    final StringBuilder sb = toStringSimpleSB();
488
489    sb.append(" submittedTime=");
490    sb.append(getSubmittedTime());
491
492    sb.append(", lastUpdate=");
493    sb.append(getLastUpdate());
494
495    final int[] stackIndices = getStackIndexes();
496    if (stackIndices != null) {
497      sb.append("\n");
498      sb.append("stackIndexes=");
499      sb.append(Arrays.toString(stackIndices));
500    }
501
502    return sb.toString();
503  }
504
505  protected String toStringClass() {
506    StringBuilder sb = new StringBuilder();
507    toStringClassDetails(sb);
508    return sb.toString();
509  }
510
511  /**
512   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
513   * generic Procedure State with Procedure particulars.
514   * @param builder Append current {@link ProcedureState}
515   */
516  protected void toStringState(StringBuilder builder) {
517    builder.append(getState());
518  }
519
520  /**
521   * Extend the toString() information with the procedure details e.g. className and parameters
522   * @param builder the string builder to use to append the proc specific information
523   */
524  protected void toStringClassDetails(StringBuilder builder) {
525    builder.append(getClass().getName());
526  }
527
528  // ==========================================================================
  // These fields are unchanged after initialization.
  //
  // Each procedure is created either by the user or during
  // ProcedureExecutor.start() in the load() phase, and is then submitted
  // to the executor. These fields will never be changed after initialization.
534  // ==========================================================================
535  public long getProcId() {
536    return procId;
537  }
538
539  public boolean hasParent() {
540    return parentProcId != NO_PROC_ID;
541  }
542
543  public long getParentProcId() {
544    return parentProcId;
545  }
546
547  public long getRootProcId() {
548    return rootProcId;
549  }
550
551  public String getProcName() {
552    return toStringClass();
553  }
554
555  public NonceKey getNonceKey() {
556    return nonceKey;
557  }
558
559  public long getSubmittedTime() {
560    return submittedTime;
561  }
562
563  public String getOwner() {
564    return owner;
565  }
566
567  public boolean hasOwner() {
568    return owner != null;
569  }
570
571  /**
572   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
573   */
574  protected void setProcId(long procId) {
575    this.procId = procId;
576    this.submittedTime = EnvironmentEdgeManager.currentTime();
577    setState(ProcedureState.RUNNABLE);
578  }
579
580  /**
581   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
582   */
583  protected void setParentProcId(long parentProcId) {
584    this.parentProcId = parentProcId;
585  }
586
587  protected void setRootProcId(long rootProcId) {
588    this.rootProcId = rootProcId;
589  }
590
591  /**
592   * Called by the ProcedureExecutor to set the value to the newly created procedure.
593   */
594  protected void setNonceKey(NonceKey nonceKey) {
595    this.nonceKey = nonceKey;
596  }
597
598  public void setOwner(String owner) {
599    this.owner = StringUtils.isEmpty(owner) ? null : owner;
600  }
601
602  public void setOwner(User owner) {
603    assert owner != null : "expected owner to be not null";
604    setOwner(owner.getShortName());
605  }
606
607  /**
608   * Called on store load to initialize the Procedure internals after the creation/deserialization.
609   */
610  protected void setSubmittedTime(long submittedTime) {
611    this.submittedTime = submittedTime;
612  }
613
614  // ==========================================================================
615  // runtime state - timeout related
616  // ==========================================================================
617  /**
618   * @param timeout timeout interval in msec
619   */
620  protected void setTimeout(int timeout) {
621    this.timeout = timeout;
622  }
623
624  public boolean hasTimeout() {
625    return timeout != NO_TIMEOUT;
626  }
627
628  /**
629   * @return the timeout in msec
630   */
631  public int getTimeout() {
632    return timeout;
633  }
634
635  /**
636   * Called on store load to initialize the Procedure internals after the creation/deserialization.
637   */
638  protected void setLastUpdate(long lastUpdate) {
639    this.lastUpdate = lastUpdate;
640  }
641
642  /**
643   * Called by ProcedureExecutor after each time a procedure step is executed.
644   */
645  protected void updateTimestamp() {
646    this.lastUpdate = EnvironmentEdgeManager.currentTime();
647  }
648
649  public long getLastUpdate() {
650    return lastUpdate;
651  }
652
653  /**
654   * Timeout of the next timeout. Called by the ProcedureExecutor if the procedure has timeout set
655   * and the procedure is in the waiting queue.
656   * @return the timestamp of the next timeout.
657   */
658  protected long getTimeoutTimestamp() {
659    return getLastUpdate() + getTimeout();
660  }
661
662  // ==========================================================================
663  // runtime state
664  // ==========================================================================
665  /**
666   * @return the time elapsed between the last update and the start time of the procedure.
667   */
668  public long elapsedTime() {
669    return getLastUpdate() - getSubmittedTime();
670  }
671
672  /**
673   * @return the serialized result if any, otherwise null
674   */
675  public byte[] getResult() {
676    return result;
677  }
678
679  /**
680   * The procedure may leave a "result" on completion.
681   * @param result the serialized result that will be passed to the client
682   */
683  protected void setResult(byte[] result) {
684    this.result = result;
685  }
686
687  /**
688   * Will only be called when loading procedures from procedure store, where we need to record
689   * whether the procedure has already held a lock. Later we will call {@link #restoreLock(Object)}
690   * to actually acquire the lock.
691   */
692  final void lockedWhenLoading() {
693    this.lockedWhenLoading = true;
694  }
695
696  /**
697   * Can only be called when restarting, before the procedure actually being executed, as after we
698   * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
699   * {@link #lockedWhenLoading} to false.
700   * <p/>
701   * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
702   * front of a queue.
703   */
704  public boolean isLockedWhenLoading() {
705    return lockedWhenLoading;
706  }
707
708  // ==============================================================================================
709  // Runtime state, updated every operation by the ProcedureExecutor
710  //
  // There is always one thread at a time operating on the state of the procedure.
  // The ProcedureExecutor may check and set states, or some Procedure may
  // update its own state, but there are no concurrent updates. We use synchronized here
  // just because the procedure can get scheduled on different executor threads on each step.
715  // ==============================================================================================
716
717  /**
718   * @return true if the procedure is in a RUNNABLE state.
719   */
720  public synchronized boolean isRunnable() {
721    return state == ProcedureState.RUNNABLE;
722  }
723
724  public synchronized boolean isInitializing() {
725    return state == ProcedureState.INITIALIZING;
726  }
727
728  /**
729   * @return true if the procedure has failed. It may or may not have rolled back.
730   */
731  public synchronized boolean isFailed() {
732    return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
733  }
734
735  /**
736   * @return true if the procedure is finished successfully.
737   */
738  public synchronized boolean isSuccess() {
739    return state == ProcedureState.SUCCESS && !hasException();
740  }
741
742  /**
743   * @return true if the procedure is finished. The Procedure may be completed successfully or
744   *         rolledback.
745   */
746  public synchronized boolean isFinished() {
747    return isSuccess() || state == ProcedureState.ROLLEDBACK;
748  }
749
750  /**
751   * @return true if the procedure is waiting for a child to finish or for an external event.
752   */
753  public synchronized boolean isWaiting() {
754    switch (state) {
755      case WAITING:
756      case WAITING_TIMEOUT:
757        return true;
758      default:
759        break;
760    }
761    return false;
762  }
763
764  protected synchronized void setState(final ProcedureState state) {
765    this.state = state;
766    updateTimestamp();
767  }
768
769  public synchronized ProcedureState getState() {
770    return state;
771  }
772
773  protected void setFailure(final String source, final Throwable cause) {
774    setFailure(new RemoteProcedureException(source, cause));
775  }
776
777  protected synchronized void setFailure(final RemoteProcedureException exception) {
778    this.exception = exception;
779    if (!isFinished()) {
780      setState(ProcedureState.FAILED);
781    }
782  }
783
784  protected void setAbortFailure(final String source, final String msg) {
785    setFailure(source, new ProcedureAbortedException(msg));
786  }
787
788  /**
789   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
790   * <p/>
791   * Another usage for this method is to implement retrying. A procedure can set the state to
792   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
793   * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
794   * call {@link #setTimeout(int)} method to set the timeout. And you should also override this
795   * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
796   * timeout event has been handled.
797   * @return true to let the framework handle the timeout as abort, false in case the procedure
798   *         handled the timeout itself.
799   */
800  protected synchronized boolean setTimeoutFailure(TEnvironment env) {
801    if (state == ProcedureState.WAITING_TIMEOUT) {
802      long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
803      setFailure("ProcedureExecutor",
804        new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
805      return true;
806    }
807    return false;
808  }
809
810  public synchronized boolean hasException() {
811    return exception != null;
812  }
813
814  public synchronized RemoteProcedureException getException() {
815    return exception;
816  }
817
818  /**
819   * Called by the ProcedureExecutor on procedure-load to restore the latch state
820   */
821  protected synchronized void setChildrenLatch(int numChildren) {
822    this.childrenLatch = numChildren;
823    if (LOG.isTraceEnabled()) {
824      LOG.trace("CHILD LATCH INCREMENT SET " + this.childrenLatch, new Throwable(this.toString()));
825    }
826  }
827
828  /**
829   * Called by the ProcedureExecutor on procedure-load to restore the latch state
830   */
831  protected synchronized void incChildrenLatch() {
832    // TODO: can this be inferred from the stack? I think so...
833    this.childrenLatch++;
834    if (LOG.isTraceEnabled()) {
835      LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
836    }
837  }
838
839  /**
840   * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
841   */
842  private synchronized boolean childrenCountDown() {
843    assert childrenLatch > 0 : this;
844    boolean b = --childrenLatch == 0;
845    if (LOG.isTraceEnabled()) {
846      LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
847    }
848    return b;
849  }
850
851  /**
852   * Try to set this procedure into RUNNABLE state. Succeeds if all subprocedures/children are done.
853   * @return True if we were able to move procedure to RUNNABLE state.
854   */
855  synchronized boolean tryRunnable() {
856    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
857    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
858      setState(ProcedureState.RUNNABLE);
859      return true;
860    } else {
861      return false;
862    }
863  }
864
865  protected synchronized boolean hasChildren() {
866    return childrenLatch > 0;
867  }
868
869  protected synchronized int getChildrenLatch() {
870    return childrenLatch;
871  }
872
873  /**
874   * Called by the RootProcedureState on procedure execution. Each procedure store its stack-index
875   * positions.
876   */
877  protected synchronized void addStackIndex(final int index) {
878    if (stackIndexes == null) {
879      stackIndexes = new int[] { index };
880    } else {
881      int count = stackIndexes.length;
882      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
883      stackIndexes[count] = index;
884    }
885  }
886
887  protected synchronized boolean removeStackIndex() {
888    if (stackIndexes != null && stackIndexes.length > 1) {
889      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
890      return false;
891    } else {
892      stackIndexes = null;
893      return true;
894    }
895  }
896
897  /**
898   * Called on store load to initialize the Procedure internals after the creation/deserialization.
899   */
900  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
901    this.stackIndexes = new int[stackIndexes.size()];
902    for (int i = 0; i < this.stackIndexes.length; ++i) {
903      this.stackIndexes[i] = stackIndexes.get(i);
904    }
905  }
906
907  protected synchronized boolean wasExecuted() {
908    return stackIndexes != null;
909  }
910
911  protected synchronized int[] getStackIndexes() {
912    return stackIndexes;
913  }
914
915  // ==========================================================================
916  // Internal methods - called by the ProcedureExecutor
917  // ==========================================================================
918
919  /**
920   * Internal method called by the ProcedureExecutor that starts the user-level code execute().
921   * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
922   *                                     skip out without changing states or releasing any locks
923   *                                     held.
924   */
925  protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
926    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
927    try {
928      updateTimestamp();
929      if (bypass) {
930        LOG.info("{} bypassed, returning null to finish it", this);
931        return null;
932      }
933      return execute(env);
934    } finally {
935      updateTimestamp();
936    }
937  }
938
939  /**
940   * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
941   */
942  protected void doRollback(TEnvironment env) throws IOException, InterruptedException {
943    try {
944      updateTimestamp();
945      if (bypass) {
946        LOG.info("{} bypassed, skipping rollback", this);
947        return;
948      }
949      rollback(env);
950    } finally {
951      updateTimestamp();
952    }
953  }
954
955  final void restoreLock(TEnvironment env) {
956    if (!lockedWhenLoading) {
957      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
958      return;
959    }
960
961    if (isFinished()) {
962      LOG.debug("{} is already finished, skip acquiring lock.", this);
963      return;
964    }
965
966    if (isBypass()) {
967      LOG.debug("{} is already bypassed, skip acquiring lock.", this);
968      return;
969    }
970    // this can happen if the parent stores the sub procedures but before it can
971    // release its lock, the master restarts
972    if (getState() == ProcedureState.WAITING && !holdLock(env)) {
973      LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
974      lockedWhenLoading = false;
975      return;
976    }
977    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
978    LockState state = acquireLock(env);
979    assert state == LockState.LOCK_ACQUIRED;
980  }
981
982  /**
983   * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
984   */
985  final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
986    if (waitInitialized(env)) {
987      return LockState.LOCK_EVENT_WAIT;
988    }
989    if (lockedWhenLoading) {
990      // reset it so we will not consider it anymore
991      lockedWhenLoading = false;
992      locked = true;
993      // Here we return without persist the locked state, as lockedWhenLoading is true means
994      // that the locked field of the procedure stored in procedure store is true, so we do not need
995      // to store it again.
996      return LockState.LOCK_ACQUIRED;
997    }
998    LockState state = acquireLock(env);
999    if (state == LockState.LOCK_ACQUIRED) {
1000      locked = true;
1001      // persist that we have held the lock. This must be done before we actually execute the
1002      // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
1003      // but it may have already done some changes as we have already executed it, and if another
1004      // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
1005      // not expect that another procedure can be executed in the middle.
1006      store.update(this);
1007    }
1008    return state;
1009  }
1010
1011  /**
1012   * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
1013   */
1014  final void doReleaseLock(TEnvironment env, ProcedureStore store) {
1015    locked = false;
1016    // persist that we have released the lock. This must be done before we actually release the
1017    // lock. Another procedure may take this lock immediately after we release the lock, and if we
1018    // crash before persist the information that we have already released the lock, then when
1019    // restarting there will be two procedures which both have the lock and cause problems.
1020    if (getState() != ProcedureState.ROLLEDBACK) {
1021      // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
1022      // procedure store, so do not need to log the release operation any more.
1023      store.update(this);
1024    }
1025    releaseLock(env);
1026  }
1027
1028  @Override
1029  public int compareTo(final Procedure<TEnvironment> other) {
1030    return Long.compare(getProcId(), other.getProcId());
1031  }
1032
1033  // ==========================================================================
1034  // misc utils
1035  // ==========================================================================
1036
1037  /**
1038   * Get an hashcode for the specified Procedure ID
1039   * @return the hashcode for the specified procId
1040   */
1041  public static long getProcIdHashCode(long procId) {
1042    long h = procId;
1043    h ^= h >> 16;
1044    h *= 0x85ebca6b;
1045    h ^= h >> 13;
1046    h *= 0xc2b2ae35;
1047    h ^= h >> 16;
1048    return h;
1049  }
1050
1051  /**
1052   * Helper to lookup the root Procedure ID given a specified procedure.
1053   */
1054  protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
1055    Procedure<T> proc) {
1056    while (proc.hasParent()) {
1057      proc = procedures.get(proc.getParentProcId());
1058      if (proc == null) {
1059        return null;
1060      }
1061    }
1062    return proc.getProcId();
1063  }
1064
1065  /**
1066   * @param a the first procedure to be compared.
1067   * @param b the second procedure to be compared.
1068   * @return true if the two procedures have the same parent
1069   */
1070  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
1071    return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
1072  }
1073}