1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.Arrays;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.errorhandling.ForeignException;
28 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
29 import org.apache.hadoop.hbase.util.Bytes;
30 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32 import org.apache.zookeeper.KeeperException;
33
34 import com.google.protobuf.InvalidProtocolBufferException;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
56 private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
57
58 private final ZKProcedureUtil zkController;
59
60 protected ProcedureMember member;
61 private String memberName;
62
63
64
65
66
67
68
69
70 public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
71 throws KeeperException {
72 this.zkController = new ZKProcedureUtil(watcher, procType) {
73 @Override
74 public void nodeCreated(String path) {
75 if (!isInProcedurePath(path)) {
76 return;
77 }
78
79 LOG.info("Received created event:" + path);
80
81 if (isAcquiredNode(path)) {
82 waitForNewProcedures();
83 return;
84 } else if (isAbortNode(path)) {
85 watchForAbortedProcedures();
86 return;
87 }
88 String parent = ZKUtil.getParent(path);
89
90 if (isReachedNode(parent)) {
91 receivedReachedGlobalBarrier(path);
92 return;
93 } else if (isAbortNode(parent)) {
94 abort(path);
95 return;
96 } else if (isAcquiredNode(parent)) {
97 startNewSubprocedure(path);
98 } else {
99 LOG.debug("Ignoring created notification for node:" + path);
100 }
101 }
102
103 @Override
104 public void nodeChildrenChanged(String path) {
105 if (path.equals(this.acquiredZnode)) {
106 LOG.info("Received procedure start children changed event: " + path);
107 waitForNewProcedures();
108 } else if (path.equals(this.abortZnode)) {
109 LOG.info("Received procedure abort children changed event: " + path);
110 watchForAbortedProcedures();
111 }
112 }
113 };
114 }
115
116 public ZKProcedureUtil getZkController() {
117 return zkController;
118 }
119
120 @Override
121 public String getMemberName() {
122 return memberName;
123 }
124
125
126
127
128
129 private void receivedReachedGlobalBarrier(String path) {
130 LOG.debug("Recieved reached global barrier:" + path);
131 String procName = ZKUtil.getNodeName(path);
132 this.member.receivedReachedGlobalBarrier(procName);
133 }
134
135 private void watchForAbortedProcedures() {
136 LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
137 try {
138
139 for (String node : ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
140 zkController.getAbortZnode())) {
141 String abortNode = ZKUtil.joinZNode(zkController.getAbortZnode(), node);
142 abort(abortNode);
143 }
144 } catch (KeeperException e) {
145 member.controllerConnectionFailure("Failed to list children for abort node:"
146 + zkController.getAbortZnode(), e, null);
147 }
148 }
149
150 private void waitForNewProcedures() {
151
152 LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'");
153 List<String> runningProcedures = null;
154 try {
155 runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
156 zkController.getAcquiredBarrier());
157 if (runningProcedures == null) {
158 LOG.debug("No running procedures.");
159 return;
160 }
161 } catch (KeeperException e) {
162 member.controllerConnectionFailure("General failure when watching for new procedures",
163 e, null);
164 }
165 if (runningProcedures == null) {
166 LOG.debug("No running procedures.");
167 return;
168 }
169 for (String procName : runningProcedures) {
170
171 String path = ZKUtil.joinZNode(zkController.getAcquiredBarrier(), procName);
172 startNewSubprocedure(path);
173 }
174 }
175
176
177
178
179
180
181
182
183 private synchronized void startNewSubprocedure(String path) {
184 LOG.debug("Found procedure znode: " + path);
185 String opName = ZKUtil.getNodeName(path);
186
187 String abortZNode = zkController.getAbortZNode(opName);
188 try {
189 if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
190 LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
191 return;
192 }
193 } catch (KeeperException e) {
194 member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
195 + ") for procedure :" + opName, e, opName);
196 return;
197 }
198
199
200 Subprocedure subproc = null;
201 try {
202 byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
203 if (!ProtobufUtil.isPBMagicPrefix(data)) {
204 String msg = "Data in for starting procuedure " + opName +
205 " is illegally formatted (no pb magic). " +
206 "Killing the procedure: " + Bytes.toString(data);
207 LOG.error(msg);
208 throw new IllegalArgumentException(msg);
209 }
210 LOG.debug("start proc data length is " + data.length);
211 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
212 LOG.debug("Found data for znode:" + path);
213 subproc = member.createSubprocedure(opName, data);
214 member.submitSubprocedure(subproc);
215 } catch (IllegalArgumentException iae ) {
216 LOG.error("Illegal argument exception", iae);
217 sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
218 } catch (IllegalStateException ise) {
219 LOG.error("Illegal state exception ", ise);
220 sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
221 } catch (KeeperException e) {
222 member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
223 e, opName);
224 } catch (InterruptedException e) {
225 member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
226 e, opName);
227 Thread.currentThread().interrupt();
228 }
229 }
230
231
232
233
234
235
236
237 @Override
238 public void sendMemberAcquired(Subprocedure sub) throws IOException {
239 String procName = sub.getName();
240 try {
241 LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
242 + ") in zk");
243 String acquiredZNode = ZKUtil.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
244 zkController, procName), memberName);
245 ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
246
247
248 String reachedBarrier = zkController.getReachedBarrierNode(procName);
249 LOG.debug("Watch for global barrier reached:" + reachedBarrier);
250 if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
251 receivedReachedGlobalBarrier(reachedBarrier);
252 }
253 } catch (KeeperException e) {
254 member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
255 + procName + " and member: " + memberName, e, procName);
256 }
257 }
258
259
260
261
262 @Override
263 public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
264 String procName = sub.getName();
265 LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName
266 + "' in zk");
267 String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
268
269 if (data == null) {
270 data = new byte[0];
271 }
272 try {
273 ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
274 ProtobufUtil.prependPBMagic(data));
275 } catch (KeeperException e) {
276 member.controllerConnectionFailure("Failed to post zk node:" + joinPath
277 + " to join procedure barrier.", e, procName);
278 }
279 }
280
281
282
283
284
285 @Override
286 public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
287 if (sub == null) {
288 LOG.error("Failed due to null subprocedure", ee);
289 return;
290 }
291 String procName = sub.getName();
292 LOG.debug("Aborting procedure (" + procName + ") in zk");
293 String procAbortZNode = zkController.getAbortZNode(procName);
294 try {
295 String source = (ee.getSource() == null) ? memberName: ee.getSource();
296 byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
297 ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
298 LOG.debug("Finished creating abort znode:" + procAbortZNode);
299 } catch (KeeperException e) {
300
301
302 zkController.logZKTree(zkController.getBaseZnode());
303 member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
304 + " to abort procedure", e, procName);
305 }
306 }
307
308
309
310
311
312 protected void abort(String abortZNode) {
313 LOG.debug("Aborting procedure member for znode " + abortZNode);
314 String opName = ZKUtil.getNodeName(abortZNode);
315 try {
316 byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
317
318
319 ForeignException ee;
320 try {
321 if (data == null || data.length == 0) {
322
323 return;
324 } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
325 String msg = "Illegally formatted data in abort node for proc " + opName
326 + ". Killing the procedure.";
327 LOG.error(msg);
328
329 ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
330 } else {
331 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
332 ee = ForeignException.deserialize(data);
333 }
334 } catch (InvalidProtocolBufferException e) {
335 LOG.warn("Got an error notification for op:" + opName
336 + " but we can't read the information. Killing the procedure.");
337
338 ee = new ForeignException(getMemberName(), e);
339 }
340
341 this.member.receiveAbortProcedure(opName, ee);
342 } catch (KeeperException e) {
343 member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
344 + zkController.getAbortZnode(), e, opName);
345 } catch (InterruptedException e) {
346 LOG.warn("abort already in progress", e);
347 Thread.currentThread().interrupt();
348 }
349 }
350
351 public void start(final String memberName, final ProcedureMember listener) {
352 LOG.debug("Starting procedure member '" + memberName + "'");
353 this.member = listener;
354 this.memberName = memberName;
355 watchForAbortedProcedures();
356 waitForNewProcedures();
357 }
358
359 @Override
360 public void close() throws IOException {
361 zkController.close();
362 }
363
364 }