1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.regex.Matcher;
30 import java.util.regex.Pattern;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Abortable;
37 import org.apache.hadoop.hbase.AuthUtil;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
41 import org.apache.hadoop.hbase.classification.InterfaceAudience;
42 import org.apache.hadoop.hbase.security.Superusers;
43 import org.apache.hadoop.security.UserGroupInformation;
44 import org.apache.zookeeper.KeeperException;
45 import org.apache.zookeeper.WatchedEvent;
46 import org.apache.zookeeper.Watcher;
47 import org.apache.zookeeper.ZooDefs;
48 import org.apache.zookeeper.ZooDefs.Ids;
49 import org.apache.zookeeper.ZooDefs.Perms;
50 import org.apache.zookeeper.data.ACL;
51 import org.apache.zookeeper.data.Id;
52 import org.apache.zookeeper.data.Stat;
53
54
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
67 private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
68
69
70
71 private String prefix;
72 private String identifier;
73
74
75 private String quorum;
76
77
78 private RecoverableZooKeeper recoverableZooKeeper;
79
80
81 protected Abortable abortable;
82
83 private boolean aborted = false;
84
85
86 private final List<ZooKeeperListener> listeners =
87 new CopyOnWriteArrayList<ZooKeeperListener>();
88
89
90
91 public CountDownLatch saslLatch = new CountDownLatch(1);
92
93
94
95
96 public String baseZNode;
97
98 private Map<Integer,String> metaReplicaZnodes = new HashMap<Integer, String>();
99
100 public String rsZNode;
101
102 public String drainingZNode;
103
104 private String masterAddressZNode;
105
106 public String backupMasterAddressesZNode;
107
108 public String clusterStateZNode;
109
110 public String assignmentZNode;
111
112 public String tableZNode;
113
114 public String clusterIdZNode;
115
116 public String splitLogZNode;
117
118 public String balancerZNode;
119
120 private String regionNormalizerZNode;
121
122 public String tableLockZNode;
123
124 public String recoveringRegionsZNode;
125
126 public static String namespaceZNode = "namespace";
127
128
129 public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
130 new ArrayList<ACL>() { {
131 add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
132 add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
133 }};
134
135 public final static String META_ZNODE_PREFIX = "meta-region-server";
136
137 private final Configuration conf;
138
139 private final Exception constructorCaller;
140
141
142 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
143
144
145
146
147
148
149
150
151 public ZooKeeperWatcher(Configuration conf, String identifier,
152 Abortable abortable) throws ZooKeeperConnectionException, IOException {
153 this(conf, identifier, abortable, false);
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167 public ZooKeeperWatcher(Configuration conf, String identifier,
168 Abortable abortable, boolean canCreateBaseZNode)
169 throws IOException, ZooKeeperConnectionException {
170 this.conf = conf;
171
172
173 try {
174 throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
175 } catch (Exception e) {
176 this.constructorCaller = e;
177 }
178 this.quorum = ZKConfig.getZKQuorumServersString(conf);
179 this.prefix = identifier;
180
181
182 this.identifier = identifier + "0x0";
183 this.abortable = abortable;
184 setNodeNames(conf);
185 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
186 if (canCreateBaseZNode) {
187 createBaseZNodes();
188 }
189 }
190
191 private void createBaseZNodes() throws ZooKeeperConnectionException {
192 try {
193
194 ZKUtil.createWithParents(this, baseZNode);
195 if (conf.getBoolean("hbase.assignment.usezk", true)) {
196 ZKUtil.createAndFailSilent(this, assignmentZNode);
197 }
198 ZKUtil.createAndFailSilent(this, rsZNode);
199 ZKUtil.createAndFailSilent(this, drainingZNode);
200 ZKUtil.createAndFailSilent(this, tableZNode);
201 ZKUtil.createAndFailSilent(this, splitLogZNode);
202 ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
203 ZKUtil.createAndFailSilent(this, tableLockZNode);
204 ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
205 } catch (KeeperException e) {
206 throw new ZooKeeperConnectionException(
207 prefix("Unexpected KeeperException creating base node"), e);
208 }
209 }
210
211
212
213 public boolean isClientReadable(String node) {
214
215
216
217 return
218 node.equals(baseZNode) ||
219 isAnyMetaReplicaZnode(node) ||
220 node.equals(getMasterAddressZNode()) ||
221 node.equals(clusterIdZNode)||
222 node.equals(rsZNode) ||
223
224 node.equals(tableZNode) ||
225 node.startsWith(tableZNode + "/");
226 }
227
228
229
230
231
232
233
234 public void checkAndSetZNodeAcls() {
235 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
236 LOG.info("not a secure deployment, proceeding");
237 return;
238 }
239
240
241
242 try {
243 List<ACL> actualAcls = recoverableZooKeeper.getAcl(baseZNode, new Stat());
244
245 if (!isBaseZnodeAclSetup(actualAcls)) {
246 LOG.info("setting znode ACLs");
247 setZnodeAclsRecursive(baseZNode);
248 }
249 } catch(KeeperException.NoNodeException nne) {
250 return;
251 } catch(InterruptedException ie) {
252 interruptedExceptionNoThrow(ie, false);
253 } catch (IOException|KeeperException e) {
254 LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
255 }
256 }
257
258
259
260
261
262
263 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
264 List<String> children = recoverableZooKeeper.getChildren(znode, false);
265
266 for (String child : children) {
267 setZnodeAclsRecursive(ZKUtil.joinZNode(znode, child));
268 }
269 List<ACL> acls = ZKUtil.createACL(this, znode, true);
270 LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls);
271 recoverableZooKeeper.setAcl(znode, acls, -1);
272 }
273
274
275
276
277
278
279
280 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
281 if (LOG.isDebugEnabled()) {
282 LOG.debug("Checking znode ACLs");
283 }
284 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
285
286 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
287 return false;
288 }
289
290
291
292 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
293
294 if (acls.isEmpty()) {
295 if (LOG.isDebugEnabled()) {
296 LOG.debug("ACL is empty");
297 }
298 return false;
299 }
300
301 for (ACL acl : acls) {
302 int perms = acl.getPerms();
303 Id id = acl.getId();
304
305
306 if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
307 if (perms != Perms.READ) {
308 if (LOG.isDebugEnabled()) {
309 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
310 id, perms, Perms.READ));
311 }
312 return false;
313 }
314 } else if (superUsers != null && isSuperUserId(superUsers, id)) {
315 if (perms != Perms.ALL) {
316 if (LOG.isDebugEnabled()) {
317 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
318 id, perms, Perms.ALL));
319 }
320 return false;
321 }
322 } else if ("sasl".equals(id.getScheme())) {
323 String name = id.getId();
324
325 Matcher match = NAME_PATTERN.matcher(name);
326 if (match.matches()) {
327 name = match.group(1);
328 }
329 if (name.equals(hbaseUser)) {
330 if (perms != Perms.ALL) {
331 if (LOG.isDebugEnabled()) {
332 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
333 id, perms, Perms.ALL));
334 }
335 return false;
336 }
337 } else {
338 if (LOG.isDebugEnabled()) {
339 LOG.debug("Unexpected shortname in SASL ACL: " + id);
340 }
341 return false;
342 }
343 } else {
344 if (LOG.isDebugEnabled()) {
345 LOG.debug("unexpected ACL id '" + id + "'");
346 }
347 return false;
348 }
349 }
350 return true;
351 }
352
353
354
355
356 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
357 for (String user : superUsers) {
358 boolean hasAccess = false;
359
360 if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
361 for (ACL acl : acls) {
362 if (user.equals(acl.getId().getId())) {
363 if (acl.getPerms() == Perms.ALL) {
364 hasAccess = true;
365 } else {
366 if (LOG.isDebugEnabled()) {
367 LOG.debug(String.format(
368 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
369 acl.getId().getId(), acl.getPerms(), Perms.ALL));
370 }
371 }
372 break;
373 }
374 }
375 if (!hasAccess) {
376 return false;
377 }
378 }
379 }
380 return true;
381 }
382
383
384
385
386 public static boolean isSuperUserId(String[] superUsers, Id id) {
387 for (String user : superUsers) {
388
389 if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) {
390 return true;
391 }
392 }
393 return false;
394 }
395
396 @Override
397 public String toString() {
398 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
399 }
400
401
402
403
404
405
406
407 public String prefix(final String str) {
408 return this.toString() + " " + str;
409 }
410
411
412
413
414 private void setNodeNames(Configuration conf) {
415 baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
416 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
417 metaReplicaZnodes.put(0, ZKUtil.joinZNode(baseZNode,
418 conf.get("zookeeper.znode.metaserver", "meta-region-server")));
419 int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
420 HConstants.DEFAULT_META_REPLICA_NUM);
421 for (int i = 1; i < numMetaReplicas; i++) {
422 String str = ZKUtil.joinZNode(baseZNode,
423 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + i);
424 metaReplicaZnodes.put(i, str);
425 }
426 rsZNode = ZKUtil.joinZNode(baseZNode,
427 conf.get("zookeeper.znode.rs", "rs"));
428 drainingZNode = ZKUtil.joinZNode(baseZNode,
429 conf.get("zookeeper.znode.draining.rs", "draining"));
430 masterAddressZNode = ZKUtil.joinZNode(baseZNode,
431 conf.get("zookeeper.znode.master", "master"));
432 backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
433 conf.get("zookeeper.znode.backup.masters", "backup-masters"));
434 clusterStateZNode = ZKUtil.joinZNode(baseZNode,
435 conf.get("zookeeper.znode.state", "running"));
436 assignmentZNode = ZKUtil.joinZNode(baseZNode,
437 conf.get("zookeeper.znode.unassigned", "region-in-transition"));
438 tableZNode = ZKUtil.joinZNode(baseZNode,
439 conf.get("zookeeper.znode.tableEnableDisable", "table"));
440 clusterIdZNode = ZKUtil.joinZNode(baseZNode,
441 conf.get("zookeeper.znode.clusterId", "hbaseid"));
442 splitLogZNode = ZKUtil.joinZNode(baseZNode,
443 conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
444 balancerZNode = ZKUtil.joinZNode(baseZNode,
445 conf.get("zookeeper.znode.balancer", "balancer"));
446 regionNormalizerZNode = ZKUtil.joinZNode(baseZNode,
447 conf.get("zookeeper.znode.regionNormalizer", "normalizer"));
448 tableLockZNode = ZKUtil.joinZNode(baseZNode,
449 conf.get("zookeeper.znode.tableLock", "table-lock"));
450 recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
451 conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
452 namespaceZNode = ZKUtil.joinZNode(baseZNode,
453 conf.get("zookeeper.znode.namespace", "namespace"));
454 }
455
456
457
458
459
460
461 public boolean isAnyMetaReplicaZnode(String node) {
462 if (metaReplicaZnodes.values().contains(node)) {
463 return true;
464 }
465 return false;
466 }
467
468
469
470
471
472
473 public boolean isDefaultMetaReplicaZnode(String node) {
474 if (getZNodeForReplica(HRegionInfo.DEFAULT_REPLICA_ID).equals(node)) {
475 return true;
476 }
477 return false;
478 }
479
480
481
482
483
484
485 public List<String> getMetaReplicaNodes() throws KeeperException {
486 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, baseZNode);
487 List<String> metaReplicaNodes = new ArrayList<String>(2);
488 if (childrenOfBaseNode != null) {
489 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
490 for (String child : childrenOfBaseNode) {
491 if (child.startsWith(pattern)) metaReplicaNodes.add(child);
492 }
493 }
494 return metaReplicaNodes;
495 }
496
497
498
499
500
501
502 public String getZNodeForReplica(int replicaId) {
503 String str = metaReplicaZnodes.get(replicaId);
504
505
506
507 if (str == null) {
508 str = ZKUtil.joinZNode(baseZNode,
509 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + replicaId);
510 }
511 return str;
512 }
513
514
515
516
517
518
519 public int getMetaReplicaIdFromZnode(String znode) {
520 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
521 if (znode.equals(pattern)) return HRegionInfo.DEFAULT_REPLICA_ID;
522
523 String nonDefaultPattern = pattern + "-";
524 return Integer.parseInt(znode.substring(nonDefaultPattern.length()));
525 }
526
527
528
529
530
531 public void registerListener(ZooKeeperListener listener) {
532 listeners.add(listener);
533 }
534
535
536
537
538
539
540 public void registerListenerFirst(ZooKeeperListener listener) {
541 listeners.add(0, listener);
542 }
543
544 public void unregisterListener(ZooKeeperListener listener) {
545 listeners.remove(listener);
546 }
547
548
549
550
551 public void unregisterAllListeners() {
552 listeners.clear();
553 }
554
555
556
557
558 public List<ZooKeeperListener> getListeners() {
559 return new ArrayList<ZooKeeperListener>(listeners);
560 }
561
562
563
564
565 public int getNumberOfListeners() {
566 return listeners.size();
567 }
568
569
570
571
572
573 public RecoverableZooKeeper getRecoverableZooKeeper() {
574 return recoverableZooKeeper;
575 }
576
577 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
578 recoverableZooKeeper.reconnectAfterExpiration();
579 }
580
581
582
583
584
585 public String getQuorum() {
586 return quorum;
587 }
588
589
590
591
592 public String getBaseZNode() {
593 return baseZNode;
594 }
595
596
597
598
599
600
601
602 @Override
603 public void process(WatchedEvent event) {
604 LOG.debug(prefix("Received ZooKeeper Event, " +
605 "type=" + event.getType() + ", " +
606 "state=" + event.getState() + ", " +
607 "path=" + event.getPath()));
608
609 switch(event.getType()) {
610
611
612 case None: {
613 connectionEvent(event);
614 break;
615 }
616
617
618
619 case NodeCreated: {
620 for(ZooKeeperListener listener : listeners) {
621 listener.nodeCreated(event.getPath());
622 }
623 break;
624 }
625
626 case NodeDeleted: {
627 for(ZooKeeperListener listener : listeners) {
628 listener.nodeDeleted(event.getPath());
629 }
630 break;
631 }
632
633 case NodeDataChanged: {
634 for(ZooKeeperListener listener : listeners) {
635 listener.nodeDataChanged(event.getPath());
636 }
637 break;
638 }
639
640 case NodeChildrenChanged: {
641 for(ZooKeeperListener listener : listeners) {
642 listener.nodeChildrenChanged(event.getPath());
643 }
644 break;
645 }
646 }
647 }
648
649
650
651
652
653
654
655
656
657
658
659
660
661 private void connectionEvent(WatchedEvent event) {
662 switch(event.getState()) {
663 case SyncConnected:
664
665
666 long finished = System.currentTimeMillis() +
667 this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
668 while (System.currentTimeMillis() < finished) {
669 try {
670 Thread.sleep(1);
671 } catch (InterruptedException e) {
672 LOG.warn("Interrupted while sleeping");
673 throw new RuntimeException("Interrupted while waiting for" +
674 " recoverableZooKeeper is set");
675 }
676 if (this.recoverableZooKeeper != null) break;
677 }
678
679 if (this.recoverableZooKeeper == null) {
680 LOG.error("ZK is null on connection event -- see stack trace " +
681 "for the stack trace when constructor was called on this zkw",
682 this.constructorCaller);
683 throw new NullPointerException("ZK is null");
684 }
685 this.identifier = this.prefix + "-0x" +
686 Long.toHexString(this.recoverableZooKeeper.getSessionId());
687
688 LOG.debug(this.identifier + " connected");
689 break;
690
691
692 case Disconnected:
693 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
694 break;
695
696 case Expired:
697 String msg = prefix(this.identifier + " received expired from " +
698 "ZooKeeper, aborting");
699
700
701 if (this.abortable != null) {
702 this.abortable.abort(msg, new KeeperException.SessionExpiredException());
703 }
704 break;
705
706 case ConnectedReadOnly:
707 case SaslAuthenticated:
708 case AuthFailed:
709 break;
710
711 default:
712 throw new IllegalStateException("Received event is not valid: " + event.getState());
713 }
714 }
715
716
717
718
719
720
721
722
723
724
725
726
727
728 public void sync(String path) throws KeeperException {
729 this.recoverableZooKeeper.sync(path, null, null);
730 }
731
732
733
734
735
736
737
738
739
740
741
742 public void keeperException(KeeperException ke)
743 throws KeeperException {
744 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
745 throw ke;
746 }
747
748
749
750
751
752
753 public void interruptedException(InterruptedException ie) throws KeeperException {
754 interruptedExceptionNoThrow(ie, true);
755
756 throw new KeeperException.SystemErrorException();
757 }
758
759
760
761
762
763
764 public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) {
765 LOG.debug(prefix("Received InterruptedException, will interrupt current thread"
766 + (throwLater ? " and rethrow a SystemErrorException" : "")),
767 ie);
768
769 Thread.currentThread().interrupt();
770 }
771
772
773
774
775
776 @Override
777 public void close() {
778 try {
779 if (recoverableZooKeeper != null) {
780 recoverableZooKeeper.close();
781 }
782 } catch (InterruptedException e) {
783 Thread.currentThread().interrupt();
784 }
785 }
786
787 public Configuration getConfiguration() {
788 return conf;
789 }
790
791 @Override
792 public void abort(String why, Throwable e) {
793 if (this.abortable != null) this.abortable.abort(why, e);
794 else this.aborted = true;
795 }
796
797 @Override
798 public boolean isAborted() {
799 return this.abortable == null? this.aborted: this.abortable.isAborted();
800 }
801
802
803
804
805 public String getMasterAddressZNode() {
806 return this.masterAddressZNode;
807 }
808
809
810
811
812 public String getRegionNormalizerZNode() {
813 return regionNormalizerZNode;
814 }
815 }