1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.classification.InterfaceStability;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
29 import org.apache.hadoop.hbase.util.Threads;
30 import org.apache.zookeeper.KeeperException;
31 import org.apache.zookeeper.WatchedEvent;
32 import org.apache.zookeeper.Watcher;
33 import org.apache.zookeeper.ZooDefs;
34 import org.apache.zookeeper.data.ACL;
35
36 import java.io.Closeable;
37 import java.io.IOException;
38 import java.util.ArrayList;
39 import java.util.List;
40 import java.util.concurrent.CopyOnWriteArrayList;
41 import java.util.concurrent.CountDownLatch;
42
43
44
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Public
55 @InterfaceStability.Evolving
56 public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
57 private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
58
59
60
61 private String identifier;
62
63
64 private String quorum;
65
66
67 private RecoverableZooKeeper recoverableZooKeeper;
68
69
70 protected Abortable abortable;
71
72 private boolean aborted = false;
73
74
75 private final List<ZooKeeperListener> listeners =
76 new CopyOnWriteArrayList<ZooKeeperListener>();
77
78
79
80 public CountDownLatch saslLatch = new CountDownLatch(1);
81
82
83
84
85 public String baseZNode;
86
87 public String metaServerZNode;
88
89 public String rsZNode;
90
91 public String drainingZNode;
92
93 private String masterAddressZNode;
94
95 public String backupMasterAddressesZNode;
96
97 public String clusterStateZNode;
98
99 public String assignmentZNode;
100
101 public String tableZNode;
102
103 public String clusterIdZNode;
104
105 public String splitLogZNode;
106
107 public String balancerZNode;
108
109 public String tableLockZNode;
110
111 public String recoveringRegionsZNode;
112
113
114 public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
115 new ArrayList<ACL>() { {
116 add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
117 add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
118 }};
119
120 private final Configuration conf;
121
122 private final Exception constructorCaller;
123
124
125
126
127
128
129
130
131 public ZooKeeperWatcher(Configuration conf, String identifier,
132 Abortable abortable) throws ZooKeeperConnectionException, IOException {
133 this(conf, identifier, abortable, false);
134 }
135
136
137
138
139
140
141
142
143
144
145
146
147 public ZooKeeperWatcher(Configuration conf, String identifier,
148 Abortable abortable, boolean canCreateBaseZNode)
149 throws IOException, ZooKeeperConnectionException {
150 this.conf = conf;
151
152
153 try {
154 throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
155 } catch (Exception e) {
156 this.constructorCaller = e;
157 }
158 this.quorum = ZKConfig.getZKQuorumServersString(conf);
159
160
161 this.identifier = identifier;
162 this.abortable = abortable;
163 setNodeNames(conf);
164 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
165 if (canCreateBaseZNode) {
166 createBaseZNodes();
167 }
168 }
169
170 private void createBaseZNodes() throws ZooKeeperConnectionException {
171 try {
172
173 ZKUtil.createWithParents(this, baseZNode);
174 ZKUtil.createAndFailSilent(this, assignmentZNode);
175 ZKUtil.createAndFailSilent(this, rsZNode);
176 ZKUtil.createAndFailSilent(this, drainingZNode);
177 ZKUtil.createAndFailSilent(this, tableZNode);
178 ZKUtil.createAndFailSilent(this, splitLogZNode);
179 ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
180 ZKUtil.createAndFailSilent(this, tableLockZNode);
181 ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
182 } catch (KeeperException e) {
183 throw new ZooKeeperConnectionException(
184 prefix("Unexpected KeeperException creating base node"), e);
185 }
186 }
187
188 @Override
189 public String toString() {
190 return this.identifier;
191 }
192
193
194
195
196
197
198
199 public String prefix(final String str) {
200 return this.toString() + " " + str;
201 }
202
203
204
205
206 private void setNodeNames(Configuration conf) {
207 baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
208 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
209 metaServerZNode = ZKUtil.joinZNode(baseZNode,
210 conf.get("zookeeper.znode.metaserver", "meta-region-server"));
211 rsZNode = ZKUtil.joinZNode(baseZNode,
212 conf.get("zookeeper.znode.rs", "rs"));
213 drainingZNode = ZKUtil.joinZNode(baseZNode,
214 conf.get("zookeeper.znode.draining.rs", "draining"));
215 masterAddressZNode = ZKUtil.joinZNode(baseZNode,
216 conf.get("zookeeper.znode.master", "master"));
217 backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
218 conf.get("zookeeper.znode.backup.masters", "backup-masters"));
219 clusterStateZNode = ZKUtil.joinZNode(baseZNode,
220 conf.get("zookeeper.znode.state", "running"));
221 assignmentZNode = ZKUtil.joinZNode(baseZNode,
222 conf.get("zookeeper.znode.unassigned", "region-in-transition"));
223 tableZNode = ZKUtil.joinZNode(baseZNode,
224 conf.get("zookeeper.znode.tableEnableDisable", "table"));
225 clusterIdZNode = ZKUtil.joinZNode(baseZNode,
226 conf.get("zookeeper.znode.clusterId", "hbaseid"));
227 splitLogZNode = ZKUtil.joinZNode(baseZNode,
228 conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
229 balancerZNode = ZKUtil.joinZNode(baseZNode,
230 conf.get("zookeeper.znode.balancer", "balancer"));
231 tableLockZNode = ZKUtil.joinZNode(baseZNode,
232 conf.get("zookeeper.znode.tableLock", "table-lock"));
233 recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
234 conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
235 }
236
237
238
239
240
241 public void registerListener(ZooKeeperListener listener) {
242 listeners.add(listener);
243 }
244
245
246
247
248
249
250 public void registerListenerFirst(ZooKeeperListener listener) {
251 listeners.add(0, listener);
252 }
253
254 public void unregisterListener(ZooKeeperListener listener) {
255 listeners.remove(listener);
256 }
257
258
259
260
261 public void unregisterAllListeners() {
262 listeners.clear();
263 }
264
265
266
267
268 public int getNumberOfListeners() {
269 return listeners.size();
270 }
271
272
273
274
275
276 public RecoverableZooKeeper getRecoverableZooKeeper() {
277 return recoverableZooKeeper;
278 }
279
280 public void reconnectAfterExpiration() throws IOException, InterruptedException {
281 recoverableZooKeeper.reconnectAfterExpiration();
282 }
283
284
285
286
287
288 public String getQuorum() {
289 return quorum;
290 }
291
292
293
294
295
296
297
298 @Override
299 public void process(WatchedEvent event) {
300 LOG.debug(prefix("Received ZooKeeper Event, " +
301 "type=" + event.getType() + ", " +
302 "state=" + event.getState() + ", " +
303 "path=" + event.getPath()));
304
305 switch(event.getType()) {
306
307
308 case None: {
309 connectionEvent(event);
310 break;
311 }
312
313
314
315 case NodeCreated: {
316 for(ZooKeeperListener listener : listeners) {
317 listener.nodeCreated(event.getPath());
318 }
319 break;
320 }
321
322 case NodeDeleted: {
323 for(ZooKeeperListener listener : listeners) {
324 listener.nodeDeleted(event.getPath());
325 }
326 break;
327 }
328
329 case NodeDataChanged: {
330 for(ZooKeeperListener listener : listeners) {
331 listener.nodeDataChanged(event.getPath());
332 }
333 break;
334 }
335
336 case NodeChildrenChanged: {
337 for(ZooKeeperListener listener : listeners) {
338 listener.nodeChildrenChanged(event.getPath());
339 }
340 break;
341 }
342 }
343 }
344
345
346
347
348
349
350
351
352
353
354
355
356
357 private void connectionEvent(WatchedEvent event) {
358 switch(event.getState()) {
359 case SyncConnected:
360
361
362 long finished = System.currentTimeMillis() +
363 this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
364 while (System.currentTimeMillis() < finished) {
365 Threads.sleep(1);
366 if (this.recoverableZooKeeper != null) break;
367 }
368 if (this.recoverableZooKeeper == null) {
369 LOG.error("ZK is null on connection event -- see stack trace " +
370 "for the stack trace when constructor was called on this zkw",
371 this.constructorCaller);
372 throw new NullPointerException("ZK is null");
373 }
374 this.identifier = this.identifier + "-0x" +
375 Long.toHexString(this.recoverableZooKeeper.getSessionId());
376
377 LOG.debug(this.identifier + " connected");
378 break;
379
380
381 case Disconnected:
382 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
383 break;
384
385 case Expired:
386 String msg = prefix(this.identifier + " received expired from " +
387 "ZooKeeper, aborting");
388
389
390 if (this.abortable != null) {
391 this.abortable.abort(msg, new KeeperException.SessionExpiredException());
392 }
393 break;
394
395 case ConnectedReadOnly:
396 break;
397
398 default:
399 throw new IllegalStateException("Received event is not valid.");
400 }
401 }
402
403
404
405
406
407
408
409
410
411
412
413
414
415 public void sync(String path) {
416 this.recoverableZooKeeper.sync(path, null, null);
417 }
418
419
420
421
422
423
424
425
426
427
428
429 public void keeperException(KeeperException ke)
430 throws KeeperException {
431 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
432 throw ke;
433 }
434
435
436
437
438
439
440
441
442
443
444
445
446 public void interruptedException(InterruptedException ie) {
447 LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
448
449 Thread.currentThread().interrupt();
450
451 }
452
453
454
455
456
457
458 public void close() {
459 try {
460 if (recoverableZooKeeper != null) {
461 recoverableZooKeeper.close();
462 }
463 } catch (InterruptedException e) {
464 Thread.currentThread().interrupt();
465 }
466 }
467
468 public Configuration getConfiguration() {
469 return conf;
470 }
471
472 @Override
473 public void abort(String why, Throwable e) {
474 if (this.abortable != null) this.abortable.abort(why, e);
475 else this.aborted = true;
476 }
477
478 @Override
479 public boolean isAborted() {
480 return this.abortable == null? this.aborted: this.abortable.isAborted();
481 }
482
483
484
485
486 public String getMasterAddressZNode() {
487 return this.masterAddressZNode;
488 }
489
490 }