1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.Arrays;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.errorhandling.ForeignException;
29 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
30 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32 import org.apache.zookeeper.KeeperException;
33
34 import com.google.protobuf.InvalidProtocolBufferException;
35
36
37
38
39 @InterfaceAudience.Private
40 public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
41 public static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class);
42 private ZKProcedureUtil zkProc = null;
43 protected ProcedureCoordinator coordinator = null;
44
45 ZooKeeperWatcher watcher;
46 String procedureType;
47 String coordName;
48
49
50
51
52
53
54
55
/**
 * Records the ZooKeeper watcher and naming information. No ZooKeeper call is made here; the
 * procedure znode helper is only built once {@code start(ProcedureCoordinator)} runs.
 *
 * @param watcher ZooKeeper watcher later handed to the procedure znode helper
 * @param procedureClass procedure type later handed to the procedure znode helper
 * @param coordName name identifying this coordinator in log messages and as the source of
 *          locally created exceptions
 * @throws KeeperException declared for callers; nothing in this constructor body throws it
 */
public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher, String procedureClass,
    String coordName) throws KeeperException {
  this.coordName = coordName;
  this.procedureType = procedureClass;
  this.watcher = watcher;
}
62
63
64
65
66
67
68
69
70
71
72
73 @Override
74 final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
75 throws IOException, IllegalArgumentException {
76 String procName = proc.getName();
77
78 String abortNode = zkProc.getAbortZNode(procName);
79 try {
80
81 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
82 abort(abortNode);
83 }
84
85
86 } catch (KeeperException e) {
87 LOG.error("Failed to watch abort", e);
88 throw new IOException("Failed while watching abort node:" + abortNode, e);
89 }
90
91
92 String acquire = zkProc.getAcquiredBarrierNode(procName);
93 LOG.debug("Creating acquire znode:" + acquire);
94 try {
95
96 byte[] data = ProtobufUtil.prependPBMagic(info);
97 ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
98
99 for (String node : nodeNames) {
100 String znode = ZKUtil.joinZNode(acquire, node);
101 LOG.debug("Watching for acquire node:" + znode);
102 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
103 coordinator.memberAcquiredBarrier(procName, node);
104 }
105 }
106 } catch (KeeperException e) {
107 throw new IOException("Failed while creating acquire node:" + acquire, e);
108 }
109 }
110
111 @Override
112 public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
113 String procName = proc.getName();
114 String reachedNode = zkProc.getReachedBarrierNode(procName);
115 LOG.debug("Creating reached barrier zk node:" + reachedNode);
116 try {
117
118 ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
119
120 for (String node : nodeNames) {
121 String znode = ZKUtil.joinZNode(reachedNode, node);
122 if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
123 byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
124
125 if (dataFromMember != null && dataFromMember.length > 0) {
126 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
127 throw new IOException(
128 "Failed to get data from finished node or data is illegally formatted: "
129 + znode);
130 } else {
131 dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
132 dataFromMember.length);
133 coordinator.memberFinishedBarrier(procName, node, dataFromMember);
134 }
135 } else {
136 coordinator.memberFinishedBarrier(procName, node, dataFromMember);
137 }
138 }
139 }
140 } catch (KeeperException e) {
141 throw new IOException("Failed while creating reached node:" + reachedNode, e);
142 } catch (InterruptedException e) {
143 throw new InterruptedIOException("Interrupted while creating reached node:" + reachedNode);
144 }
145 }
146
147
148
149
150
151 @Override
152 final public void resetMembers(Procedure proc) throws IOException {
153 String procName = proc.getName();
154 boolean stillGettingNotifications = false;
155 do {
156 try {
157 LOG.debug("Attempting to clean out zk node for op:" + procName);
158 zkProc.clearZNodes(procName);
159 stillGettingNotifications = false;
160 } catch (KeeperException.NotEmptyException e) {
161
162
163 stillGettingNotifications = true;
164 } catch (KeeperException e) {
165 throw new IOException("Failed to complete reset procedure " + procName, e);
166 }
167 } while (stillGettingNotifications);
168 }
169
170
171
172
173
174 final public boolean start(final ProcedureCoordinator coordinator) {
175 if (this.coordinator != null) {
176 throw new IllegalStateException(
177 "ZKProcedureCoordinator already started and already has listener installed");
178 }
179 this.coordinator = coordinator;
180
181 try {
182 this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
183 @Override
184 public void nodeCreated(String path) {
185 if (!isInProcedurePath(path)) return;
186 LOG.debug("Node created: " + path);
187 logZKTree(this.baseZNode);
188 if (isAcquiredPathNode(path)) {
189
190 coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
191 ZKUtil.getNodeName(path));
192 } else if (isReachedPathNode(path)) {
193
194
195
196 String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
197 String member = ZKUtil.getNodeName(path);
198
199 try {
200 byte[] dataFromMember = ZKUtil.getData(watcher, path);
201
202 if (dataFromMember != null && dataFromMember.length > 0) {
203 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
204 ForeignException ee = new ForeignException(coordName,
205 "Failed to get data from finished node or data is illegally formatted:"
206 + path);
207 coordinator.abortProcedure(procName, ee);
208 } else {
209 dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
210 dataFromMember.length);
211 LOG.debug("Finished data from procedure '" + procName
212 + "' member '" + member + "': " + new String(dataFromMember));
213 coordinator.memberFinishedBarrier(procName, member, dataFromMember);
214 }
215 } else {
216 coordinator.memberFinishedBarrier(procName, member, dataFromMember);
217 }
218 } catch (KeeperException e) {
219 ForeignException ee = new ForeignException(coordName, e);
220 coordinator.abortProcedure(procName, ee);
221 } catch (InterruptedException e) {
222 ForeignException ee = new ForeignException(coordName, e);
223 coordinator.abortProcedure(procName, ee);
224 }
225 } else if (isAbortPathNode(path)) {
226 abort(path);
227 } else {
228 LOG.debug("Ignoring created notification for node:" + path);
229 }
230 }
231 };
232 zkProc.clearChildZNodes();
233 } catch (KeeperException e) {
234 LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
235 return false;
236 }
237
238 LOG.debug("Starting the controller for procedure member:" + coordName);
239 return true;
240 }
241
242
243
244
245
246
247
248 @Override
249 final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
250 String procName = proc.getName();
251 LOG.debug("Aborting procedure '" + procName + "' in zk");
252 String procAbortNode = zkProc.getAbortZNode(procName);
253 try {
254 LOG.debug("Creating abort znode:" + procAbortNode);
255 String source = (ee.getSource() == null) ? coordName : ee.getSource();
256 byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
257
258 ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
259 LOG.debug("Finished creating abort node:" + procAbortNode);
260 } catch (KeeperException e) {
261
262
263 zkProc.logZKTree(zkProc.baseZNode);
264 coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
265 + " to abort procedure '" + procName + "'", new IOException(e));
266 }
267 }
268
269
270
271
272
273 protected void abort(String abortNode) {
274 String procName = ZKUtil.getNodeName(abortNode);
275 ForeignException ee = null;
276 try {
277 byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
278 if (data == null || data.length == 0) {
279
280 return;
281 } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
282 LOG.warn("Got an error notification for op:" + abortNode
283 + " but we can't read the information. Killing the procedure.");
284
285 ee = new ForeignException(coordName, "Data in abort node is illegally formatted. ignoring content.");
286 } else {
287
288 data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
289 ee = ForeignException.deserialize(data);
290 }
291 } catch (InvalidProtocolBufferException e) {
292 LOG.warn("Got an error notification for op:" + abortNode
293 + " but we can't read the information. Killing the procedure.");
294
295 ee = new ForeignException(coordName, e);
296 } catch (KeeperException e) {
297 coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
298 + zkProc.getAbortZnode(), new IOException(e));
299 } catch (InterruptedException e) {
300 coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
301 + zkProc.getAbortZnode(), new IOException(e));
302 Thread.currentThread().interrupt();
303 }
304 coordinator.abortProcedure(procName, ee);
305 }
306
307 @Override
308 final public void close() throws IOException {
309 zkProc.close();
310 }
311
312
313
314
315 final ZKProcedureUtil getZkProcedureUtil() {
316 return zkProc;
317 }
318 }