001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
032
033/**
034 * Procedure described by a series of steps.
035 *
036 * <p>The procedure implementor must have an enum of 'states', describing
037 * the various step of the procedure.
038 * Once the procedure is running, the procedure-framework will call executeFromState()
039 * using the 'state' provided by the user. The first call to executeFromState()
040 * will be performed with 'state = null'. The implementor can jump between
041 * states using setNextState(MyStateEnum.ordinal()).
042 * The rollback will call rollbackState() for each state that was executed, in reverse order.
043 */
044@InterfaceAudience.Private
045@InterfaceStability.Evolving
046public abstract class StateMachineProcedure<TEnvironment, TState>
047    extends Procedure<TEnvironment> {
048  private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class);
049
050  private static final int EOF_STATE = Integer.MIN_VALUE;
051
052  private final AtomicBoolean aborted = new AtomicBoolean(false);
053
054  private Flow stateFlow = Flow.HAS_MORE_STATE;
055  protected int stateCount = 0;
056  private int[] states = null;
057
058  private List<Procedure<TEnvironment>> subProcList = null;
059
060  protected final int getCycles() {
061    return cycles;
062  }
063
064  /**
065   * Cycles on same state. Good for figuring if we are stuck.
066   */
067  private int cycles = 0;
068
069  /**
070   * Ordinal of the previous state. So we can tell if we are progressing or not.
071   */
072  private int previousState;
073
074  public enum Flow {
075    HAS_MORE_STATE,
076    NO_MORE_STATE,
077  }
078
079  /**
080   * called to perform a single step of the specified 'state' of the procedure
081   * @param state state to execute
082   * @return Flow.NO_MORE_STATE if the procedure is completed,
083   *         Flow.HAS_MORE_STATE if there is another step.
084   */
085  protected abstract Flow executeFromState(TEnvironment env, TState state)
086          throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
087
088  /**
089   * called to perform the rollback of the specified state
090   * @param state state to rollback
091   * @throws IOException temporary failure, the rollback will retry later
092   */
093  protected abstract void rollbackState(TEnvironment env, TState state)
094    throws IOException, InterruptedException;
095
096  /**
097   * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
098   * @param stateId the ordinal() of the state enum (or state id)
099   * @return the state enum object
100   */
101  protected abstract TState getState(int stateId);
102
103  /**
104   * Convert the Enum (or more descriptive) state object to an ordinal (or state id).
105   * @param state the state enum object
106   * @return stateId the ordinal() of the state enum (or state id)
107   */
108  protected abstract int getStateId(TState state);
109
110  /**
111   * Return the initial state object that will be used for the first call to executeFromState().
112   * @return the initial state enum object
113   */
114  protected abstract TState getInitialState();
115
116  /**
117   * Set the next state for the procedure.
118   * @param state the state enum object
119   */
120  protected void setNextState(final TState state) {
121    setNextState(getStateId(state));
122    failIfAborted();
123  }
124
125  /**
126   * By default, the executor will try ro run all the steps of the procedure start to finish.
127   * Return true to make the executor yield between execution steps to
128   * give other procedures time to run their steps.
129   * @param state the state we are going to execute next.
130   * @return Return true if the executor should yield before the execution of the specified step.
131   *         Defaults to return false.
132   */
133  protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) {
134    return false;
135  }
136
137  /**
138   * Add a child procedure to execute
139   * @param subProcedure the child procedure
140   */
141  protected <T extends Procedure<TEnvironment>> void addChildProcedure(
142      @SuppressWarnings("unchecked") T... subProcedure) {
143    if (subProcedure == null) {
144      return;
145    }
146    final int len = subProcedure.length;
147    if (len == 0) {
148      return;
149    }
150    if (subProcList == null) {
151      subProcList = new ArrayList<>(len);
152    }
153    for (int i = 0; i < len; ++i) {
154      Procedure<TEnvironment> proc = subProcedure[i];
155      if (!proc.hasOwner()) {
156        proc.setOwner(getOwner());
157      }
158
159      subProcList.add(proc);
160    }
161  }
162
163  @Override
164  protected Procedure[] execute(final TEnvironment env)
165          throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
166    updateTimestamp();
167    try {
168      failIfAborted();
169
170      if (!hasMoreState() || isFailed()) {
171        return null;
172      }
173
174      TState state = getCurrentState();
175      if (stateCount == 0) {
176        setNextState(getStateId(state));
177      }
178
179      if (LOG.isTraceEnabled()) {
180        LOG.trace(state  + " " + this + "; cycles=" + this.cycles);
181      }
182      // Keep running count of cycles
183      if (getStateId(state) != this.previousState) {
184        this.previousState = getStateId(state);
185        this.cycles = 0;
186      } else {
187        this.cycles++;
188      }
189
190      LOG.trace("{}", this);
191      stateFlow = executeFromState(env, state);
192      if (!hasMoreState()) {
193        setNextState(EOF_STATE);
194      }
195
196      if (subProcList != null && !subProcList.isEmpty()) {
197        Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
198        subProcList = null;
199        return subProcedures;
200      }
201      return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
202    } finally {
203      updateTimestamp();
204    }
205  }
206
207  @Override
208  protected void rollback(final TEnvironment env)
209      throws IOException, InterruptedException {
210    if (isEofState()) {
211      stateCount--;
212    }
213
214    try {
215      updateTimestamp();
216      rollbackState(env, getCurrentState());
217    } finally {
218      stateCount--;
219      updateTimestamp();
220    }
221  }
222
223  protected boolean isEofState() {
224    return stateCount > 0 && states[stateCount-1] == EOF_STATE;
225  }
226
227  @Override
228  protected boolean abort(final TEnvironment env) {
229    LOG.debug("Abort requested for {}", this);
230    if (!hasMoreState()) {
231      LOG.warn("Ignore abort request on {} because it has already been finished", this);
232      return false;
233    }
234    if (!isRollbackSupported(getCurrentState())) {
235      LOG.warn("Ignore abort request on {} because it does not support rollback", this);
236      return false;
237    }
238    aborted.set(true);
239    return true;
240  }
241
242  /**
243   * If procedure has more states then abort it otherwise procedure is finished and abort can be
244   * ignored.
245   */
246  protected final void failIfAborted() {
247    if (aborted.get()) {
248      if (hasMoreState()) {
249        setAbortFailure(getClass().getSimpleName(), "abort requested");
250      } else {
251        LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this);
252      }
253    }
254  }
255
256  /**
257   * Used by the default implementation of abort() to know if the current state can be aborted
258   * and rollback can be triggered.
259   */
260  protected boolean isRollbackSupported(final TState state) {
261    return false;
262  }
263
264  @Override
265  protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
266    return isYieldBeforeExecuteFromState(env, getCurrentState());
267  }
268
269  private boolean hasMoreState() {
270    return stateFlow != Flow.NO_MORE_STATE;
271  }
272
273  protected TState getCurrentState() {
274    return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
275  }
276
277  /**
278   * This method is used from test code as it cannot be assumed that state transition will happen
279   * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in
280   * future.
281   */
282  public int getCurrentStateId() {
283    return getStateId(getCurrentState());
284  }
285
286  /**
287   * Set the next state for the procedure.
288   * @param stateId the ordinal() of the state enum (or state id)
289   */
290  private void setNextState(final int stateId) {
291    if (states == null || states.length == stateCount) {
292      int newCapacity = stateCount + 8;
293      if (states != null) {
294        states = Arrays.copyOf(states, newCapacity);
295      } else {
296        states = new int[newCapacity];
297      }
298    }
299    states[stateCount++] = stateId;
300  }
301
302  @Override
303  protected void toStringState(StringBuilder builder) {
304    super.toStringState(builder);
305    if (!isFinished() && !isEofState() && getCurrentState() != null) {
306      builder.append(":").append(getCurrentState());
307    }
308  }
309
310  @Override
311  protected void serializeStateData(ProcedureStateSerializer serializer)
312      throws IOException {
313    StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
314    for (int i = 0; i < stateCount; ++i) {
315      data.addState(states[i]);
316    }
317    serializer.serialize(data.build());
318  }
319
320  @Override
321  protected void deserializeStateData(ProcedureStateSerializer serializer)
322      throws IOException {
323    StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class);
324    stateCount = data.getStateCount();
325    if (stateCount > 0) {
326      states = new int[stateCount];
327      for (int i = 0; i < stateCount; ++i) {
328        states[i] = data.getState(i);
329      }
330      if (isEofState()) {
331        stateFlow = Flow.NO_MORE_STATE;
332      }
333    } else {
334      states = null;
335    }
336  }
337}