001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.List;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027import org.apache.yetus.audience.InterfaceAudience;
028import org.apache.yetus.audience.InterfaceStability;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
033
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
035
036/**
037 * Procedure described by a series of steps.
038 *
039 * <p>The procedure implementor must have an enum of 'states', describing
040 * the various step of the procedure.
041 * Once the procedure is running, the procedure-framework will call executeFromState()
042 * using the 'state' provided by the user. The first call to executeFromState()
043 * will be performed with 'state = null'. The implementor can jump between
044 * states using setNextState(MyStateEnum.ordinal()).
045 * The rollback will call rollbackState() for each state that was executed, in reverse order.
046 */
047@InterfaceAudience.Private
048@InterfaceStability.Evolving
049public abstract class StateMachineProcedure<TEnvironment, TState>
050    extends Procedure<TEnvironment> {
051  private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class);
052
053  private static final int EOF_STATE = Integer.MIN_VALUE;
054
055  private final AtomicBoolean aborted = new AtomicBoolean(false);
056
057  private Flow stateFlow = Flow.HAS_MORE_STATE;
058  protected int stateCount = 0;
059  private int[] states = null;
060
061  private List<Procedure<TEnvironment>> subProcList = null;
062
063  protected final int getCycles() {
064    return cycles;
065  }
066
067  /**
068   * Cycles on same state. Good for figuring if we are stuck.
069   */
070  private int cycles = 0;
071
072  /**
073   * Ordinal of the previous state. So we can tell if we are progressing or not.
074   */
075  private int previousState;
076
077  @VisibleForTesting
078  public enum Flow {
079    HAS_MORE_STATE,
080    NO_MORE_STATE,
081  }
082
083  /**
084   * called to perform a single step of the specified 'state' of the procedure
085   * @param state state to execute
086   * @return Flow.NO_MORE_STATE if the procedure is completed,
087   *         Flow.HAS_MORE_STATE if there is another step.
088   */
089  protected abstract Flow executeFromState(TEnvironment env, TState state)
090  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
091
092  /**
093   * called to perform the rollback of the specified state
094   * @param state state to rollback
095   * @throws IOException temporary failure, the rollback will retry later
096   */
097  protected abstract void rollbackState(TEnvironment env, TState state)
098    throws IOException, InterruptedException;
099
100  /**
101   * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
102   * @param stateId the ordinal() of the state enum (or state id)
103   * @return the state enum object
104   */
105  protected abstract TState getState(int stateId);
106
107  /**
108   * Convert the Enum (or more descriptive) state object to an ordinal (or state id).
109   * @param state the state enum object
110   * @return stateId the ordinal() of the state enum (or state id)
111   */
112  protected abstract int getStateId(TState state);
113
114  /**
115   * Return the initial state object that will be used for the first call to executeFromState().
116   * @return the initial state enum object
117   */
118  protected abstract TState getInitialState();
119
120  /**
121   * Set the next state for the procedure.
122   * @param state the state enum object
123   */
124  protected void setNextState(final TState state) {
125    setNextState(getStateId(state));
126    failIfAborted();
127  }
128
129  /**
130   * By default, the executor will try ro run all the steps of the procedure start to finish.
131   * Return true to make the executor yield between execution steps to
132   * give other procedures time to run their steps.
133   * @param state the state we are going to execute next.
134   * @return Return true if the executor should yield before the execution of the specified step.
135   *         Defaults to return false.
136   */
137  protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) {
138    return false;
139  }
140
141  /**
142   * Add a child procedure to execute
143   * @param subProcedure the child procedure
144   */
145  protected void addChildProcedure(Procedure<TEnvironment>... subProcedure) {
146    if (subProcedure == null) return;
147    final int len = subProcedure.length;
148    if (len == 0) return;
149    if (subProcList == null) {
150      subProcList = new ArrayList<>(len);
151    }
152    for (int i = 0; i < len; ++i) {
153      Procedure<TEnvironment> proc = subProcedure[i];
154      if (!proc.hasOwner()) proc.setOwner(getOwner());
155      subProcList.add(proc);
156    }
157  }
158
159  @Override
160  protected Procedure[] execute(final TEnvironment env)
161  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
162    updateTimestamp();
163    try {
164      failIfAborted();
165
166      if (!hasMoreState() || isFailed()) return null;
167      TState state = getCurrentState();
168      if (stateCount == 0) {
169        setNextState(getStateId(state));
170      }
171
172      if (LOG.isTraceEnabled()) {
173        LOG.trace(state  + " " + this + "; cycles=" + this.cycles);
174      }
175      // Keep running count of cycles
176      if (getStateId(state) != this.previousState) {
177        this.previousState = getStateId(state);
178        this.cycles = 0;
179      } else {
180        this.cycles++;
181      }
182
183      LOG.trace("{}", toString());
184      stateFlow = executeFromState(env, state);
185      if (!hasMoreState()) setNextState(EOF_STATE);
186      if (subProcList != null && !subProcList.isEmpty()) {
187        Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
188        subProcList = null;
189        return subProcedures;
190      }
191      return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
192    } finally {
193      updateTimestamp();
194    }
195  }
196
197  @Override
198  protected void rollback(final TEnvironment env)
199      throws IOException, InterruptedException {
200    if (isEofState()) stateCount--;
201    try {
202      updateTimestamp();
203      rollbackState(env, getCurrentState());
204    } finally {
205      stateCount--;
206      updateTimestamp();
207    }
208  }
209
210  protected boolean isEofState() {
211    return stateCount > 0 && states[stateCount-1] == EOF_STATE;
212  }
213
214  @Override
215  protected boolean abort(final TEnvironment env) {
216    LOG.debug("Abort requested for {}", this);
217    if (!hasMoreState()) {
218      LOG.warn("Ignore abort request on {} because it has already been finished", this);
219      return false;
220    }
221    if (!isRollbackSupported(getCurrentState())) {
222      LOG.warn("Ignore abort request on {} because it does not support rollback", this);
223      return false;
224    }
225    aborted.set(true);
226    return true;
227  }
228
229  /**
230   * If procedure has more states then abort it otherwise procedure is finished and abort can be
231   * ignored.
232   */
233  protected final void failIfAborted() {
234    if (aborted.get()) {
235      if (hasMoreState()) {
236        setAbortFailure(getClass().getSimpleName(), "abort requested");
237      } else {
238        LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this);
239      }
240    }
241  }
242
243  /**
244   * Used by the default implementation of abort() to know if the current state can be aborted
245   * and rollback can be triggered.
246   */
247  protected boolean isRollbackSupported(final TState state) {
248    return false;
249  }
250
251  @Override
252  protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
253    return isYieldBeforeExecuteFromState(env, getCurrentState());
254  }
255
256  private boolean hasMoreState() {
257    return stateFlow != Flow.NO_MORE_STATE;
258  }
259
260  protected TState getCurrentState() {
261    return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
262  }
263
264  /**
265   * This method is used from test code as it cannot be assumed that state transition will happen
266   * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in
267   * future.
268   */
269  @VisibleForTesting
270  public int getCurrentStateId() {
271    return getStateId(getCurrentState());
272  }
273
274  /**
275   * Set the next state for the procedure.
276   * @param stateId the ordinal() of the state enum (or state id)
277   */
278  private void setNextState(final int stateId) {
279    if (states == null || states.length == stateCount) {
280      int newCapacity = stateCount + 8;
281      if (states != null) {
282        states = Arrays.copyOf(states, newCapacity);
283      } else {
284        states = new int[newCapacity];
285      }
286    }
287    states[stateCount++] = stateId;
288  }
289
290  @Override
291  protected void toStringState(StringBuilder builder) {
292    super.toStringState(builder);
293    if (!isFinished() && !isEofState() && getCurrentState() != null) {
294      builder.append(":").append(getCurrentState());
295    }
296  }
297
298  @Override
299  protected void serializeStateData(ProcedureStateSerializer serializer)
300      throws IOException {
301    StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
302    for (int i = 0; i < stateCount; ++i) {
303      data.addState(states[i]);
304    }
305    serializer.serialize(data.build());
306  }
307
308  @Override
309  protected void deserializeStateData(ProcedureStateSerializer serializer)
310      throws IOException {
311    StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class);
312    stateCount = data.getStateCount();
313    if (stateCount > 0) {
314      states = new int[stateCount];
315      for (int i = 0; i < stateCount; ++i) {
316        states[i] = data.getState(i);
317      }
318      if (isEofState()) {
319        stateFlow = Flow.NO_MORE_STATE;
320      }
321    } else {
322      states = null;
323    }
324  }
325}