1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.regex.Matcher;
30 import java.util.regex.Pattern;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.security.Superusers;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.Abortable;
38 import org.apache.hadoop.hbase.AuthUtil;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
42 import org.apache.hadoop.security.UserGroupInformation;
43 import org.apache.zookeeper.KeeperException;
44 import org.apache.zookeeper.WatchedEvent;
45 import org.apache.zookeeper.Watcher;
46 import org.apache.zookeeper.ZooDefs;
47 import org.apache.zookeeper.ZooDefs.Ids;
48 import org.apache.zookeeper.ZooDefs.Perms;
49 import org.apache.zookeeper.data.ACL;
50 import org.apache.zookeeper.data.Id;
51 import org.apache.zookeeper.data.Stat;
52
53
54
55
56
57
58
59
60
61
62
63
64 @InterfaceAudience.Private
65 public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
/** Logger shared by all instances of this class. */
private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);

/** Identifier exactly as passed to the constructor; connectionEvent() appends the session id. */
private String prefix;

/**
 * Identifier used by toString() and log messages. Initialised by the constructor and rewritten
 * by connectionEvent(), which may run on a different thread; volatile so readers see the update.
 */
private volatile String identifier;

/** Quorum string obtained from ZKConfig.getZKQuorumServersString(conf). */
private String quorum;

/**
 * ZooKeeper connection wrapper. connectionEvent() polls this field until the constructor has
 * assigned it, so it is volatile to guarantee the polling thread observes the assignment.
 */
private volatile RecoverableZooKeeper recoverableZooKeeper;

/** Delegate that abort() and isAborted() forward to; may be null. */
protected Abortable abortable;

/** Abort flag used only when no abortable delegate was supplied. */
private boolean aborted = false;

/** Listeners notified of znode events by process(). */
private final List<ZooKeeperListener> listeners =
    new CopyOnWriteArrayList<ZooKeeperListener>();

/**
 * Latch with an initial count of one. Nothing in this class counts it down or awaits it;
 * NOTE(review): presumably used by SASL-aware callers elsewhere -- confirm.
 */
public CountDownLatch saslLatch = new CountDownLatch(1);

/** Base znode, read from HConstants.ZOOKEEPER_ZNODE_PARENT. All other znodes live below it. */
public String baseZNode;

/** Meta location znodes keyed by replica id; populated by setNodeNames(). */
private Map<Integer,String> metaReplicaZnodes = new HashMap<Integer, String>();

/** Child of baseZNode named by "zookeeper.znode.rs" (default "rs"). */
public String rsZNode;

/** Child of baseZNode named by "zookeeper.znode.draining.rs" (default "draining"). */
public String drainingZNode;

/** Child of baseZNode named by "zookeeper.znode.master" (default "master"). */
private String masterAddressZNode;

/** Child of baseZNode named by "zookeeper.znode.backup.masters" (default "backup-masters"). */
public String backupMasterAddressesZNode;

/** Child of baseZNode named by "zookeeper.znode.state" (default "running"). */
public String clusterStateZNode;

/** Child of baseZNode named by "zookeeper.znode.unassigned" (default "region-in-transition"). */
public String assignmentZNode;

/** Child of baseZNode named by "zookeeper.znode.tableEnableDisable" (default "table"). */
public String tableZNode;

/** Child of baseZNode named by "zookeeper.znode.clusterId" (default "hbaseid"). */
public String clusterIdZNode;

/** Child of baseZNode named by "zookeeper.znode.splitlog" (default SPLIT_LOGDIR_NAME). */
public String splitLogZNode;

/** Child of baseZNode named by "zookeeper.znode.balancer" (default "balancer"). */
public String balancerZNode;

/** Child of baseZNode named by "zookeeper.znode.tableLock" (default "table-lock"). */
public String tableLockZNode;

/** Child of baseZNode named by "zookeeper.znode.recovering.regions". */
public String recoveringRegionsZNode;

/**
 * Child of baseZNode named by "zookeeper.znode.namespace" (default "namespace").
 * NOTE(review): this field is static yet is reassigned by setNodeNames() for every instance,
 * so the last constructed watcher wins. Left unchanged because it is part of the public surface.
 */
public static String namespaceZNode = "namespace";

