001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.List;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027import org.apache.yetus.audience.InterfaceAudience;
028import org.apache.yetus.audience.InterfaceStability;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
033
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
035
036/**
037 * Procedure described by a series of steps.
038 *
039 * <p>The procedure implementor must have an enum of 'states', describing
040 * the various step of the procedure.
041 * Once the procedure is running, the procedure-framework will call executeFromState()
042 * using the 'state' provided by the user. The first call to executeFromState()
043 * will be performed with 'state = null'. The implementor can jump between
044 * states using setNextState(MyStateEnum.ordinal()).
045 * The rollback will call rollbackState() for each state that was executed, in reverse order.
046 */
047@InterfaceAudience.Private
048@InterfaceStability.Evolving
049public abstract class StateMachineProcedure<TEnvironment, TState>
050    extends Procedure<TEnvironment> {
051  private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class);
052
053  private static final int EOF_STATE = Integer.MIN_VALUE;
054
055  private final AtomicBoolean aborted = new AtomicBoolean(false);
056
057  private Flow stateFlow = Flow.HAS_MORE_STATE;
058  protected int stateCount = 0;
059  private int[] states = null;
060
061  private List<Procedure<TEnvironment>> subProcList = null;
062
063  protected final int getCycles() {
064    return cycles;
065  }
066
067  /**
068   * Cycles on same state. Good for figuring if we are stuck.
069   */
070  private int cycles = 0;
071
072  /**
073   * Ordinal of the previous state. So we can tell if we are progressing or not.
074   */
075  private int previousState;
076
077  @VisibleForTesting
078  public enum Flow {
079    HAS_MORE_STATE,
080    NO_MORE_STATE,
081  }
082
083  /**
084   * called to perform a single step of the specified 'state' of the procedure
085   * @param state state to execute
086   * @return Flow.NO_MORE_STATE if the procedure is completed,
087   *         Flow.HAS_MORE_STATE if there is another step.
088   */
089  protected abstract Flow executeFromState(TEnvironment env, TState state)
090  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
091
092  /**
093   * called to perform the rollback of the specified state
094   * @param state state to rollback
095   * @throws IOException temporary failure, the rollback will retry later
096   */
097  protected abstract void rollbackState(TEnvironment env, TState state)
098    throws IOException, InterruptedException;
099
100  /**
101   * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
102   * @param stateId the ordinal() of the state enum (or state id)
103   * @return the state enum object
104   */
105  protected abstract TState getState(int stateId);
106
107  /**
108   * Convert the Enum (or more descriptive) state object to an ordinal (or state id).
109   * @param state the state enum object
110   * @return stateId the ordinal() of the state enum (or state id)
111   */
112  protected abstract int getStateId(TState state);
113
114  /**
115   * Return the initial state object that will be used for the first call to executeFromState().
116   * @return the initial state enum object
117   */
118  protected abstract TState getInitialState();
119
120  /**
121   * Set the next state for the procedure.
122   * @param state the state enum object
123   */
124  protected void setNextState(final TState state) {
125    setNextState(getStateId(state));
126    failIfAborted();
127  }
128
129  /**
130   * By default, the executor will try ro run all the steps of the procedure start to finish.
131   * Return true to make the executor yield between execution steps to
132   * give other procedures time to run their steps.
133   * @param state the state we are going to execute next.
134   * @return Return true if the executor should yield before the execution of the specified step.
135   *         Defaults to return false.
136   */
137  protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) {
138    return false;
139  }
140
141  /**
142   * Add a child procedure to execute
143   * @param subProcedure the child procedure
144   */
145  protected <T extends Procedure<TEnvironment>> void addChildProcedure(
146      @SuppressWarnings("unchecked") T... subProcedure) {
147    if (subProcedure == null) {
148      return;
149    }
150    final int len = subProcedure.length;
151    if (len == 0) {
152      return;
153    }
154    if (subProcList == null) {
155      subProcList = new ArrayList<>(len);
156    }
157    for (int i = 0; i < len; ++i) {
158      Procedure<TEnvironment> proc = subProcedure[i];
159      if (!proc.hasOwner()) proc.setOwner(getOwner());
160      subProcList.add(proc);
161    }
162  }
163
164  @Override
165  protected Procedure[] execute(final TEnvironment env)
166  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
167    updateTimestamp();
168    try {
169      failIfAborted();
170
171      if (!hasMoreState() || isFailed()) return null;
172      TState state = getCurrentState();
173      if (stateCount == 0) {
174        setNextState(getStateId(state));
175      }
176
177      if (LOG.isTraceEnabled()) {
178        LOG.trace(state  + " " + this + "; cycles=" + this.cycles);
179      }
180      // Keep running count of cycles
181      if (getStateId(state) != this.previousState) {
182        this.previousState = getStateId(state);
183        this.cycles = 0;
184      } else {
185        this.cycles++;
186      }
187
188      LOG.trace("{}", toString());
189      stateFlow = executeFromState(env, state);
190      if (!hasMoreState()) setNextState(EOF_STATE);
191      if (subProcList != null && !subProcList.isEmpty()) {
192        Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
193        subProcList = null;
194        return subProcedures;
195      }
196      return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
197    } finally {
198      updateTimestamp();
199    }
200  }
201
202  @Override
203  protected void rollback(final TEnvironment env)
204      throws IOException, InterruptedException {
205    if (isEofState()) stateCount--;
206    try {
207      updateTimestamp();
208      rollbackState(env, getCurrentState());
209    } finally {
210      stateCount--;
211      updateTimestamp();
212    }
213  }
214
215  protected boolean isEofState() {
216    return stateCount > 0 && states[stateCount-1] == EOF_STATE;
217  }
218
219  @Override
220  protected boolean abort(final TEnvironment env) {
221    LOG.debug("Abort requested for {}", this);
222    if (!hasMoreState()) {
223      LOG.warn("Ignore abort request on {} because it has already been finished", this);
224      return false;
225    }
226    if (!isRollbackSupported(getCurrentState())) {
227      LOG.warn("Ignore abort request on {} because it does not support rollback", this);
228      return false;
229    }
230    aborted.set(true);
231    return true;
232  }
233
234  /**
235   * If procedure has more states then abort it otherwise procedure is finished and abort can be
236   * ignored.
237   */
238  protected final void failIfAborted() {
239    if (aborted.get()) {
240      if (hasMoreState()) {
241        setAbortFailure(getClass().getSimpleName(), "abort requested");
242      } else {
243        LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this);
244      }
245    }
246  }
247
248  /**
249   * Used by the default implementation of abort() to know if the current state can be aborted
250   * and rollback can be triggered.
251   */
252  protected boolean isRollbackSupported(final TState state) {
253    return false;
254  }
255
256  @Override
257  protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
258    return isYieldBeforeExecuteFromState(env, getCurrentState());
259  }
260
261  private boolean hasMoreState() {
262    return stateFlow != Flow.NO_MORE_STATE;
263  }
264
265  protected TState getCurrentState() {
266    return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
267  }
268
269  /**
270   * This method is used from test code as it cannot be assumed that state transition will happen
271   * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in
272   * future.
273   */
274  @VisibleForTesting
275  public int getCurrentStateId() {
276    return getStateId(getCurrentState());
277  }
278
279  /**
280   * Set the next state for the procedure.
281   * @param stateId the ordinal() of the state enum (or state id)
282   */
283  private void setNextState(final int stateId) {
284    if (states == null || states.length == stateCount) {
285      int newCapacity = stateCount + 8;
286      if (states != null) {
287        states = Arrays.copyOf(states, newCapacity);
288      } else {
289        states = new int[newCapacity];
290      }
291    }
292    states[stateCount++] = stateId;
293  }
294
295  @Override
296  protected void toStringState(StringBuilder builder) {
297    super.toStringState(builder);
298    if (!isFinished() && !isEofState() && getCurrentState() != null) {
299      builder.append(":").append(getCurrentState());
300    }
301  }
302
303  @Override
304  protected void serializeStateData(ProcedureStateSerializer serializer)
305      throws IOException {
306    StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
307    for (int i = 0; i < stateCount; ++i) {
308      data.addState(states[i]);
309    }
310    serializer.serialize(data.build());
311  }
312
313  @Override
314  protected void deserializeStateData(ProcedureStateSerializer serializer)
315      throws IOException {
316    StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class);
317    stateCount = data.getStateCount();
318    if (stateCount > 0) {
319      states = new int[stateCount];
320      for (int i = 0; i < stateCount; ++i) {
321        states[i] = data.getState(i);
322      }
323      if (isEofState()) {
324        stateFlow = Flow.NO_MORE_STATE;
325      }
326    } else {
327      states = null;
328    }
329  }
330}