001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
034
035/**
036 * Procedure described by a series of steps.
037 *
038 * <p>The procedure implementor must have an enum of 'states', describing
039 * the various step of the procedure.
040 * Once the procedure is running, the procedure-framework will call executeFromState()
041 * using the 'state' provided by the user. The first call to executeFromState()
042 * will be performed with 'state = null'. The implementor can jump between
043 * states using setNextState(MyStateEnum.ordinal()).
044 * The rollback will call rollbackState() for each state that was executed, in reverse order.
045 */
046@InterfaceAudience.Private
047@InterfaceStability.Evolving
048public abstract class StateMachineProcedure<TEnvironment, TState>
049    extends Procedure<TEnvironment> {
050  private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class);
051
052  private static final int EOF_STATE = Integer.MIN_VALUE;
053
054  private final AtomicBoolean aborted = new AtomicBoolean(false);
055
056  private Flow stateFlow = Flow.HAS_MORE_STATE;
057  private int stateCount = 0;
058  private int[] states = null;
059
060  private List<Procedure<TEnvironment>> subProcList = null;
061
062  protected final int getCycles() {
063    return cycles;
064  }
065
066  /**
067   * Cycles on same state. Good for figuring if we are stuck.
068   */
069  private int cycles = 0;
070
071  /**
072   * Ordinal of the previous state. So we can tell if we are progressing or not.
073   */
074  private int previousState;
075
076  @VisibleForTesting
077  public enum Flow {
078    HAS_MORE_STATE,
079    NO_MORE_STATE,
080  }
081
082  /**
083   * called to perform a single step of the specified 'state' of the procedure
084   * @param state state to execute
085   * @return Flow.NO_MORE_STATE if the procedure is completed,
086   *         Flow.HAS_MORE_STATE if there is another step.
087   */
088  protected abstract Flow executeFromState(TEnvironment env, TState state)
089          throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
090
091  /**
092   * called to perform the rollback of the specified state
093   * @param state state to rollback
094   * @throws IOException temporary failure, the rollback will retry later
095   */
096  protected abstract void rollbackState(TEnvironment env, TState state)
097    throws IOException, InterruptedException;
098
099  /**
100   * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
101   * @param stateId the ordinal() of the state enum (or state id)
102   * @return the state enum object
103   */
104  protected abstract TState getState(int stateId);
105
106  /**
107   * Convert the Enum (or more descriptive) state object to an ordinal (or state id).
108   * @param state the state enum object
109   * @return stateId the ordinal() of the state enum (or state id)
110   */
111  protected abstract int getStateId(TState state);
112
113  /**
114   * Return the initial state object that will be used for the first call to executeFromState().
115   * @return the initial state enum object
116   */
117  protected abstract TState getInitialState();
118
119  /**
120   * Set the next state for the procedure.
121   * @param state the state enum object
122   */
123  protected void setNextState(final TState state) {
124    setNextState(getStateId(state));
125    failIfAborted();
126  }
127
128  /**
129   * By default, the executor will try ro run all the steps of the procedure start to finish.
130   * Return true to make the executor yield between execution steps to
131   * give other procedures time to run their steps.
132   * @param state the state we are going to execute next.
133   * @return Return true if the executor should yield before the execution of the specified step.
134   *         Defaults to return false.
135   */
136  protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) {
137    return false;
138  }
139
140  /**
141   * Add a child procedure to execute
142   * @param subProcedure the child procedure
143   */
144  protected <T extends Procedure<TEnvironment>> void addChildProcedure(
145      @SuppressWarnings("unchecked") T... subProcedure) {
146    if (subProcedure == null) {
147      return;
148    }
149    final int len = subProcedure.length;
150    if (len == 0) {
151      return;
152    }
153    if (subProcList == null) {
154      subProcList = new ArrayList<>(len);
155    }
156    for (int i = 0; i < len; ++i) {
157      Procedure<TEnvironment> proc = subProcedure[i];
158      if (!proc.hasOwner()) {
159        proc.setOwner(getOwner());
160      }
161
162      subProcList.add(proc);
163    }
164  }
165
166  @Override
167  protected Procedure[] execute(final TEnvironment env)
168          throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
169    updateTimestamp();
170    try {
171      failIfAborted();
172
173      if (!hasMoreState() || isFailed()) {
174        return null;
175      }
176
177      TState state = getCurrentState();
178      if (stateCount == 0) {
179        setNextState(getStateId(state));
180      }
181
182      if (LOG.isTraceEnabled()) {
183        LOG.trace(state  + " " + this + "; cycles=" + this.cycles);
184      }
185      // Keep running count of cycles
186      if (getStateId(state) != this.previousState) {
187        this.previousState = getStateId(state);
188        this.cycles = 0;
189      } else {
190        this.cycles++;
191      }
192
193      LOG.trace("{}", this);
194      stateFlow = executeFromState(env, state);
195      if (!hasMoreState()) {
196        setNextState(EOF_STATE);
197      }
198
199      if (subProcList != null && !subProcList.isEmpty()) {
200        Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
201        subProcList = null;
202        return subProcedures;
203      }
204      return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
205    } finally {
206      updateTimestamp();
207    }
208  }
209
210  @Override
211  protected void rollback(final TEnvironment env)
212      throws IOException, InterruptedException {
213    if (isEofState()) {
214      stateCount--;
215    }
216
217    try {
218      updateTimestamp();
219      rollbackState(env, getCurrentState());
220      stateCount--;
221    } finally {
222      updateTimestamp();
223    }
224  }
225
226  private boolean isEofState() {
227    return stateCount > 0 && states[stateCount-1] == EOF_STATE;
228  }
229
230  @Override
231  protected boolean abort(final TEnvironment env) {
232    LOG.debug("Abort requested for {}", this);
233    if (!hasMoreState()) {
234      LOG.warn("Ignore abort request on {} because it has already been finished", this);
235      return false;
236    }
237    if (!isRollbackSupported(getCurrentState())) {
238      LOG.warn("Ignore abort request on {} because it does not support rollback", this);
239      return false;
240    }
241    aborted.set(true);
242    return true;
243  }
244
245  /**
246   * If procedure has more states then abort it otherwise procedure is finished and abort can be
247   * ignored.
248   */
249  protected final void failIfAborted() {
250    if (aborted.get()) {
251      if (hasMoreState()) {
252        setAbortFailure(getClass().getSimpleName(), "abort requested");
253      } else {
254        LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this);
255      }
256    }
257  }
258
259  /**
260   * Used by the default implementation of abort() to know if the current state can be aborted
261   * and rollback can be triggered.
262   */
263  protected boolean isRollbackSupported(final TState state) {
264    return false;
265  }
266
267  @Override
268  protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
269    return isYieldBeforeExecuteFromState(env, getCurrentState());
270  }
271
272  private boolean hasMoreState() {
273    return stateFlow != Flow.NO_MORE_STATE;
274  }
275
276  protected TState getCurrentState() {
277    return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
278  }
279
280  /**
281   * This method is used from test code as it cannot be assumed that state transition will happen
282   * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in
283   * future.
284   */
285  @VisibleForTesting
286  public int getCurrentStateId() {
287    return getStateId(getCurrentState());
288  }
289
290  /**
291   * Set the next state for the procedure.
292   * @param stateId the ordinal() of the state enum (or state id)
293   */
294  private void setNextState(final int stateId) {
295    if (states == null || states.length == stateCount) {
296      int newCapacity = stateCount + 8;
297      if (states != null) {
298        states = Arrays.copyOf(states, newCapacity);
299      } else {
300        states = new int[newCapacity];
301      }
302    }
303    states[stateCount++] = stateId;
304  }
305
306  @Override
307  protected void toStringState(StringBuilder builder) {
308    super.toStringState(builder);
309    if (!isFinished() && !isEofState() && getCurrentState() != null) {
310      builder.append(":").append(getCurrentState());
311    }
312  }
313
314  @Override
315  protected void serializeStateData(ProcedureStateSerializer serializer)
316      throws IOException {
317    StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
318    for (int i = 0; i < stateCount; ++i) {
319      data.addState(states[i]);
320    }
321    serializer.serialize(data.build());
322  }
323
324  @Override
325  protected void deserializeStateData(ProcedureStateSerializer serializer)
326      throws IOException {
327    StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class);
328    stateCount = data.getStateCount();
329    if (stateCount > 0) {
330      states = new int[stateCount];
331      for (int i = 0; i < stateCount; ++i) {
332        states[i] = data.getState(i);
333      }
334      if (isEofState()) {
335        stateFlow = Flow.NO_MORE_STATE;
336      }
337    } else {
338      states = null;
339    }
340  }
341}