001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.ThreadLocalRandom;
025import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
026import org.apache.hadoop.hbase.metrics.Counter;
027import org.apache.hadoop.hbase.metrics.Histogram;
028import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
029import org.apache.hadoop.hbase.procedure2.util.StringUtils;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
032import org.apache.hadoop.hbase.util.NonceKey;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
039
040/**
041 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
042 * stack-indexes, etc.
043 * <p/>
044 * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
045 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
046 * be called multiple times in the case of failure or a restart, so code must be idempotent. The
047 * return from an execute call is either: null to indicate we are done; ourself if there is more to
048 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
049 * our execution.
050 * <p/>
051 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
052 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
053 * ProcedureState enum from protos:
054 * <ul>
055 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
056 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
057 * ROLLEDBACK state.</li>
058 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
059 * <li>{@link #isFinished()} As a procedure in FAILED state will be tried forever for rollback, only
060 * condition when scheduler/ executor will drop procedure from further processing is when procedure
061 * state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li>
062 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
063 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
064 * </ul>
065 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
066 * own state. This can lead to confusion. Try to keep the two distinct.
067 * <p/>
068 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
069 * step is supposed to cleanup the resources created during the execute() step. In case of failure
070 * and restart, rollback() may be called multiple times, so again the code must be idempotent.
071 * <p/>
 * A Procedure can be made to respect a locking regime. It has acquire/release methods as well as
 * a {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to
 * be locked for the life of a procedure -- not just the calls to execute -- then implementations
 * should say so with the {@link #holdLock(Object)} method.
076 * <p/>
077 * And since we need to restore the lock when restarting to keep the logic correct(HBASE-20846), the
 * implementation is a bit tricky so we add some comments here about it.
079 * <ul>
080 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
081 * whether we have the lock. We will set it to {@code true} in
082 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
083 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
084 * more.</li>
085 * <li>Also added a locked field in the proto message. When storing, the field will be set according
086 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
087 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
088 * message is {@code true}.</li>
089 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
090 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
091 * need to wait until master is initialized. So the solution here is that, we introduced a new
092 * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
093 * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
094 * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
095 * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
096 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
097 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is
 * {@code true}, then we just set the {@link #locked} field to true and return, without actually
099 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
100 * </ul>
101 * <p/>
102 * Procedures can be suspended or put in wait state with a callback that gets executed on
 * Procedure-specified timeout. See {@link #setTimeout(int)}, and
104 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
105 * class for an example usage.
106 * </p>
107 * <p/>
108 * There are hooks for collecting metrics on submit of the procedure and on finish. See
109 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
110 */
111@InterfaceAudience.Private
112public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
113  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
114  public static final long NO_PROC_ID = -1;
115  protected static final int NO_TIMEOUT = -1;
116
117  public enum LockState {
118    LOCK_ACQUIRED, // Lock acquired and ready to execute
119    LOCK_YIELD_WAIT, // Lock not acquired, framework needs to yield
120    LOCK_EVENT_WAIT, // Lock not acquired, an event will yield the procedure
121  }
122
123  // Unchanged after initialization
124  private NonceKey nonceKey = null;
125  private String owner = null;
126  private long parentProcId = NO_PROC_ID;
127  private long rootProcId = NO_PROC_ID;
128  private long procId = NO_PROC_ID;
129  private long submittedTime;
130  private boolean isCriticalSystemTable;
131
132  // Runtime state, updated every operation
133  private ProcedureState state = ProcedureState.INITIALIZING;
134  private RemoteProcedureException exception = null;
135  private int[] stackIndexes = null;
136  private int childrenLatch = 0;
137  // since we do not always maintain stackIndexes if the root procedure does not support rollback,
138  // we need a separated flag to indicate whether a procedure was executed
139  private boolean wasExecuted;
140
141  private volatile int timeout = NO_TIMEOUT;
142  private volatile long lastUpdate;
143
144  private volatile byte[] result = null;
145
146  private volatile boolean locked = false;
147
148  private boolean lockedWhenLoading = false;
149
150  /**
151   * Used for override complete of the procedure without actually doing any logic in the procedure.
152   * If bypass is set to true, when executing it will return null when {@link #doExecute(Object)} is
153   * called to finish the procedure and release any locks it may currently hold. The bypass does
154   * cleanup around the Procedure as far as the Procedure framework is concerned. It does not clean
155   * any internal state that the Procedure's themselves may have set. That is for the Procedures to
156   * do themselves when bypass is called. They should override bypass and do their cleanup in the
157   * overridden bypass method (be sure to call the parent bypass to ensure proper processing).
158   * <p>
159   * </p>
160   * Bypassing a procedure is not like aborting. Aborting a procedure will trigger a rollback. And
161   * since the {@link #abort(Object)} method is overrideable Some procedures may have chosen to
162   * ignore the aborting.
163   */
164  private volatile boolean bypass = false;
165
166  /**
167   * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to
168   * true, and the implementation can all {@link #skipPersistence()} to let the framework skip the
169   * persistence of the procedure.
170   * <p/>
171   * This is useful when the procedure is in error and you want to retry later. The retry interval
172   * and the number of retries are usually not critical so skip the persistence can save some
173   * resources, and also speed up the restart processing.
174   * <p/>
175   * Notice that this value will be reset to true every time before execution. And when rolling back
176   * we do not test this value.
177   */
178  private boolean persist = true;
179
180  public boolean isBypass() {
181    return bypass;
182  }
183
184  /**
185   * Set the bypass to true. Only called in
186   * {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now. DO NOT use
187   * this method alone, since we can't just bypass one single procedure. We need to bypass its
188   * ancestor too. If your Procedure has set state, it needs to undo it in here.
189   * @param env Current environment. May be null because of context; e.g. pretty-printing procedure
190   *            WALs where there is no 'environment' (and where Procedures that require an
191   *            'environment' won't be run.
192   */
193  protected void bypass(TEnvironment env) {
194    this.bypass = true;
195  }
196
197  boolean needPersistence() {
198    return persist;
199  }
200
201  void resetPersistence() {
202    persist = true;
203  }
204
205  protected final void skipPersistence() {
206    persist = false;
207  }
208
209  /**
210   * The main code of the procedure. It must be idempotent since execute() may be called multiple
211   * times in case of machine failure in the middle of the execution.
212   * @param env the environment passed to the ProcedureExecutor
213   * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
214   *         procedure is done.
215   * @throws ProcedureYieldException     the procedure will be added back to the queue and retried
216   *                                     later.
217   * @throws InterruptedException        the procedure will be added back to the queue and retried
218   *                                     later.
219   * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
220   *                                     and has set itself up waiting for an external event to wake
221   *                                     it back up again.
222   */
223  protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
224    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
225
226  /**
227   * The code to undo what was done by the execute() code. It is called when the procedure or one of
228   * the sub-procedures failed or an abort was requested. It should cleanup all the resources
229   * created by the execute() call. The implementation must be idempotent since rollback() may be
   * called multiple times in case of machine failure in the middle of the execution.
231   * @param env the environment passed to the ProcedureExecutor
232   * @throws IOException          temporary failure, the rollback will retry later
233   * @throws InterruptedException the procedure will be added back to the queue and retried later
234   */
235  protected abstract void rollback(TEnvironment env) throws IOException, InterruptedException;
236
237  /**
238   * The abort() call is asynchronous and each procedure must decide how to deal with it, if they
239   * want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
240   * abort() method and then the execute() will check if the abort flag is set or not. abort() may
241   * be called multiple times from the client, so the implementation must be idempotent.
242   * <p>
243   * NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
   * procedure implementor to abort.
245   */
246  protected abstract boolean abort(TEnvironment env);
247
248  /**
249   * The user-level code of the procedure may have some state to persist (e.g. input arguments or
250   * current position in the processing state) to be able to resume on failure.
251   * @param serializer stores the serializable state
252   */
253  protected abstract void serializeStateData(ProcedureStateSerializer serializer)
254    throws IOException;
255
256  /**
257   * Called on store load to allow the user to decode the previously serialized state.
258   * @param serializer contains the serialized state
259   */
260  protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
261    throws IOException;
262
263  /**
264   * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
265   * call us to determine whether we need to wait for initialization, second, it will call
266   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
267   * <p/>
268   * This is because that when master restarts, we need to restore the lock state for all the
269   * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
270   * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part
271   * of the initialization!), so we need to split the code into two steps, and when restore, we just
272   * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.
   * @return true means we need to wait until the environment has been initialized, otherwise false.
274   */
275  protected boolean waitInitialized(TEnvironment env) {
276    return false;
277  }
278
279  /**
280   * The user should override this method if they need a lock on an Entity. A lock can be anything,
281   * and it is up to the implementor. The Procedure Framework will call this method just before it
282   * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
283   * execute.
284   * <p/>
285   * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
286   * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
287   * <p/>
288   * Example: in our Master we can execute request in parallel for different tables. We can create
289   * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
290   * queued waiting that specific table create to happen.
291   * <p/>
292   * There are 3 LockState:
293   * <ul>
294   * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
295   * execute.</li>
296   * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
297   * take care of readding the procedure back to the runnable set for retry</li>
298   * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
299   * care of readding the procedure back to the runnable set when the lock is available.</li>
300   * </ul>
301   * @return the lock state as described above.
302   */
303  protected LockState acquireLock(TEnvironment env) {
304    return LockState.LOCK_ACQUIRED;
305  }
306
307  /**
308   * The user should override this method, and release lock if necessary.
309   */
310  protected void releaseLock(TEnvironment env) {
311    // no-op
312  }
313
314  /**
315   * Used to keep the procedure lock even when the procedure is yielding or suspended.
316   * @return true if the procedure should hold on the lock until completionCleanup()
317   */
318  protected boolean holdLock(TEnvironment env) {
319    return false;
320  }
321
322  /**
323   * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)}
324   * returns true, the procedure executor will call acquireLock() once and thereafter not call
325   * {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls release/acquire
326   * around each invocation of {@link #execute(Object)}.
327   * @see #holdLock(Object)
328   * @return true if the procedure has the lock, false otherwise.
329   */
330  public final boolean hasLock() {
331    return locked;
332  }
333
334  /**
335   * Called when the procedure is loaded for replay. The procedure implementor may use this method
336   * to perform some quick operation before replay. e.g. failing the procedure if the state on
337   * replay may be unknown.
338   */
339  protected void beforeReplay(TEnvironment env) {
340    // no-op
341  }
342
343  /**
344   * Called when the procedure is ready to be added to the queue after the loading/replay operation.
345   */
346  protected void afterReplay(TEnvironment env) {
347    // no-op
348  }
349
350  /**
351   * Called before we call the execute method of this procedure, but after we acquire the execution
352   * lock and procedure scheduler lock.
353   */
354  protected void beforeExec(TEnvironment env) throws ProcedureSuspendedException {
355    // no-op
356  }
357
358  /**
359   * Called after we call the execute method of this procedure, and also after we initialize all the
   * sub procedures and persist the state if persistence is needed.
361   * <p>
362   * This is for doing some hooks after we initialize the sub procedures. See HBASE-29259 for more
363   * details on why we can not release the region lock inside the execute method.
364   */
365  protected void afterExec(TEnvironment env) {
366    // no-op
367  }
368
369  /**
370   * Called when the procedure is marked as completed (success or rollback). The procedure
371   * implementor may use this method to cleanup in-memory states. This operation will not be retried
372   * on failure. If a procedure took a lock, it will have been released when this method runs.
373   */
374  protected void completionCleanup(TEnvironment env) {
375    // no-op
376  }
377
378  /**
379   * By default, the procedure framework/executor will try to run procedures start to finish. Return
380   * true to make the executor yield between each execution step to give other procedures a chance
381   * to run.
382   * @param env the environment passed to the ProcedureExecutor
383   * @return Return true if the executor should yield on completion of an execution step. Defaults
384   *         to return false.
385   */
386  protected boolean isYieldAfterExecutionStep(TEnvironment env) {
387    return false;
388  }
389
390  /**
   * By default, the executor will keep the procedure result around until the eviction TTL is
392   * expired. The client can cut down the waiting time by requesting that the result is removed from
393   * the executor. In case of system started procedure, we can force the executor to auto-ack.
394   * @param env the environment passed to the ProcedureExecutor
395   * @return true if the executor should wait the client ack for the result. Defaults to return
396   *         true.
397   */
398  protected boolean shouldWaitClientAck(TEnvironment env) {
399    return true;
400  }
401
402  /**
403   * Override this method to provide procedure specific counters for submitted count, failed count
404   * and time histogram.
405   * @param env The environment passed to the procedure executor
406   * @return Container object for procedure related metric
407   */
408  protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
409    return null;
410  }
411
412  /**
413   * This function will be called just when procedure is submitted for execution. Override this
414   * method to update the metrics at the beginning of the procedure. The default implementation
415   * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
416   * {@link ProcedureMetrics}.
417   */
418  protected void updateMetricsOnSubmit(TEnvironment env) {
419    ProcedureMetrics metrics = getProcedureMetrics(env);
420    if (metrics == null) {
421      return;
422    }
423
424    Counter submittedCounter = metrics.getSubmittedCounter();
425    if (submittedCounter != null) {
426      submittedCounter.increment();
427    }
428  }
429
430  /**
431   * This function will be called just after procedure execution is finished. Override this method
432   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
433   * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
434   * time histogram for successfully completed procedures. Increments failed counter for failed
435   * procedures.
436   * <p/>
437   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
438   * successfully finished siblings, this function may get called twice in certain cases for certain
439   * procedures. Explore further if this can be called once.
440   * @param env     The environment passed to the procedure executor
441   * @param runtime Runtime of the procedure in milliseconds
442   * @param success true if procedure is completed successfully
443   */
444  protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
445    ProcedureMetrics metrics = getProcedureMetrics(env);
446    if (metrics == null) {
447      return;
448    }
449
450    if (success) {
451      Histogram timeHisto = metrics.getTimeHisto();
452      if (timeHisto != null) {
453        timeHisto.update(runtime);
454      }
455    } else {
456      Counter failedCounter = metrics.getFailedCounter();
457      if (failedCounter != null) {
458        failedCounter.increment();
459      }
460    }
461  }
462
463  @Override
464  public String toString() {
465    // Return the simple String presentation of the procedure.
466    return toStringSimpleSB().toString();
467  }
468
469  /**
470   * Build the StringBuilder for the simple form of procedure string.
471   * @return the StringBuilder
472   */
473  protected StringBuilder toStringSimpleSB() {
474    final StringBuilder sb = new StringBuilder();
475
476    sb.append("pid=");
477    sb.append(getProcId());
478
479    if (hasParent()) {
480      sb.append(", ppid=");
481      sb.append(getParentProcId());
482    }
483
484    /*
485     * TODO Enable later when this is being used. Currently owner not used. if (hasOwner()) {
486     * sb.append(", owner="); sb.append(getOwner()); }
487     */
488
489    sb.append(", state="); // pState for Procedure State as opposed to any other kind.
490    toStringState(sb);
491
492    sb.append(", hasLock=").append(locked);
493
494    if (bypass) {
495      sb.append(", bypass=").append(bypass);
496    }
497
498    if (hasException()) {
499      sb.append(", exception=" + getException());
500    }
501
502    sb.append("; ");
503    toStringClassDetails(sb);
504
505    return sb;
506  }
507
508  /**
509   * Extend the toString() information with more procedure details
510   */
511  public String toStringDetails() {
512    final StringBuilder sb = toStringSimpleSB();
513
514    sb.append(" submittedTime=");
515    sb.append(getSubmittedTime());
516
517    sb.append(", lastUpdate=");
518    sb.append(getLastUpdate());
519
520    final int[] stackIndices = getStackIndexes();
521    if (stackIndices != null) {
522      sb.append("\n");
523      sb.append("stackIndexes=");
524      sb.append(Arrays.toString(stackIndices));
525    }
526
527    return sb.toString();
528  }
529
530  protected String toStringClass() {
531    StringBuilder sb = new StringBuilder();
532    toStringClassDetails(sb);
533    return sb.toString();
534  }
535
536  /**
537   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
538   * generic Procedure State with Procedure particulars.
539   * @param builder Append current {@link ProcedureState}
540   */
541  protected void toStringState(StringBuilder builder) {
542    builder.append(getState());
543  }
544
545  /**
546   * Extend the toString() information with the procedure details e.g. className and parameters
547   * @param builder the string builder to use to append the proc specific information
548   */
549  protected void toStringClassDetails(StringBuilder builder) {
550    builder.append(getClass().getName());
551  }
552
553  // ==========================================================================
554  // Those fields are unchanged after initialization.
555  //
556  // Each procedure will get created from the user or during
557  // ProcedureExecutor.start() during the load() phase and then submitted
558  // to the executor. these fields will never be changed after initialization
559  // ==========================================================================
560  public long getProcId() {
561    return procId;
562  }
563
564  public boolean hasParent() {
565    return parentProcId != NO_PROC_ID;
566  }
567
568  public long getParentProcId() {
569    return parentProcId;
570  }
571
572  public long getRootProcId() {
573    return rootProcId;
574  }
575
576  public String getProcName() {
577    return toStringClass();
578  }
579
580  public NonceKey getNonceKey() {
581    return nonceKey;
582  }
583
584  public long getSubmittedTime() {
585    return submittedTime;
586  }
587
588  public String getOwner() {
589    return owner;
590  }
591
592  public boolean hasOwner() {
593    return owner != null;
594  }
595
596  /**
597   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
598   */
599  protected void setProcId(long procId) {
600    this.procId = procId;
601    this.submittedTime = EnvironmentEdgeManager.currentTime();
602    setState(ProcedureState.RUNNABLE);
603  }
604
605  /**
606   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
607   */
608  protected void setParentProcId(long parentProcId) {
609    this.parentProcId = parentProcId;
610  }
611
612  public void setCriticalSystemTable(boolean isCriticalSystemTable) {
613    this.isCriticalSystemTable = isCriticalSystemTable;
614  }
615
616  public boolean isCriticalSystemTable() {
617    return isCriticalSystemTable;
618  }
619
620  protected void setRootProcId(long rootProcId) {
621    this.rootProcId = rootProcId;
622  }
623
624  /**
625   * Called by the ProcedureExecutor to set the value to the newly created procedure.
626   */
627  protected void setNonceKey(NonceKey nonceKey) {
628    this.nonceKey = nonceKey;
629  }
630
631  public void setOwner(String owner) {
632    this.owner = StringUtils.isEmpty(owner) ? null : owner;
633  }
634
635  public void setOwner(User owner) {
636    assert owner != null : "expected owner to be not null";
637    setOwner(owner.getShortName());
638  }
639
640  /**
641   * Called on store load to initialize the Procedure internals after the creation/deserialization.
642   */
643  protected void setSubmittedTime(long submittedTime) {
644    this.submittedTime = submittedTime;
645  }
646
647  // ==========================================================================
648  // runtime state - timeout related
649  // ==========================================================================
650  /**
651   * @param timeout timeout interval in msec
652   */
653  protected void setTimeout(int timeout) {
654    this.timeout = timeout;
655  }
656
657  public boolean hasTimeout() {
658    return timeout != NO_TIMEOUT;
659  }
660
661  /** Returns the timeout in msec */
662  public int getTimeout() {
663    return timeout;
664  }
665
666  /**
667   * Called on store load to initialize the Procedure internals after the creation/deserialization.
668   */
669  protected void setLastUpdate(long lastUpdate) {
670    this.lastUpdate = lastUpdate;
671  }
672
673  /**
674   * Called by ProcedureExecutor after each time a procedure step is executed.
675   */
676  protected void updateTimestamp() {
677    this.lastUpdate = EnvironmentEdgeManager.currentTime();
678  }
679
680  public long getLastUpdate() {
681    return lastUpdate;
682  }
683
684  /**
   * Timestamp of the next timeout. Called by the ProcedureExecutor if the procedure has a timeout
   * set and the procedure is in the waiting queue.
687   * @return the timestamp of the next timeout.
688   */
689  protected long getTimeoutTimestamp() {
690    return getLastUpdate() + getTimeout();
691  }
692
693  // ==========================================================================
694  // runtime state
695  // ==========================================================================
696  /** Returns the time elapsed between the last update and the start time of the procedure. */
697  public long elapsedTime() {
698    return getLastUpdate() - getSubmittedTime();
699  }
700
701  /** Returns the serialized result if any, otherwise null */
702  public byte[] getResult() {
703    return result;
704  }
705
706  /**
707   * The procedure may leave a "result" on completion.
708   * @param result the serialized result that will be passed to the client
709   */
710  protected void setResult(byte[] result) {
711    this.result = result;
712  }
713
714  /**
715   * Will only be called when loading procedures from procedure store, where we need to record
716   * whether the procedure has already held a lock. Later we will call {@link #restoreLock(Object)}
717   * to actually acquire the lock.
718   */
719  final void lockedWhenLoading() {
720    this.lockedWhenLoading = true;
721  }
722
723  /**
724   * Can only be called when restarting, before the procedure actually being executed, as after we
725   * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
726   * {@link #lockedWhenLoading} to false.
727   * <p/>
728   * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
729   * front of a queue.
730   */
731  public boolean isLockedWhenLoading() {
732    return lockedWhenLoading;
733  }
734
735  // ==============================================================================================
736  // Runtime state, updated every operation by the ProcedureExecutor
737  //
738  // There is always 1 thread at the time operating on the state of the procedure.
  // The ProcedureExecutor may check and set states, or some Procedure may
