1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.Arrays;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
29
30
31
32
33
34
35
36
37
38
39
40
41 @InterfaceAudience.Private
42 @InterfaceStability.Evolving
43 public abstract class StateMachineProcedure<TEnvironment, TState>
44 extends Procedure<TEnvironment> {
45 private int stateCount = 0;
46 private int[] states = null;
47
48 protected enum Flow {
49 HAS_MORE_STATE,
50 NO_MORE_STATE,
51 }
52
53
54
55
56
57
58
59 protected abstract Flow executeFromState(TEnvironment env, TState state)
60 throws ProcedureYieldException;
61
62
63
64
65
66
67 protected abstract void rollbackState(TEnvironment env, TState state)
68 throws IOException;
69
70
71
72
73
74
75 protected abstract TState getState(int stateId);
76
77
78
79
80
81
82 protected abstract int getStateId(TState state);
83
84
85
86
87
88 protected abstract TState getInitialState();
89
90
91
92
93
94 protected void setNextState(final TState state) {
95 setNextState(getStateId(state));
96 }
97
98 @Override
99 protected Procedure[] execute(final TEnvironment env)
100 throws ProcedureYieldException {
101 updateTimestamp();
102 try {
103 TState state = stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
104 if (stateCount == 0) {
105 setNextState(getStateId(state));
106 }
107 if (executeFromState(env, state) == Flow.NO_MORE_STATE) {
108
109 return null;
110 }
111 return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
112 } finally {
113 updateTimestamp();
114 }
115 }
116
117 @Override
118 protected void rollback(final TEnvironment env) throws IOException {
119 try {
120 updateTimestamp();
121 rollbackState(env, stateCount > 0 ? getState(states[stateCount-1]) : getInitialState());
122 stateCount--;
123 } finally {
124 updateTimestamp();
125 }
126 }
127
128
129
130
131
132 private void setNextState(final int stateId) {
133 if (states == null || states.length == stateCount) {
134 int newCapacity = stateCount + 8;
135 if (states != null) {
136 states = Arrays.copyOf(states, newCapacity);
137 } else {
138 states = new int[newCapacity];
139 }
140 }
141 states[stateCount++] = stateId;
142 }
143
144 @Override
145 protected void serializeStateData(final OutputStream stream) throws IOException {
146 StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
147 for (int i = 0; i < stateCount; ++i) {
148 data.addState(states[i]);
149 }
150 data.build().writeDelimitedTo(stream);
151 }
152
153 @Override
154 protected void deserializeStateData(final InputStream stream) throws IOException {
155 StateMachineProcedureData data = StateMachineProcedureData.parseDelimitedFrom(stream);
156 stateCount = data.getStateCount();
157 if (stateCount > 0) {
158 states = new int[stateCount];
159 for (int i = 0; i < stateCount; ++i) {
160 states[i] = data.getState(i);
161 }
162 } else {
163 states = null;
164 }
165 }
166 }