1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
28 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
29 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
30 import org.apache.zookeeper.KeeperException;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 @InterfaceAudience.Private
51 public abstract class ZKProcedureUtil
52 extends ZooKeeperListener implements Closeable {
53
54 private static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
55
56 public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
57 public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
58 public static final String ABORT_ZNODE_DEFAULT = "abort";
59
60 public final String baseZNode;
61 protected final String acquiredZnode;
62 protected final String reachedZnode;
63 protected final String abortZnode;
64
65
66
67
68
69
70
71
72
73
74
/**
 * Computes the znode paths used by one procedure type, registers this object as a listener on
 * the given watcher, and creates the acquired, reached and abort znodes.
 *
 * @param watcher watcher whose base znode is the parent of this procedure's znodes; this
 *          instance is registered on it as a listener
 * @param procDescription name of the znode, directly under the watcher's base znode, that holds
 *          this procedure type's znodes
 * @throws KeeperException if creating any of the znodes fails
 */
public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription)
    throws KeeperException {
  super(watcher);

  // Resolve every path BEFORE handing "this" to the watcher: once registered, the listener may be
  // called back at any time, and a callback must never observe unassigned (null) path fields.
  this.baseZNode = ZKUtil.joinZNode(watcher.baseZNode, procDescription);
  this.acquiredZnode = ZKUtil.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
  this.reachedZnode = ZKUtil.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
  this.abortZnode = ZKUtil.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);

  // Still registered ahead of the znode creation below, as in the original ordering.
  watcher.registerListener(this);

  // The acquired znode goes first, via the "with parents" helper; the other two are its siblings
  // and are created with the fail-silent helper.
  ZKUtil.createWithParents(watcher, acquiredZnode);
  ZKUtil.createAndFailSilent(watcher, reachedZnode);
  ZKUtil.createAndFailSilent(watcher, abortZnode);
}
93
94 @Override
95 public void close() throws IOException {
96
97
98 }
99
100 public String getAcquiredBarrierNode(String opInstanceName) {
101 return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
102 }
103
104 public String getReachedBarrierNode(String opInstanceName) {
105 return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
106 }
107
108 public String getAbortZNode(String opInstanceName) {
109 return ZKProcedureUtil.getAbortNode(this, opInstanceName);
110 }
111
112 public String getAbortZnode() {
113 return abortZnode;
114 }
115
116 public String getBaseZnode() {
117 return baseZNode;
118 }
119
120 public String getAcquiredBarrier() {
121 return acquiredZnode;
122 }
123
124
125
126
127
128
129
130
131 public static String getAcquireBarrierNode(ZKProcedureUtil controller,
132 String opInstanceName) {
133 return ZKUtil.joinZNode(controller.acquiredZnode, opInstanceName);
134 }
135
136
137
138
139
140
141
142
143 public static String getReachedBarrierNode(ZKProcedureUtil controller,
144 String opInstanceName) {
145 return ZKUtil.joinZNode(controller.reachedZnode, opInstanceName);
146 }
147
148
149
150
151
152
153
154
155 public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
156 return ZKUtil.joinZNode(controller.abortZnode, opInstanceName);
157 }
158
159 public ZooKeeperWatcher getWatcher() {
160 return watcher;
161 }
162
163
164
165
166
167
168
169
170
171 boolean isInProcedurePath(String path) {
172 return path.startsWith(baseZNode);
173 }
174
175
176
177
178 boolean isAcquiredNode(String path) {
179 return path.equals(acquiredZnode);
180 }
181
182
183
184
185
186 boolean isAcquiredPathNode(String path) {
187 return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode) &&
188 isMemberNode(path, acquiredZnode);
189 }
190
191
192
193
194 boolean isReachedNode(String path) {
195 return path.equals(reachedZnode);
196 }
197
198
199
200
201 boolean isReachedPathNode(String path) {
202 return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode) &&
203 isMemberNode(path, reachedZnode);
204 }
205
206
207
208
209
210
211
212 private boolean isMemberNode(final String path, final String statePath) {
213 int count = 0;
214 for (int i = statePath.length(); i < path.length(); ++i) {
215 count += (path.charAt(i) == ZKUtil.ZNODE_PATH_SEPARATOR) ? 1 : 0;
216 }
217 return count == 2;
218 }
219
220
221
222
223 boolean isAbortNode(String path) {
224 return path.equals(abortZnode);
225 }
226
227
228
229
230 public boolean isAbortPathNode(String path) {
231 return path.startsWith(this.abortZnode) && !path.equals(abortZnode);
232 }
233
234
235
236
237
238
239
240
241
242 void logZKTree(String root) {
243 if (!LOG.isDebugEnabled()) return;
244 LOG.debug("Current zk system:");
245 String prefix = "|-";
246 LOG.debug(prefix + root);
247 try {
248 logZKTree(root, prefix);
249 } catch (KeeperException e) {
250 throw new RuntimeException(e);
251 }
252 }
253
254
255
256
257
258
259 protected void logZKTree(String root, String prefix) throws KeeperException {
260 List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
261 if (children == null) return;
262 for (String child : children) {
263 LOG.debug(prefix + child);
264 String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
265 logZKTree(node, prefix + "---");
266 }
267 }
268
269 public void clearChildZNodes() throws KeeperException {
270 LOG.info("Clearing all procedure znodes: " + acquiredZnode + " " + reachedZnode + " "
271 + abortZnode);
272
273
274
275
276 ZKUtil.deleteChildrenRecursivelyMultiOrSequential(watcher, true, acquiredZnode, reachedZnode,
277 abortZnode);
278
279 if (LOG.isTraceEnabled()) {
280 logZKTree(this.baseZNode);
281 }
282 }
283
284 public void clearZNodes(String procedureName) throws KeeperException {
285 LOG.info("Clearing all znodes for procedure " + procedureName + "including nodes "
286 + acquiredZnode + " " + reachedZnode + " " + abortZnode);
287
288
289 String acquiredBarrierNode = getAcquiredBarrierNode(procedureName);
290 String reachedBarrierNode = getReachedBarrierNode(procedureName);
291 String abortZNode = getAbortZNode(procedureName);
292
293 ZKUtil.createAndFailSilent(watcher, acquiredBarrierNode);
294 ZKUtil.createAndFailSilent(watcher, abortZNode);
295
296 ZKUtil.deleteNodeRecursivelyMultiOrSequential(watcher, true, acquiredBarrierNode,
297 reachedBarrierNode, abortZNode);
298
299 if (LOG.isTraceEnabled()) {
300 logZKTree(this.baseZNode);
301 }
302 }
303 }