View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.Arrays;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
29  
30  /**
31   * Procedure described by a series of steps.
32   *
33   * The procedure implementor must have an enum of 'states', describing
34   * the various step of the procedure.
35   * Once the procedure is running, the procedure-framework will call executeFromState()
36   * using the 'state' provided by the user. The first call to executeFromState()
37   * will be performed with 'state = null'. The implementor can jump between
38   * states using setNextState(MyStateEnum.ordinal()).
39   * The rollback will call rollbackState() for each state that was executed, in reverse order.
40   */
41  @InterfaceAudience.Private
42  @InterfaceStability.Evolving
43  public abstract class StateMachineProcedure<TEnvironment, TState>
44      extends Procedure<TEnvironment> {
45    private int stateCount = 0;
46    private int[] states = null;
47  
48    protected enum Flow {
49      HAS_MORE_STATE,
50      NO_MORE_STATE,
51    }
52  
53    /**
54     * called to perform a single step of the specified 'state' of the procedure
55     * @param state state to execute
56     * @return Flow.NO_MORE_STATE if the procedure is completed,
57     *         Flow.HAS_MORE_STATE if there is another step.
58     */
59    protected abstract Flow executeFromState(TEnvironment env, TState state)
60      throws ProcedureYieldException, InterruptedException;
61  
62    /**
63     * called to perform the rollback of the specified state
64     * @param state state to rollback
65     * @throws IOException temporary failure, the rollback will retry later
66     */
67    protected abstract void rollbackState(TEnvironment env, TState state)
68      throws IOException, InterruptedException;
69  
70    /**
71     * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
72     * @param stateId the ordinal() of the state enum (or state id)
73     * @return the state enum object
74     */
75    protected abstract TState getState(int stateId);
76  
77    /**
78     * Convert the Enum (or more descriptive) state object to an ordinal (or state id).
79     * @param state the state enum object
80     * @return stateId the ordinal() of the state enum (or state id)
81     */
82    protected abstract int getStateId(TState state);
83  
84    /**
85     * Return the initial state object that will be used for the first call to executeFromState().
86     * @return the initial state enum object
87     */
88    protected abstract TState getInitialState();
89  
90    /**
91     * Set the next state for the procedure.
92     * @param state the state enum object
93     */
94    protected void setNextState(final TState state) {
95      setNextState(getStateId(state));
96    }
97  
98    /**
99     * By default, the executor will try ro run all the steps of the procedure start to finish.
100    * Return true to make the executor yield between execution steps to
101    * give other procedures time to run their steps.
102    * @param state the state we are going to execute next.
103    * @return Return true if the executor should yield before the execution of the specified step.
104    *         Defaults to return false.
105    */
106   protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) {
107     return false;
108   }
109 
110   @Override
111   protected Procedure[] execute(final TEnvironment env)
112       throws ProcedureYieldException, InterruptedException {
113     updateTimestamp();
114     try {
115       TState state = getCurrentState();
116       if (stateCount == 0) {
117         setNextState(getStateId(state));
118       }
119       if (executeFromState(env, state) == Flow.NO_MORE_STATE) {
120         // completed
121         return null;
122       }
123       return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
124     } finally {
125       updateTimestamp();
126     }
127   }
128 
129   @Override
130   protected void rollback(final TEnvironment env)
131       throws IOException, InterruptedException {
132     try {
133       updateTimestamp();
134       rollbackState(env, getCurrentState());
135       stateCount--;
136     } finally {
137       updateTimestamp();
138     }
139   }
140 
141   @Override
142   protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
143     return isYieldBeforeExecuteFromState(env, getCurrentState());
144   }
145 
146   private TState getCurrentState() {
147     return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
148   }
149 
150   /**
151    * Set the next state for the procedure.
152    * @param stateId the ordinal() of the state enum (or state id)
153    */
154   private void setNextState(final int stateId) {
155     if (states == null || states.length == stateCount) {
156       int newCapacity = stateCount + 8;
157       if (states != null) {
158         states = Arrays.copyOf(states, newCapacity);
159       } else {
160         states = new int[newCapacity];
161       }
162     }
163     states[stateCount++] = stateId;
164   }
165 
166   @Override
167   protected void toStringState(StringBuilder builder) {
168     super.toStringState(builder);
169     if (!isFinished() && getCurrentState() != null) {
170       builder.append(":").append(getCurrentState());
171     }
172   }
173 
174   @Override
175   protected void serializeStateData(final OutputStream stream) throws IOException {
176     StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
177     for (int i = 0; i < stateCount; ++i) {
178       data.addState(states[i]);
179     }
180     data.build().writeDelimitedTo(stream);
181   }
182 
183   @Override
184   protected void deserializeStateData(final InputStream stream) throws IOException {
185     StateMachineProcedureData data = StateMachineProcedureData.parseDelimitedFrom(stream);
186     stateCount = data.getStateCount();
187     if (stateCount > 0) {
188       states = new int[stateCount];
189       for (int i = 0; i < stateCount; ++i) {
190         states[i] = data.getState(i);
191       }
192     } else {
193       states = null;
194     }
195   }
196 }