1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.Arrays;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
29
30
31
32
33
34
35
36
37
38
39
40
41 @InterfaceAudience.Private
42 @InterfaceStability.Evolving
43 public abstract class StateMachineProcedure<TEnvironment, TState>
44 extends Procedure<TEnvironment> {
45 private int stateCount = 0;
46 private int[] states = null;
47
48 protected enum Flow {
49 HAS_MORE_STATE,
50 NO_MORE_STATE,
51 }
52
53
54
55
56
57
58
59 protected abstract Flow executeFromState(TEnvironment env, TState state)
60 throws ProcedureYieldException, InterruptedException;
61
62
63
64
65
66
67 protected abstract void rollbackState(TEnvironment env, TState state)
68 throws IOException, InterruptedException;
69
70
71
72
73
74
75 protected abstract TState getState(int stateId);
76
77
78
79
80
81
82 protected abstract int getStateId(TState state);
83
84
85
86
87
88 protected abstract TState getInitialState();
89
90
91
92
93
94 protected void setNextState(final TState state) {
95 setNextState(getStateId(state));
96 }
97
98
99
100
101
102
103
104
105
106 protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) {
107 return false;
108 }
109
110 @Override
111 protected Procedure[] execute(final TEnvironment env)
112 throws ProcedureYieldException, InterruptedException {
113 updateTimestamp();
114 try {
115 TState state = getCurrentState();
116 if (stateCount == 0) {
117 setNextState(getStateId(state));
118 }
119 if (executeFromState(env, state) == Flow.NO_MORE_STATE) {
120
121 return null;
122 }
123 return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
124 } finally {
125 updateTimestamp();
126 }
127 }
128
129 @Override
130 protected void rollback(final TEnvironment env)
131 throws IOException, InterruptedException {
132 try {
133 updateTimestamp();
134 rollbackState(env, getCurrentState());
135 stateCount--;
136 } finally {
137 updateTimestamp();
138 }
139 }
140
141 @Override
142 protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
143 return isYieldBeforeExecuteFromState(env, getCurrentState());
144 }
145
146 private TState getCurrentState() {
147 return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
148 }
149
150
151
152
153
154 private void setNextState(final int stateId) {
155 if (states == null || states.length == stateCount) {
156 int newCapacity = stateCount + 8;
157 if (states != null) {
158 states = Arrays.copyOf(states, newCapacity);
159 } else {
160 states = new int[newCapacity];
161 }
162 }
163 states[stateCount++] = stateId;
164 }
165
166 @Override
167 protected void toStringState(StringBuilder builder) {
168 super.toStringState(builder);
169 if (!isFinished() && getCurrentState() != null) {
170 builder.append(":").append(getCurrentState());
171 }
172 }
173
174 @Override
175 protected void serializeStateData(final OutputStream stream) throws IOException {
176 StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
177 for (int i = 0; i < stateCount; ++i) {
178 data.addState(states[i]);
179 }
180 data.build().writeDelimitedTo(stream);
181 }
182
183 @Override
184 protected void deserializeStateData(final InputStream stream) throws IOException {
185 StateMachineProcedureData data = StateMachineProcedureData.parseDelimitedFrom(stream);
186 stateCount = data.getStateCount();
187 if (stateCount > 0) {
188 states = new int[stateCount];
189 for (int i = 0; i < stateCount; ++i) {
190 states[i] = data.getState(i);
191 }
192 } else {
193 states = null;
194 }
195 }
196 }