1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.zookeeper;
19  
20  import java.io.IOException;
21  import java.util.List;
22  import java.util.NavigableSet;
23  import java.util.TreeSet;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.Abortable;
29  import org.apache.hadoop.hbase.ServerName;
30  import org.apache.hadoop.hbase.master.ServerManager;
31  import org.apache.zookeeper.KeeperException;
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  @InterfaceAudience.Private
48  public class DrainingServerTracker extends ZooKeeperListener {
49    private static final Log LOG = LogFactory.getLog(DrainingServerTracker.class);
50  
51    private ServerManager serverManager;
52    private final NavigableSet<ServerName> drainingServers = new TreeSet<ServerName>();
53    private Abortable abortable;
54  
55    public DrainingServerTracker(ZooKeeperWatcher watcher,
56        Abortable abortable, ServerManager serverManager) {
57      super(watcher);
58      this.abortable = abortable;
59      this.serverManager = serverManager;
60    }
61  
62    
63  
64  
65  
66  
67  
68  
69    public void start() throws KeeperException, IOException {
70      watcher.registerListener(this);
71      List<String> servers =
72        ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode);
73      add(servers);
74    }
75  
76    private void add(final List<String> servers) throws IOException {
77      synchronized(this.drainingServers) {
78        this.drainingServers.clear();
79        for (String n: servers) {
80          final ServerName sn = ServerName.valueOf(ZKUtil.getNodeName(n));
81          this.drainingServers.add(sn);
82          this.serverManager.addServerToDrainList(sn);
83          LOG.info("Draining RS node created, adding to list [" +
84              sn + "]");
85  
86        }
87      }
88    }
89  
90    private void remove(final ServerName sn) {
91      synchronized(this.drainingServers) {
92        this.drainingServers.remove(sn);
93        this.serverManager.removeServerFromDrainList(sn);
94      }
95    }
96  
97    @Override
98    public void nodeDeleted(final String path) {
99      if(path.startsWith(watcher.drainingZNode)) {
100       final ServerName sn = ServerName.valueOf(ZKUtil.getNodeName(path));
101       LOG.info("Draining RS node deleted, removing from list [" +
102           sn + "]");
103       remove(sn);
104     }
105   }
106 
107   @Override
108   public void nodeChildrenChanged(final String path) {
109     if(path.equals(watcher.drainingZNode)) {
110       try {
111         final List<String> newNodes =
112           ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode);
113         add(newNodes);
114       } catch (KeeperException e) {
115         abortable.abort("Unexpected zk exception getting RS nodes", e);
116       } catch (IOException e) {
117         abortable.abort("Unexpected zk exception getting RS nodes", e);
118       }
119     }
120   }
121 }