001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicBoolean;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.yetus.audience.InterfaceStability;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
031
032/**
033 * Procedure described by a series of steps.
034 * <p>
035 * The procedure implementor must have an enum of 'states', describing the various step of the
036 * procedure. Once the procedure is running, the procedure-framework will call executeFromState()
037 * using the 'state' provided by the user. The first call to executeFromState() will be performed
038 * with 'state = null'. The implementor can jump between states using
039 * setNextState(MyStateEnum.ordinal()). The rollback will call rollbackState() for each state that
040 * was executed, in reverse order.
041 */
042@InterfaceAudience.Private
043@InterfaceStability.Evolving
044public abstract class StateMachineProcedure<TEnvironment, TState> extends Procedure<TEnvironment> {
045  private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class);
046
047  private static final int EOF_STATE = Integer.MIN_VALUE;
048
049  private final AtomicBoolean aborted = new AtomicBoolean(false);
050
051  private Flow stateFlow = Flow.HAS_MORE_STATE;
052  protected int stateCount = 0;
053  private int[] states = null;
054
055  private List<Procedure<TEnvironment>> subProcList = null;
056
057  protected final int getCycles() {
058    return cycles;
059  }
060
061  /**
062   * Cycles on same state. Good for figuring if we are stuck.
063   */
064  private int cycles = 0;
065
066  /**
067   * Ordinal of the previous state. So we can tell if we are progressing or not.
068   */
069  private int previousState;
070
071  public enum Flow {
072    HAS_MORE_STATE,
073    NO_MORE_STATE,
074  }
075
076  /**
077   * called to perform a single step of the specified 'state' of the procedure
078   * @param state state to execute
079   * @return Flow.NO_MORE_STATE if the procedure is completed, Flow.HAS_MORE_STATE if there is
080   *         another step.
081   */
082  protected abstract Flow executeFromState(TEnvironment env, TState state)
083    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
084
085  /**
086   * called to perform the rollback of the specified state
087   * @param state state to rollback
088   * @throws IOException temporary failure, the rollback will retry later
089   */
090  protected abstract void rollbackState(TEnvironment env, TState state)
091    throws IOException, InterruptedException;
092
093  /**
094   * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
095   * @param stateId the ordinal() of the state enum (or state id)
096   * @return the state enum object
097   */
098  protected abstract TState getState(int stateId);
099
100  /**
101   * Convert the Enum (or more descriptive) state object to an ordinal (or state id).
102   * @param state the state enum object
103   * @return stateId the ordinal() of the state enum (or state id)
104   */
105  protected abstract int getStateId(TState state);
106
107  /**
108   * Return the initial state object that will be used for the first call to executeFromState().
109   * @return the initial state enum object
110   */
111  protected abstract TState getInitialState();
112
113  /**
114   * Set the next state for the procedure.
115   * @param state the state enum object
116   */
117  protected void setNextState(final TState state) {
118    setNextState(getStateId(state));
119    failIfAborted();
120  }
121
122  /**
123   * By default, the executor will try ro run all the steps of the procedure start to finish. Return
124   * true to make the executor yield between execution steps to give other procedures time to run
125   * their steps.
126   * @param state the state we are going to execute next.
127   * @return Return true if the executor should yield before the execution of the specified step.
128   *         Defaults to return false.
129   */
130  protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) {
131    return false;
132  }
133
134  /**
135   * Add a child procedure to execute
136   * @param subProcedure the child procedure
137   */
138  protected <T extends Procedure<TEnvironment>> void
139    addChildProcedure(@SuppressWarnings("unchecked") T... subProcedure) {
140    if (subProcedure == null) {
141      return;
142    }
143    final int len = subProcedure.length;
144    if (len == 0) {
145      return;
146    }
147    if (subProcList == null) {
148      subProcList = new ArrayList<>(len);
149    }
150    for (int i = 0; i < len; ++i) {
151      Procedure<TEnvironment> proc = subProcedure[i];
152      if (!proc.hasOwner()) {
153        proc.setOwner(getOwner());
154      }
155
156      subProcList.add(proc);
157    }
158  }
159
160  @Override
161  protected Procedure[] execute(final TEnvironment env)
162    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
163    updateTimestamp();
164    try {
165      failIfAborted();
166
167      if (!hasMoreState() || isFailed()) {
168        return null;
169      }
170
171      TState state = getCurrentState();
172      if (stateCount == 0) {
173        setNextState(getStateId(state));
174      }
175
176      if (LOG.isTraceEnabled()) {
177        LOG.trace(state + " " + this + "; cycles=" + this.cycles);
178      }
179      // Keep running count of cycles
180      if (getStateId(state) != this.previousState) {
181        this.previousState = getStateId(state);
182        this.cycles = 0;
183      } else {
184        this.cycles++;
185      }
186
187      LOG.trace("{}", this);
188      stateFlow = executeFromState(env, state);
189      if (!hasMoreState()) {
190        setNextState(EOF_STATE);
191      }
192
193      if (subProcList != null && !subProcList.isEmpty()) {
194        Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
195        subProcList = null;
196        return subProcedures;
197      }
198      return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] { this };
199    } finally {
200      updateTimestamp();
201    }
202  }
203
204  @Override
205  protected void rollback(final TEnvironment env) throws IOException, InterruptedException {
206    if (isEofState()) {
207      stateCount--;
208    }
209
210    try {
211      updateTimestamp();
212      rollbackState(env, getCurrentState());
213    } finally {
214      stateCount--;
215      updateTimestamp();
216    }
217  }
218
219  protected boolean isEofState() {
220    return stateCount > 0 && states[stateCount - 1] == EOF_STATE;
221  }
222
223  @Override
224  protected boolean abort(final TEnvironment env) {
225    LOG.debug("Abort requested for {}", this);
226    if (!hasMoreState()) {
227      LOG.warn("Ignore abort request on {} because it has already been finished", this);
228      return false;
229    }
230    if (!isRollbackSupported(getCurrentState())) {
231      LOG.warn("Ignore abort request on {} because it does not support rollback", this);
232      return false;
233    }
234    aborted.set(true);
235    return true;
236  }
237
238  /**
239   * If procedure has more states then abort it otherwise procedure is finished and abort can be
240   * ignored.
241   */
242  protected final void failIfAborted() {
243    if (aborted.get()) {
244      if (hasMoreState()) {
245        setAbortFailure(getClass().getSimpleName(), "abort requested");
246      } else {
247        LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this);
248      }
249    }
250  }
251
252  /**
253   * Used by the default implementation of abort() to know if the current state can be aborted and
254   * rollback can be triggered.
255   */
256  protected boolean isRollbackSupported(final TState state) {
257    return false;
258  }
259
260  @Override
261  protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
262    return isYieldBeforeExecuteFromState(env, getCurrentState());
263  }
264
265  private boolean hasMoreState() {
266    return stateFlow != Flow.NO_MORE_STATE;
267  }
268
269  protected TState getCurrentState() {
270    return stateCount > 0 ? getState(states[stateCount - 1]) : getInitialState();
271  }
272
273  /**
274   * This method is used from test code as it cannot be assumed that state transition will happen
275   * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in
276   * future.
277   */
278  public int getCurrentStateId() {
279    return getStateId(getCurrentState());
280  }
281
282  /**
283   * Set the next state for the procedure.
284   * @param stateId the ordinal() of the state enum (or state id)
285   */
286  private void setNextState(final int stateId) {
287    if (states == null || states.length == stateCount) {
288      int newCapacity = stateCount + 8;
289      if (states != null) {
290        states = Arrays.copyOf(states, newCapacity);
291      } else {
292        states = new int[newCapacity];
293      }
294    }
295    states[stateCount++] = stateId;
296  }
297
298  @Override
299  protected void toStringState(StringBuilder builder) {
300    super.toStringState(builder);
301    if (!isFinished() && !isEofState() && getCurrentState() != null) {
302      builder.append(":").append(getCurrentState());
303    }
304  }
305
306  @Override
307  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
308    StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
309    for (int i = 0; i < stateCount; ++i) {
310      data.addState(states[i]);
311    }
312    serializer.serialize(data.build());
313  }
314
315  @Override
316  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
317    StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class);
318    stateCount = data.getStateCount();
319    if (stateCount > 0) {
320      states = new int[stateCount];
321      for (int i = 0; i < stateCount; ++i) {
322        states[i] = data.getState(i);
323      }
324      if (isEofState()) {
325        stateFlow = Flow.NO_MORE_STATE;
326      }
327    } else {
328      states = null;
329    }
330  }
331}