1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.Arrays;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.errorhandling.ForeignException;
29 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
30 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32 import org.apache.zookeeper.KeeperException;
33
34 import com.google.protobuf.InvalidProtocolBufferException;
35
36
37
38
39 @InterfaceAudience.Private
40 public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
41 private static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class);
42 private ZKProcedureUtil zkProc = null;
43 protected ProcedureCoordinator coordinator = null;
44
45 ZooKeeperWatcher watcher;
46 String procedureType;
47 String coordName;
48
49
50
51
52
53
54
55
/**
 * Creates the ZooKeeper-backed coordinator RPC layer. The constructor only records its
 * arguments; the {@link ZKProcedureUtil} is built later by {@link #start(ProcedureCoordinator)}.
 *
 * @param watcher ZooKeeper watcher passed to the {@link ZKProcedureUtil} created in start()
 * @param procedureClass procedure type description passed to that same {@link ZKProcedureUtil}
 * @param coordName name of this coordinator, used as the source of locally created exceptions
 * @throws KeeperException part of the declared signature; nothing in this body throws it
 */
public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
    String procedureClass, String coordName) throws KeeperException {
  this.coordName = coordName;
  this.procedureType = procedureClass;
  this.watcher = watcher;
}
62
63
64
65
66
67
68
69
70
71
72
73 @Override
74 final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
75 throws IOException, IllegalArgumentException {
76 String procName = proc.getName();
77
78 String abortNode = zkProc.getAbortZNode(procName);
79 try {
80
81 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
82 abort(abortNode);
83 }
84
85
86 } catch (KeeperException e) {
87 String msg = "Failed while watching abort node:" + abortNode;
88 LOG.error(msg, e);
89 throw new IOException(msg, e);
90 }
91
92
93 String acquire = zkProc.getAcquiredBarrierNode(procName);
94 LOG.debug("Creating acquire znode:" + acquire);
95 try {
96
97 byte[] data = ProtobufUtil.prependPBMagic(info);
98 ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
99
100 for (String node : nodeNames) {
101 String znode = ZKUtil.joinZNode(acquire, node);
102 LOG.debug("Watching for acquire node:" + znode);
103 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
104 coordinator.memberAcquiredBarrier(procName, node);
105 }
106 }
107 } catch (KeeperException e) {
108 String msg = "Failed while creating acquire node:" + acquire;
109 LOG.error(msg, e);
110 throw new IOException(msg, e);
111 }
112 }
113
114 @Override
115 public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
116 String procName = proc.getName();
117 String reachedNode = zkProc.getReachedBarrierNode(procName);
118 LOG.debug("Creating reached barrier zk node:" + reachedNode);
119 try {
120
121 ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
122
123 for (String node : nodeNames) {
124 String znode = ZKUtil.joinZNode(reachedNode, node);
125 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
126 byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
127
128 if (dataFromMember != null && dataFromMember.length > 0) {
129 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
130 String msg =
131 "Failed to get data from finished node or data is illegally formatted: " + znode;
132 LOG.error(msg);
133 throw new IOException(msg);
134 } else {
135 dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
136 dataFromMember.length);
137 coordinator.memberFinishedBarrier(procName, node, dataFromMember);
138 }
139 } else {
140 coordinator.memberFinishedBarrier(procName, node, dataFromMember);
141 }
142 }
143 }
144 } catch (KeeperException e) {
145 String msg = "Failed while creating reached node:" + reachedNode;
146 LOG.error(msg, e);
147 throw new IOException(msg, e);
148 } catch (InterruptedException e) {
149 String msg = "Interrupted while creating reached node:" + reachedNode;
150 LOG.error(msg, e);
151 throw new InterruptedIOException(msg);
152 }
153 }
154
155
156
157
158
159 @Override
160 final public void resetMembers(Procedure proc) throws IOException {
161 String procName = proc.getName();
162 boolean stillGettingNotifications = false;
163 do {
164 try {
165 LOG.debug("Attempting to clean out zk node for op:" + procName);
166 zkProc.clearZNodes(procName);
167 stillGettingNotifications = false;
168 } catch (KeeperException.NotEmptyException e) {
169
170
171 stillGettingNotifications = true;
172 } catch (KeeperException e) {
173 String msg = "Failed to complete reset procedure " + procName;
174 LOG.error(msg, e);
175 throw new IOException(msg, e);
176 }
177 } while (stillGettingNotifications);
178 }
179
180
181
182
183
184 final public boolean start(final ProcedureCoordinator coordinator) {
185 if (this.coordinator != null) {
186 throw new IllegalStateException(
187 "ZKProcedureCoordinator already started and already has listener installed");
188 }
189 this.coordinator = coordinator;
190
191 try {
192 this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
193 @Override
194 public void nodeCreated(String path) {
195 if (!isInProcedurePath(path)) return;
196 LOG.debug("Node created: " + path);
197 logZKTree(this.baseZNode);
198 if (isAcquiredPathNode(path)) {
199
200 coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
201 ZKUtil.getNodeName(path));
202 } else if (isReachedPathNode(path)) {
203
204
205
206 String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
207 String member = ZKUtil.getNodeName(path);
208
209 try {
210 byte[] dataFromMember = ZKUtil.getData(watcher, path);
211
212 if (dataFromMember != null && dataFromMember.length > 0) {
213 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
214 ForeignException ee = new ForeignException(coordName,
215 "Failed to get data from finished node or data is illegally formatted:"
216 + path);
217 coordinator.abortProcedure(procName, ee);
218 } else {
219 dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
220 dataFromMember.length);
221 LOG.debug("Finished data from procedure '" + procName
222 + "' member '" + member + "': " + new String(dataFromMember));
223 coordinator.memberFinishedBarrier(procName, member, dataFromMember);
224 }
225 } else {
226 coordinator.memberFinishedBarrier(procName, member, dataFromMember);
227 }
228 } catch (KeeperException e) {
229 ForeignException ee = new ForeignException(coordName, e);
230 coordinator.abortProcedure(procName, ee);
231 } catch (InterruptedException e) {
232 ForeignException ee = new ForeignException(coordName, e);
233 coordinator.abortProcedure(procName, ee);
234 }
235 } else if (isAbortPathNode(path)) {
236 abort(path);
237 } else {
238 LOG.debug("Ignoring created notification for node:" + path);
239 }
240 }
241 };
242 zkProc.clearChildZNodes();
243 } catch (KeeperException e) {
244 LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
245 return false;
246 }
247
248 LOG.debug("Starting the controller for procedure member:" + coordName);
249 return true;
250 }
251
252
253
254
255
256
257
258 @Override
259 final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
260 String procName = proc.getName();
261 LOG.debug("Aborting procedure '" + procName + "' in zk");
262 String procAbortNode = zkProc.getAbortZNode(procName);
263 try {
264 LOG.debug("Creating abort znode:" + procAbortNode);
265 String source = (ee.getSource() == null) ? coordName : ee.getSource();
266 byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
267
268 ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
269 LOG.debug("Finished creating abort node:" + procAbortNode);
270 } catch (KeeperException e) {
271
272
273 zkProc.logZKTree(zkProc.baseZNode);
274 coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
275 + " to abort procedure '" + procName + "'", new IOException(e));
276 }
277 }
278
279
280
281
282
283 protected void abort(String abortNode) {
284 String procName = ZKUtil.getNodeName(abortNode);
285 ForeignException ee = null;
286 try {
287 byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
288 if (data == null || data.length == 0) {
289
290 return;
291 } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
292 LOG.warn("Got an error notification for op:" + abortNode
293 + " but we can't read the information. Killing the procedure.");
294
295 ee = new ForeignException(coordName,
296 "Data in abort node is illegally formatted. ignoring content.");
297 } else {
298
299 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
300 ee = ForeignException.deserialize(data);
301 }
302 } catch (InvalidProtocolBufferException e) {
303 LOG.warn("Got an error notification for op:" + abortNode
304 + " but we can't read the information. Killing the procedure.");
305
306 ee = new ForeignException(coordName, e);
307 } catch (KeeperException e) {
308 coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
309 + zkProc.getAbortZnode(), new IOException(e));
310 } catch (InterruptedException e) {
311 coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
312 + zkProc.getAbortZnode(), new IOException(e));
313 Thread.currentThread().interrupt();
314 }
315 coordinator.abortProcedure(procName, ee);
316 }
317
318 @Override
319 final public void close() throws IOException {
320 zkProc.close();
321 }
322
323
324
325
326 final ZKProcedureUtil getZkProcedureUtil() {
327 return zkProc;
328 }
329 }