/**
 * ACL list granting READ to anyone and ALL to the authenticated ids.
 * Built with a static initializer rather than double-brace initialization so the value is a
 * plain ArrayList instead of an instance of an anonymous subclass.
 */
public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE = new ArrayList<ACL>(2);
static {
  CREATOR_ALL_AND_WORLD_READABLE.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
  CREATOR_ALL_AND_WORLD_READABLE.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
}

/** Same value as the default used for "zookeeper.znode.metaserver" in this class. */
public final static String META_ZNODE_PREFIX = "meta-region-server";

/** Configuration this watcher was built from. */
private final Configuration conf;

/** Exception created in the constructor purely to record the construction stack trace. */
private final Exception constructorCaller;

/** Splits an id of the form name[/host]@realm; group 1 is the leading name component. */
private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
140
141
142
143
144
145
146
147
/**
 * Creates a watcher without creating the base znodes; delegates to the four-argument
 * constructor with {@code canCreateBaseZNode} set to {@code false}.
 *
 * @param conf configuration used to locate and connect to ZooKeeper
 * @param identifier string used to identify this watcher in log output
 * @param abortable delegate invoked on abort; may be null
 * @throws ZooKeeperConnectionException propagated from the delegated constructor
 * @throws IOException propagated from the delegated constructor
 */
public ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
    throws ZooKeeperConnectionException, IOException {
  this(conf, identifier, abortable, false);
}
152
153
154
155
156
157
158
159
160
161
162
163
/**
 * Creates a watcher, connects it to ZooKeeper and optionally creates the base znodes.
 *
 * <p>If creating the base znodes fails, the freshly opened ZooKeeper connection is closed
 * before the exception is rethrown so that a failed construction does not leak a session.
 *
 * @param conf configuration used to derive the quorum and znode names
 * @param identifier string used to identify this watcher in log output
 * @param abortable delegate invoked on abort; may be null
 * @param canCreateBaseZNode whether to create the base znodes after connecting
 * @throws IOException propagated from ZKUtil.connect
 * @throws ZooKeeperConnectionException if the base znodes cannot be created
 */