740  // update its own state. but no concurrent updates. we use synchronized here
741  // just because the procedure can get scheduled on different executor threads on each step.
742  // ==============================================================================================
743
744  /** Returns true if the procedure is in a RUNNABLE state. */
745  public synchronized boolean isRunnable() {
746    return state == ProcedureState.RUNNABLE;
747  }
748
749  public synchronized boolean isInitializing() {
750    return state == ProcedureState.INITIALIZING;
751  }
752
753  /** Returns true if the procedure has failed. It may or may not have rolled back. */
754  public synchronized boolean isFailed() {
755    return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
756  }
757
758  /** Returns true if the procedure is finished successfully. */
759  public synchronized boolean isSuccess() {
760    return state == ProcedureState.SUCCESS && !hasException();
761  }
762
763  /**
764   * @return true if the procedure is finished. The Procedure may be completed successfully or
765   *         rolledback.
766   */
767  public synchronized boolean isFinished() {
768    return isSuccess() || state == ProcedureState.ROLLEDBACK;
769  }
770
771  /** Returns true if the procedure is waiting for a child to finish or for an external event. */
772  public synchronized boolean isWaiting() {
773    switch (state) {
774      case WAITING:
775      case WAITING_TIMEOUT:
776        return true;
777      default:
778        break;
779    }
780    return false;
781  }
782
783  protected synchronized void setState(final ProcedureState state) {
784    this.state = state;
785    updateTimestamp();
786  }
787
788  public synchronized ProcedureState getState() {
789    return state;
790  }
791
792  protected void setFailure(final String source, final Throwable cause) {
793    setFailure(new RemoteProcedureException(source, cause));
794  }
795
796  protected synchronized void setFailure(final RemoteProcedureException exception) {
797    this.exception = exception;
798    if (!isFinished()) {
799      setState(ProcedureState.FAILED);
800    }
801  }
802
803  protected void setAbortFailure(final String source, final String msg) {
804    setFailure(source, new ProcedureAbortedException(msg));
805  }
806
807  /**
808   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
809   * <p/>
810   * Another usage for this method is to implement retrying. A procedure can set the state to
811   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
812   * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
813   * call {@link #setTimeout(int)} method to set the timeout. And you should also override this
814   * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
815   * timeout event has been handled.
816   * @return true to let the framework handle the timeout as abort, false in case the procedure
817   *         handled the timeout itself.
818   */
819  protected synchronized boolean setTimeoutFailure(TEnvironment env) {
820    if (state == ProcedureState.WAITING_TIMEOUT) {
821      long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
822      setFailure("ProcedureExecutor",
823        new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
824      return true;
825    }
826    return false;
827  }
828
829  public synchronized boolean hasException() {
830    return exception != null;
831  }
832
833  public synchronized RemoteProcedureException getException() {
834    return exception;
835  }
836
837  /**
838   * Called by the ProcedureExecutor on procedure-load to restore the latch state
839   */
840  protected synchronized void setChildrenLatch(int numChildren) {
841    this.childrenLatch = numChildren;
842    if (LOG.isTraceEnabled()) {
843      LOG.trace("CHILD LATCH INCREMENT SET " + this.childrenLatch, new Throwable(this.toString()));
844    }
845  }
846
847  /**
848   * Called by the ProcedureExecutor on procedure-load to restore the latch state
849   */
850  protected synchronized void incChildrenLatch() {
851    // TODO: can this be inferred from the stack? I think so...
852    this.childrenLatch++;
853    if (LOG.isTraceEnabled()) {
854      LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
855    }
856  }
857
858  /**
859   * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
860   */
861  private synchronized boolean childrenCountDown() {
862    assert childrenLatch > 0 : this;
863    boolean b = --childrenLatch == 0;
864    if (LOG.isTraceEnabled()) {
865      LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
866    }
867    return b;
868  }
869
870  /**
871   * Try to set this procedure into RUNNABLE state. Succeeds if all subprocedures/children are done.
872   * @return True if we were able to move procedure to RUNNABLE state.
873   */
874  synchronized boolean tryRunnable() {
875    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
876    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
877      setState(ProcedureState.RUNNABLE);
878      return true;
879    } else {
880      return false;
881    }
882  }
883
884  protected synchronized boolean hasChildren() {
885    return childrenLatch > 0;
886  }
887
888  protected synchronized int getChildrenLatch() {
889    return childrenLatch;
890  }
891
892  /**
893   * Called by the RootProcedureState on procedure execution. Each procedure store its stack-index
894   * positions.
895   */
896  protected synchronized void addStackIndex(final int index) {
897    if (stackIndexes == null) {
898      stackIndexes = new int[] { index };
899    } else {
900      int count = stackIndexes.length;
901      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
902      stackIndexes[count] = index;
903    }
904    wasExecuted = true;
905  }
906
907  protected synchronized boolean removeStackIndex() {
908    if (stackIndexes != null && stackIndexes.length > 1) {
909      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
910      return false;
911    } else {
912      stackIndexes = null;
913      return true;
914    }
915  }
916
917  /**
918   * Called on store load to initialize the Procedure internals after the creation/deserialization.
919   */
920  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
921    this.stackIndexes = new int[stackIndexes.size()];
922    for (int i = 0; i < this.stackIndexes.length; ++i) {
923      this.stackIndexes[i] = stackIndexes.get(i);
924    }
925    // for backward compatible, where a procedure is serialized before we added the executed flag,
926    // the flag will be false so we need to set the wasExecuted flag here
927    this.wasExecuted = true;
928  }
929
930  protected synchronized void setExecuted() {
931    this.wasExecuted = true;
932  }
933
934  public synchronized boolean wasExecuted() {
935    return wasExecuted;
936  }
937
938  protected synchronized int[] getStackIndexes() {
939    return stackIndexes;
940  }
941
942  /**
943   * Return whether the procedure supports rollback. If the procedure does not support rollback, we
944   * can skip the rollback state management which could increase the performance. See HBASE-28210
945   * and HBASE-28212.
946   */
947  protected boolean isRollbackSupported() {
948    return true;
949  }
950
951  // ==========================================================================
952  // Internal methods - called by the ProcedureExecutor
953  // ==========================================================================
954
955  /**
956   * Internal method called by the ProcedureExecutor that starts the user-level code execute().
957   * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
958   *                                     skip out without changing states or releasing any locks
959   *                                     held.
960   */
961  protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
962    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
963    try {
964      updateTimestamp();
965      if (bypass) {
966        LOG.info("{} bypassed, returning null to finish it", this);
967        return null;
968      }
969      return execute(env);
970    } finally {
971      updateTimestamp();
972    }
973  }
974
975  /**
976   * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
977   */
978  protected void doRollback(TEnvironment env) throws IOException, InterruptedException {
979    try {
980      updateTimestamp();
981      if (bypass) {
982        LOG.info("{} bypassed, skipping rollback", this);
983        return;
984      }
985      rollback(env);
986    } finally {
987      updateTimestamp();
988    }
989  }
990
991  final void restoreLock(TEnvironment env) {
992    if (!lockedWhenLoading) {
993      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
994      return;
995    }
996
997    if (isFinished()) {
998      LOG.debug("{} is already finished, skip acquiring lock.", this);
999      return;
1000    }
1001
1002    if (isBypass()) {
1003      LOG.debug("{} is already bypassed, skip acquiring lock.", this);
1004      return;
1005    }
1006    // this can happen if the parent stores the sub procedures but before it can
1007    // release its lock, the master restarts
1008    if (getState() == ProcedureState.WAITING && !holdLock(env)) {
1009      LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
1010      lockedWhenLoading = false;
1011      return;
1012    }
1013    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
1014    LockState state = acquireLock(env);
1015    assert state == LockState.LOCK_ACQUIRED;
1016  }
1017
1018  /**
1019   * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
1020   */
1021  final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
1022    if (waitInitialized(env)) {
1023      return LockState.LOCK_EVENT_WAIT;
1024    }
1025    if (lockedWhenLoading) {
1026      // reset it so we will not consider it anymore
1027      lockedWhenLoading = false;
1028      locked = true;
1029      // Here we return without persist the locked state, as lockedWhenLoading is true means
1030      // that the locked field of the procedure stored in procedure store is true, so we do not need
1031      // to store it again.
1032      return LockState.LOCK_ACQUIRED;
1033    }
1034    LockState state = acquireLock(env);
1035    if (state == LockState.LOCK_ACQUIRED) {
1036      locked = true;
1037      // persist that we have held the lock. This must be done before we actually execute the
1038      // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
1039      // but it may have already done some changes as we have already executed it, and if another
1040      // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
1041      // not expect that another procedure can be executed in the middle.
1042      store.update(this);
1043    }
1044    return state;
1045  }
1046
1047  /**
1048   * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
1049   */
1050  final void doReleaseLock(TEnvironment env, ProcedureStore store) {
1051    locked = false;
1052    // persist that we have released the lock. This must be done before we actually release the
1053    // lock. Another procedure may take this lock immediately after we release the lock, and if we
1054    // crash before persist the information that we have already released the lock, then when
1055    // restarting there will be two procedures which both have the lock and cause problems.
1056    if (getState() != ProcedureState.ROLLEDBACK) {
1057      // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
1058      // procedure store, so do not need to log the release operation any more.
1059      store.update(this);
1060    }
1061    releaseLock(env);
1062  }
1063
1064  protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
1065    throws ProcedureSuspendedException {
1066    if (jitter) {
1067      // 10% possible jitter
1068      double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
1069      timeoutMillis += add;
1070    }
1071    setTimeout(timeoutMillis);
1072    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
1073    skipPersistence();
1074    throw new ProcedureSuspendedException();
1075  }
1076
1077  @Override
1078  public int compareTo(final Procedure<TEnvironment> other) {
1079    return Long.compare(getProcId(), other.getProcId());
1080  }
1081
1082  // ==========================================================================
1083  // misc utils
1084  // ==========================================================================
1085
1086  /**
1087   * Get an hashcode for the specified Procedure ID
1088   * @return the hashcode for the specified procId
1089   */
1090  public static long getProcIdHashCode(long procId) {
1091    long h = procId;
1092    h ^= h >> 16;
1093    h *= 0x85ebca6b;
1094    h ^= h >> 13;
1095    h *= 0xc2b2ae35;
1096    h ^= h >> 16;
1097    return h;
1098  }
1099
1100  /**
1101   * Helper to lookup the root Procedure ID given a specified procedure.
1102   */
1103  protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
1104    Procedure<T> proc) {
1105    while (proc.hasParent()) {
1106      proc = procedures.get(proc.getParentProcId());
1107      if (proc == null) {
1108        return null;
1109      }
1110    }
1111    return proc.getProcId();
1112  }
1113
1114  /**
1115   * @param a the first procedure to be compared.
1116   * @param b the second procedure to be compared.
1117   * @return true if the two procedures have the same parent
1118   */
1119  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
1120    return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
1121  }
1122}