1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.errorhandling.ForeignException;
32 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
33 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
34 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
35 import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
36
37 import com.google.common.collect.Lists;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 @InterfaceAudience.Private
69 public class Procedure implements Callable<Void>, ForeignExceptionListener {
70 private static final Log LOG = LogFactory.getLog(Procedure.class);
71
72
73
74
75
76
77 final private String procName;
78
79 final private byte[] args;
80
81
82
83
84
85 final CountDownLatch acquiredBarrierLatch;
86
87 final CountDownLatch releasedBarrierLatch;
88
89 final CountDownLatch completedLatch;
90
91 private final ForeignExceptionDispatcher monitor;
92
93
94
95
96
97
98 protected final long wakeFrequency;
99 protected final TimeoutExceptionInjector timeoutInjector;
100
101
102
103
104
105
106 private Object joinBarrierLock = new Object();
107 private final List<String> acquiringMembers;
108 private final List<String> inBarrierMembers;
109 private final HashMap<String, byte[]> dataFromFinishedMembers;
110 private ProcedureCoordinator coord;
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125 public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
126 long timeout, String procName, byte[] args, List<String> expectedMembers) {
127 this.coord = coord;
128 this.acquiringMembers = new ArrayList<String>(expectedMembers);
129 this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
130 this.dataFromFinishedMembers = new HashMap<String, byte[]>();
131 this.procName = procName;
132 this.args = args;
133 this.monitor = monitor;
134 this.wakeFrequency = wakeFreq;
135
136 int count = expectedMembers.size();
137 this.acquiredBarrierLatch = new CountDownLatch(count);
138 this.releasedBarrierLatch = new CountDownLatch(count);
139 this.completedLatch = new CountDownLatch(1);
140 this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
141 }
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158 public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
159 String procName, byte[] args, List<String> expectedMembers) {
160 this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
161 expectedMembers);
162 }
163
164 public String getName() {
165 return procName;
166 }
167
168
169
170
171 public String getStatus() {
172 String waiting, done;
173 synchronized (joinBarrierLock) {
174 waiting = acquiringMembers.toString();
175 done = inBarrierMembers.toString();
176 }
177 return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
178 }
179
180
181
182
183
184 public ForeignExceptionDispatcher getErrorMonitor() {
185 return monitor;
186 }
187
188
189
190
191
192
193 @Override
194 @SuppressWarnings("finally")
195 final public Void call() {
196 LOG.info("Starting procedure '" + procName + "'");
197
198 timeoutInjector.start();
199
200
201 try {
202
203 monitor.rethrowException();
204 LOG.debug("Procedure '" + procName + "' starting 'acquire'");
205 sendGlobalBarrierStart();
206
207
208 LOG.debug("Waiting for all members to 'acquire'");
209 waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
210 monitor.rethrowException();
211
212 LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
213 sendGlobalBarrierReached();
214
215
216 LOG.debug("Waiting for all members to 'release'");
217 waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
218
219
220 monitor.rethrowException();
221 LOG.info("Procedure '" + procName + "' execution completed");
222 } catch (Exception e) {
223 if (e instanceof InterruptedException) {
224 Thread.currentThread().interrupt();
225 }
226 String msg = "Procedure '" + procName +"' execution failed!";
227 LOG.error(msg, e);
228 receive(new ForeignException(getName(), e));
229 } finally {
230 LOG.debug("Running finish phase.");
231 sendGlobalBarrierComplete();
232 completedLatch.countDown();
233
234
235 timeoutInjector.complete();
236 return null;
237 }
238 }
239
240
241
242
243
244
245 public void sendGlobalBarrierStart() throws ForeignException {
246
247 LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
248 try {
249
250
251 coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
252 } catch (IOException e) {
253 coord.rpcConnectionFailure("Can't reach controller.", e);
254 } catch (IllegalArgumentException e) {
255 throw new ForeignException(getName(), e);
256 }
257 }
258
259
260
261
262
263
264
265
266 public void sendGlobalBarrierReached() throws ForeignException {
267 try {
268
269 coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
270 } catch (IOException e) {
271 coord.rpcConnectionFailure("Can't reach controller.", e);
272 }
273 }
274
275
276
277
278
279
280 public void sendGlobalBarrierComplete() {
281 LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
282 try {
283 coord.getRpcs().resetMembers(this);
284 } catch (IOException e) {
285 coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
286 }
287 }
288
289
290
291
292
293
294
295
296
297 public void barrierAcquiredByMember(String member) {
298 LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName
299 + "' on coordinator");
300 if (this.acquiringMembers.contains(member)) {
301 synchronized (joinBarrierLock) {
302 if (this.acquiringMembers.remove(member)) {
303 this.inBarrierMembers.add(member);
304 acquiredBarrierLatch.countDown();
305 }
306 }
307 LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
308 } else {
309 LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
310 " Continuing on.");
311 }
312 }
313
314
315
316
317
318
319
320 public void barrierReleasedByMember(String member, byte[] dataFromMember) {
321 boolean removed = false;
322 synchronized (joinBarrierLock) {
323 removed = this.inBarrierMembers.remove(member);
324 if (removed) {
325 releasedBarrierLatch.countDown();
326 }
327 }
328 if (removed) {
329 LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
330 + "', counting down latch. Waiting for " + releasedBarrierLatch.getCount()
331 + " more");
332 } else {
333 LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
334 + "', but we weren't waiting on it to release!");
335 }
336 dataFromFinishedMembers.put(member, dataFromMember);
337 }
338
339
340
341
342
343
344
345
346 public void waitForCompleted() throws ForeignException, InterruptedException {
347 waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
348 }
349
350
351
352
353
354
355
356
357
358 public HashMap<String, byte[]> waitForCompletedWithRet() throws ForeignException, InterruptedException {
359 waitForCompleted();
360 return dataFromFinishedMembers;
361 }
362
363
364
365
366
367 public boolean isCompleted() throws ForeignException {
368
369 monitor.rethrowException();
370 return (completedLatch.getCount() == 0);
371 }
372
373
374
375
376 @Override
377 public void receive(ForeignException e) {
378 monitor.receive(e);
379 }
380
381
382
383
384
385
386
387
388
389
390
391
392 public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
393 long wakeFrequency, String latchDescription) throws ForeignException,
394 InterruptedException {
395 boolean released = false;
396 while (!released) {
397 if (monitor != null) {
398 monitor.rethrowException();
399 }
400
401
402
403 released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
404 }
405
406 if (monitor != null) {
407 monitor.rethrowException();
408 }
409 }
410 }