public ZooKeeperWatcher(Configuration conf, String identifier,
    Abortable abortable, boolean canCreateBaseZNode)
    throws IOException, ZooKeeperConnectionException {
  this.conf = conf;
  // A Throwable records its stack trace when it is constructed, so there is no need to
  // throw and catch it just to capture where this watcher was created.
  this.constructorCaller = new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
  this.quorum = ZKConfig.getZKQuorumServersString(conf);
  this.prefix = identifier;
  // Placeholder session id; connectionEvent() replaces it once a session exists.
  this.identifier = identifier + "0x0";
  this.abortable = abortable;
  setNodeNames(conf);
  this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
  if (canCreateBaseZNode) {
    try {
      createBaseZNodes();
    } catch (ZooKeeperConnectionException zce) {
      // Construction is failing: release the connection opened above before propagating.
      try {
        this.recoverableZooKeeper.close();
      } catch (InterruptedException ie) {
        LOG.debug("Interrupted while closing ZooKeeper connection after failed base znode creation");
        Thread.currentThread().interrupt();
      }
      throw zce;
    }
  }
}
187
188 private void createBaseZNodes() throws ZooKeeperConnectionException {
189 try {
190
191 ZKUtil.createWithParents(this, baseZNode);
192 if (conf.getBoolean("hbase.assignment.usezk", true)) {
193 ZKUtil.createAndFailSilent(this, assignmentZNode);
194 }
195 ZKUtil.createAndFailSilent(this, rsZNode);
196 ZKUtil.createAndFailSilent(this, drainingZNode);
197 ZKUtil.createAndFailSilent(this, tableZNode);
198 ZKUtil.createAndFailSilent(this, splitLogZNode);
199 ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
200 ZKUtil.createAndFailSilent(this, tableLockZNode);
201 ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
202 } catch (KeeperException e) {
203 throw new ZooKeeperConnectionException(
204 prefix("Unexpected KeeperException creating base node"), e);
205 }
206 }
207
208
209
210 public boolean isClientReadable(String node) {
211
212
213
214 return
215 node.equals(baseZNode) ||
216 isAnyMetaReplicaZnode(node) ||
217 node.equals(getMasterAddressZNode()) ||
218 node.equals(clusterIdZNode)||
219 node.equals(rsZNode) ||
220
221 node.equals(tableZNode) ||
222 node.startsWith(tableZNode + "/");
223 }
224
225
226
227
228
229
230
231 public void checkAndSetZNodeAcls() {
232 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
233 LOG.info("not a secure deployment, proceeding");
234 return;
235 }
236
237
238
239 try {
240 List<ACL> actualAcls = recoverableZooKeeper.getAcl(baseZNode, new Stat());
241
242 if (!isBaseZnodeAclSetup(actualAcls)) {
243 LOG.info("setting znode ACLs");
244 setZnodeAclsRecursive(baseZNode);
245 }
246 } catch(KeeperException.NoNodeException nne) {
247 return;
248 } catch(InterruptedException ie) {
249 interruptedException(ie);
250 } catch (IOException|KeeperException e) {
251 LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
252 }
253 }
254
255
256
257
258
259
260 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
261 List<String> children = recoverableZooKeeper.getChildren(znode, false);
262
263 for (String child : children) {
264 setZnodeAclsRecursive(ZKUtil.joinZNode(znode, child));
265 }
266 List<ACL> acls = ZKUtil.createACL(this, znode, true);
267 LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls);
268 recoverableZooKeeper.setAcl(znode, acls, -1);
269 }
270
271
272
273
274
275
276
277 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
278 if (LOG.isDebugEnabled()) {
279 LOG.debug("Checking znode ACLs");
280 }
281 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
282
283 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
284 return false;
285 }
286
287
288
289 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
290
291 if (acls.isEmpty()) {
292 if (LOG.isDebugEnabled()) {
293 LOG.debug("ACL is empty");
294 }
295 return false;
296 }
297
298 for (ACL acl : acls) {
299 int perms = acl.getPerms();
300 Id id = acl.getId();
301
302
303 if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
304 if (perms != Perms.READ) {
305 if (LOG.isDebugEnabled()) {
306 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
307 id, perms, Perms.READ));
308 }
309 return false;
310 }
311 } else if ("sasl".equals(id.getScheme())) {
312 String name = id.getId();
313
314 Matcher match = NAME_PATTERN.matcher(name);
315 if (match.matches()) {
316 name = match.group(1);
317 }
318 if (name.equals(hbaseUser)) {
319 if (perms != Perms.ALL) {
320 if (LOG.isDebugEnabled()) {
321 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
322 id, perms, Perms.ALL));
323 }
324 return false;
325 }
326 } else {
327 if (LOG.isDebugEnabled()) {
328 LOG.debug("Unexpected shortname in SASL ACL: " + id);
329 }
330 return false;
331 }
332 } else {
333 if (LOG.isDebugEnabled()) {
334 LOG.debug("unexpected ACL id '" + id + "'");
335 }
336 return false;
337 }
338 }
339 return true;
340 }
341
342
343
344
345 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
346 for (String user : superUsers) {
347 boolean hasAccess = false;
348
349 if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
350 for (ACL acl : acls) {
351 if (user.equals(acl.getId().getId())) {
352 if (acl.getPerms() == Perms.ALL) {
353 hasAccess = true;
354 } else {
355 if (LOG.isDebugEnabled()) {
356 LOG.debug(String.format(
357 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
358 acl.getId().getId(), acl.getPerms(), Perms.ALL));
359 }
360 }
361 break;
362 }
363 }
364 if (!hasAccess) {
365 return false;
366 }
367 }
368 }
369 return true;
370 }
371
372 @Override
373 public String toString() {
374 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
375 }
376
377
378
379
380
381
382
383 public String prefix(final String str) {
384 return this.toString() + " " + str;
385 }
386
387
388
389
390 private void setNodeNames(Configuration conf) {
391 baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
392 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
393 metaReplicaZnodes.put(0, ZKUtil.joinZNode(baseZNode,
394 conf.get("zookeeper.znode.metaserver", "meta-region-server")));
395 int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
396 HConstants.DEFAULT_META_REPLICA_NUM);
397 for (int i = 1; i < numMetaReplicas; i++) {
398 String str = ZKUtil.joinZNode(baseZNode,
399 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + i);
400 metaReplicaZnodes.put(i, str);
401 }
402 rsZNode = ZKUtil.joinZNode(baseZNode,
403 conf.get("zookeeper.znode.rs", "rs"));
404 drainingZNode = ZKUtil.joinZNode(baseZNode,
405 conf.get("zookeeper.znode.draining.rs", "draining"));
406 masterAddressZNode = ZKUtil.joinZNode(baseZNode,
407 conf.get("zookeeper.znode.master", "master"));
408 backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
409 conf.get("zookeeper.znode.backup.masters", "backup-masters"));
410 clusterStateZNode = ZKUtil.joinZNode(baseZNode,
411 conf.get("zookeeper.znode.state", "running"));
412 assignmentZNode = ZKUtil.joinZNode(baseZNode,
413 conf.get("zookeeper.znode.unassigned", "region-in-transition"));
414 tableZNode = ZKUtil.joinZNode(baseZNode,
415 conf.get("zookeeper.znode.tableEnableDisable", "table"));
416 clusterIdZNode = ZKUtil.joinZNode(baseZNode,
417 conf.get("zookeeper.znode.clusterId", "hbaseid"));
418 splitLogZNode = ZKUtil.joinZNode(baseZNode,
419 conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
420 balancerZNode = ZKUtil.joinZNode(baseZNode,
421 conf.get("zookeeper.znode.balancer", "balancer"));
422 tableLockZNode = ZKUtil.joinZNode(baseZNode,
423 conf.get("zookeeper.znode.tableLock", "table-lock"));
424 recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
425 conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
426 namespaceZNode = ZKUtil.joinZNode(baseZNode,
427 conf.get("zookeeper.znode.namespace", "namespace"));
428 }
429
430
431
432
433
434
435 public boolean isAnyMetaReplicaZnode(String node) {
436 if (metaReplicaZnodes.values().contains(node)) {
437 return true;
438 }
439 return false;
440 }
441
442
443
444
445
446
447 public boolean isDefaultMetaReplicaZnode(String node) {
448 if (getZNodeForReplica(HRegionInfo.DEFAULT_REPLICA_ID).equals(node)) {
449 return true;
450 }
451 return false;
452 }
453
454
455
456
457
458
459 public List<String> getMetaReplicaNodes() throws KeeperException {
460 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, baseZNode);
461 List<String> metaReplicaNodes = new ArrayList<String>(2);
462 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
463 for (String child : childrenOfBaseNode) {
464 if (child.startsWith(pattern)) metaReplicaNodes.add(child);
465 }
466 return metaReplicaNodes;
467 }
468
469
470
471
472
473
474 public String getZNodeForReplica(int replicaId) {
475 String str = metaReplicaZnodes.get(replicaId);
476
477
478
479 if (str == null) {
480 str = ZKUtil.joinZNode(baseZNode,
481 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + replicaId);
482 }
483 return str;
484 }
485
486
487
488
489
490
491 public int getMetaReplicaIdFromZnode(String znode) {
492 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
493 if (znode.equals(pattern)) return HRegionInfo.DEFAULT_REPLICA_ID;
494
495 String nonDefaultPattern = pattern + "-";
496 return Integer.parseInt(znode.substring(nonDefaultPattern.length()));
497 }
498
499
500
501
502
503 public void registerListener(ZooKeeperListener listener) {
504 listeners.add(listener);
505 }
506
507
508
509
510
511
512 public void registerListenerFirst(ZooKeeperListener listener) {
513 listeners.add(0, listener);
514 }
515
516 public void unregisterListener(ZooKeeperListener listener) {
517 listeners.remove(listener);
518 }
519
520
521
522
523 public void unregisterAllListeners() {
524 listeners.clear();
525 }
526
527
528
529
530 public List<ZooKeeperListener> getListeners() {
531 return new ArrayList<ZooKeeperListener>(listeners);
532 }
533
534
535
536
537 public int getNumberOfListeners() {
538 return listeners.size();
539 }
540
541
542
543
544
545 public RecoverableZooKeeper getRecoverableZooKeeper() {
546 return recoverableZooKeeper;
547 }
548
549 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
550 recoverableZooKeeper.reconnectAfterExpiration();
551 }
552
553
554
555
556
557 public String getQuorum() {
558 return quorum;
559 }
560
561
562
563
564 public String getBaseZNode() {
565 return baseZNode;
566 }
567
568
569
570
571
572
573
574 @Override
575 public void process(WatchedEvent event) {
576 LOG.debug(prefix("Received ZooKeeper Event, " +
577 "type=" + event.getType() + ", " +
578 "state=" + event.getState() + ", " +
579 "path=" + event.getPath()));
580
581 switch(event.getType()) {
582
583
584 case None: {
585 connectionEvent(event);
586 break;
587 }
588
589
590
591 case NodeCreated: {
592 for(ZooKeeperListener listener : listeners) {
593 listener.nodeCreated(event.getPath());
594 }
595 break;
596 }
597
598 case NodeDeleted: {
599 for(ZooKeeperListener listener : listeners) {
600 listener.nodeDeleted(event.getPath());
601 }
602 break;
603 }
604
605 case NodeDataChanged: {
606 for(ZooKeeperListener listener : listeners) {
607 listener.nodeDataChanged(event.getPath());
608 }
609 break;
610 }
611
612 case NodeChildrenChanged: {
613 for(ZooKeeperListener listener : listeners) {
614 listener.nodeChildrenChanged(event.getPath());
615 }
616 break;
617 }
618 }
619 }
620
621
622
623
624
625
626
627
628
629
630
631
632
633 private void connectionEvent(WatchedEvent event) {
634 switch(event.getState()) {
635 case SyncConnected:
636
637
638 long finished = System.currentTimeMillis() +
639 this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
640 while (System.currentTimeMillis() < finished) {
641 try {
642 Thread.sleep(1);
643 } catch (InterruptedException e) {
644 LOG.warn("Interrupted while sleeping");
645 throw new RuntimeException("Interrupted while waiting for" +
646 " recoverableZooKeeper is set");
647 }
648 if (this.recoverableZooKeeper != null) break;
649 }
650
651 if (this.recoverableZooKeeper == null) {
652 LOG.error("ZK is null on connection event -- see stack trace " +
653 "for the stack trace when constructor was called on this zkw",
654 this.constructorCaller);
655 throw new NullPointerException("ZK is null");
656 }
657 this.identifier = this.prefix + "-0x" +
658 Long.toHexString(this.recoverableZooKeeper.getSessionId());
659
660 LOG.debug(this.identifier + " connected");
661 break;
662
663
664 case Disconnected:
665 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
666 break;
667
668 case Expired:
669 String msg = prefix(this.identifier + " received expired from " +
670 "ZooKeeper, aborting");
671
672
673 if (this.abortable != null) {
674 this.abortable.abort(msg, new KeeperException.SessionExpiredException());
675 }
676 break;
677
678 case ConnectedReadOnly:
679 case SaslAuthenticated:
680 case AuthFailed:
681 break;
682
683 default:
684 throw new IllegalStateException("Received event is not valid: " + event.getState());
685 }
686 }
687
688
689
690
691
692
693
694
695
696
697
698
699
700 public void sync(String path) throws KeeperException {
701 this.recoverableZooKeeper.sync(path, null, null);
702 }
703
704
705
706
707
708
709
710
711
712
713
714 public void keeperException(KeeperException ke)
715 throws KeeperException {
716 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
717 throw ke;
718 }
719
720
721
722
723
724
725
726
727
728
729
730
731 public void interruptedException(InterruptedException ie) {
732 LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
733
734 Thread.currentThread().interrupt();
735
736 }
737
738
739
740
741
742
743 @Override
744 public void close() {
745 try {
746 if (recoverableZooKeeper != null) {
747 recoverableZooKeeper.close();
748 }
749 } catch (InterruptedException e) {
750 Thread.currentThread().interrupt();
751 }
752 }
753
754 public Configuration getConfiguration() {
755 return conf;
756 }
757
758 @Override
759 public void abort(String why, Throwable e) {
760 if (this.abortable != null) this.abortable.abort(why, e);
761 else this.aborted = true;
762 }
763
764 @Override
765 public boolean isAborted() {
766 return this.abortable == null? this.aborted: this.abortable.isAborted();
767 }
768
769
770
771
772 public String getMasterAddressZNode() {
773 return this.masterAddressZNode;
774 }
775
776 }