1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.zookeeper;
19
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.NavigableSet;
23 import java.util.TreeSet;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.Abortable;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.master.ServerManager;
31 import org.apache.zookeeper.KeeperException;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class DrainingServerTracker extends ZooKeeperListener {
49 private static final Log LOG = LogFactory.getLog(DrainingServerTracker.class);
50
51 private ServerManager serverManager;
52 private final NavigableSet<ServerName> drainingServers = new TreeSet<ServerName>();
53 private Abortable abortable;
54
55 public DrainingServerTracker(ZooKeeperWatcher watcher,
56 Abortable abortable, ServerManager serverManager) {
57 super(watcher);
58 this.abortable = abortable;
59 this.serverManager = serverManager;
60 }
61
62
63
64
65
66
67
68
69 public void start() throws KeeperException, IOException {
70 watcher.registerListener(this);
71 List<String> servers =
72 ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode);
73 add(servers);
74 }
75
76 private void add(final List<String> servers) throws IOException {
77 synchronized(this.drainingServers) {
78 this.drainingServers.clear();
79 for (String n: servers) {
80 final ServerName sn = ServerName.valueOf(ZKUtil.getNodeName(n));
81 this.drainingServers.add(sn);
82 this.serverManager.addServerToDrainList(sn);
83 LOG.info("Draining RS node created, adding to list [" +
84 sn + "]");
85
86 }
87 }
88 }
89
90 private void remove(final ServerName sn) {
91 synchronized(this.drainingServers) {
92 this.drainingServers.remove(sn);
93 this.serverManager.removeServerFromDrainList(sn);
94 }
95 }
96
97 @Override
98 public void nodeDeleted(final String path) {
99 if(path.startsWith(watcher.drainingZNode)) {
100 final ServerName sn = ServerName.valueOf(ZKUtil.getNodeName(path));
101 LOG.info("Draining RS node deleted, removing from list [" +
102 sn + "]");
103 remove(sn);
104 }
105 }
106
107 @Override
108 public void nodeChildrenChanged(final String path) {
109 if(path.equals(watcher.drainingZNode)) {
110 try {
111 final List<String> newNodes =
112 ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode);
113 add(newNodes);
114 } catch (KeeperException e) {
115 abortable.abort("Unexpected zk exception getting RS nodes", e);
116 } catch (IOException e) {
117 abortable.abort("Unexpected zk exception getting RS nodes", e);
118 }
119 }
120 }
121 }