1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.CountDownLatch;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.errorhandling.ForeignException;
27 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
28 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
29 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
30 import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
31 import org.apache.zookeeper.KeeperException;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 abstract public class Subprocedure implements Callable<Void> {
54 private static final Log LOG = LogFactory.getLog(Subprocedure.class);
55
56
57 final private String barrierName;
58
59
60
61
62
63
64 private final CountDownLatch inGlobalBarrier;
65
66 private final CountDownLatch releasedLocalBarrier;
67
68
69
70
71
72 protected final ForeignExceptionDispatcher monitor;
73
74 protected final long wakeFrequency;
75 protected final TimeoutExceptionInjector executionTimeoutTimer;
76 protected final ProcedureMemberRpcs rpcs;
77
78 private volatile boolean complete = false;
79
80
81
82
83
84
85
86
87
88 public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor,
89 long wakeFrequency, long timeout) {
90
91 assert member != null : "procedure member should be non-null";
92 assert member.getRpcs() != null : "rpc handlers should be non-null";
93 assert procName != null : "procedure name should be non-null";
94 assert monitor != null : "monitor should be non-null";
95
96
97 this.rpcs = member.getRpcs();
98 this.barrierName = procName;
99 this.monitor = monitor;
100
101
102 this.monitor.addListener(new ForeignExceptionListener() {
103 @Override
104 public void receive(ForeignException ee) {
105
106 if (ee.isRemote()) {
107 LOG.debug("Was remote foreign exception, not redispatching error", ee);
108 return;
109 }
110
111 if (ee.getCause() instanceof KeeperException) {
112 LOG.debug("Was KeeperException, not redispatching error", ee);
113 return;
114 }
115
116 try {
117 rpcs.sendMemberAborted(Subprocedure.this, ee);
118 } catch (IOException e) {
119
120 LOG.error("Can't reach controller, not propagating error", e);
121 }
122 }
123 });
124
125 this.wakeFrequency = wakeFrequency;
126 this.inGlobalBarrier = new CountDownLatch(1);
127 this.releasedLocalBarrier = new CountDownLatch(1);
128
129
130 this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout);
131 }
132
133 public String getName() {
134 return barrierName;
135 }
136
137 public String getMemberName() {
138 return rpcs.getMemberName();
139 }
140
141 private void rethrowException() throws ForeignException {
142 monitor.rethrowException();
143 }
144
145
146
147
148
149
150
151
152
153
154
155 @SuppressWarnings("finally")
156 final public Void call() {
157 LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " +
158 executionTimeoutTimer.getMaxTime() + "ms");
159
160 executionTimeoutTimer.start();
161
162 try {
163
164 rethrowException();
165 LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage");
166 acquireBarrier();
167 LOG.debug("Subprocedure '" + barrierName + "' locally acquired");
168 rethrowException();
169
170
171 rpcs.sendMemberAcquired(this);
172 LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on" +
173 " 'reached' or 'abort' from coordinator");
174
175
176 waitForReachedGlobalBarrier();
177 rethrowException();
178
179
180
181
182
183
184
185
186 LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
187 byte[] dataToCoordinator = insideBarrier();
188 LOG.debug("Subprocedure '" + barrierName + "' locally completed");
189 rethrowException();
190
191
192 rpcs.sendMemberCompleted(this, dataToCoordinator);
193 LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
194
195
196 rethrowException();
197 } catch (Exception e) {
198 String msg = null;
199 if (e instanceof InterruptedException) {
200 msg = "Procedure '" + barrierName + "' aborting due to interrupt!" +
201 " Likely due to pool shutdown.";
202 Thread.currentThread().interrupt();
203 } else if (e instanceof ForeignException) {
204 msg = "Subprocedure '" + barrierName + "' aborting due to a ForeignException!";
205 } else {
206 msg = "Subprocedure '" + barrierName + "' failed!";
207 }
208 cancel(msg, e);
209
210 LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");
211 cleanup(e);
212 } finally {
213 releasedLocalBarrier.countDown();
214
215
216 executionTimeoutTimer.complete();
217 complete = true;
218 LOG.debug("Subprocedure '" + barrierName + "' completed.");
219 return null;
220 }
221 }
222
223 boolean isComplete() {
224 return complete;
225 }
226
227
228
229
230 ForeignExceptionSnare getErrorCheckable() {
231 return this.monitor;
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245 abstract public void acquireBarrier() throws ForeignException;
246
247
248
249
250
251
252
253
254
255
256
257
258 abstract public byte[] insideBarrier() throws ForeignException;
259
260
261
262
263
264
265
266 abstract public void cleanup(Exception e);
267
268
269
270
271
272 public void cancel(String msg, Throwable cause) {
273 LOG.error(msg, cause);
274 complete = true;
275 if (cause instanceof ForeignException) {
276 monitor.receive((ForeignException) cause);
277 } else {
278 monitor.receive(new ForeignException(getMemberName(), cause));
279 }
280 }
281
282
283
284
285
286
287 public void receiveReachedGlobalBarrier() {
288 inGlobalBarrier.countDown();
289 }
290
291
292
293
294
295
296
297
298
299
300
301
302
303 void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException {
304 Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency,
305 barrierName + ":remote acquired");
306 }
307
308
309
310
311
312
313 public void waitForLocallyCompleted() throws ForeignException, InterruptedException {
314 Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency,
315 barrierName + ":completed");
316 }
317
318
319
320
321
322
323 public static class SubprocedureImpl extends Subprocedure {
324
325 public SubprocedureImpl(ProcedureMember member, String opName,
326 ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) {
327 super(member, opName, monitor, wakeFrequency, timeout);
328 }
329
330 @Override
331 public void acquireBarrier() throws ForeignException {}
332
333 @Override
334 public byte[] insideBarrier() throws ForeignException {
335 return new byte[0];
336 }
337
338 @Override
339 public void cleanup(Exception e) {}
340 };
341 }