1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.CopyOnWriteArrayList;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.Abortable;
30 import org.apache.hadoop.hbase.Stoppable;
31 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
32 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
33 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
34 import org.apache.zookeeper.KeeperException;
35
36
37
38
39
40
41 @InterfaceAudience.Private
42 public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
43
44 private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class);
45
46 private final Stoppable stopper;
47
48 private final List<ReplicationListener> listeners =
49 new CopyOnWriteArrayList<ReplicationListener>();
50
51 private final ArrayList<String> otherRegionServers = new ArrayList<String>();
52 private final ReplicationPeers replicationPeers;
53
54 public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper,
55 final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
56 Stoppable stopper) {
57 super(zookeeper, conf, abortable);
58 this.replicationPeers = replicationPeers;
59 this.stopper = stopper;
60 this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
61 this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
62 }
63
64 @Override
65 public void registerListener(ReplicationListener listener) {
66 listeners.add(listener);
67 }
68
69 @Override
70 public void removeListener(ReplicationListener listener) {
71 listeners.remove(listener);
72 }
73
74
75
76
77 @Override
78 public List<String> getListOfRegionServers() {
79 refreshOtherRegionServersList();
80
81 List<String> list = null;
82 synchronized (otherRegionServers) {
83 list = new ArrayList<String>(otherRegionServers);
84 }
85 return list;
86 }
87
88
89
90
91
92 public class OtherRegionServerWatcher extends ZooKeeperListener {
93
94
95
96
97 public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
98 super(watcher);
99 }
100
101
102
103
104
105 public void nodeCreated(String path) {
106 refreshListIfRightPath(path);
107 }
108
109
110
111
112
113 public void nodeDeleted(String path) {
114 if (stopper.isStopped()) {
115 return;
116 }
117 boolean cont = refreshListIfRightPath(path);
118 if (!cont) {
119 return;
120 }
121 LOG.info(path + " znode expired, triggering replicatorRemoved event");
122 for (ReplicationListener rl : listeners) {
123 rl.regionServerRemoved(getZNodeName(path));
124 }
125 }
126
127
128
129
130
131 public void nodeChildrenChanged(String path) {
132 if (stopper.isStopped()) {
133 return;
134 }
135 refreshListIfRightPath(path);
136 }
137
138 private boolean refreshListIfRightPath(String path) {
139 if (!path.startsWith(this.watcher.rsZNode)) {
140 return false;
141 }
142 return refreshOtherRegionServersList();
143 }
144 }
145
146
147
148
149 public class PeersWatcher extends ZooKeeperListener {
150
151
152
153
154 public PeersWatcher(ZooKeeperWatcher watcher) {
155 super(watcher);
156 }
157
158
159
160
161
162 public void nodeDeleted(String path) {
163 List<String> peers = refreshPeersList(path);
164 if (peers == null) {
165 return;
166 }
167 if (isPeerPath(path)) {
168 String id = getZNodeName(path);
169 LOG.info(path + " znode expired, triggering peerRemoved event");
170 for (ReplicationListener rl : listeners) {
171 rl.peerRemoved(id);
172 }
173 }
174 }
175
176
177
178
179
180 public void nodeChildrenChanged(String path) {
181 List<String> peers = refreshPeersList(path);
182 if (peers == null) {
183 return;
184 }
185 LOG.info(path + " znode expired, triggering peerListChanged event");
186 for (ReplicationListener rl : listeners) {
187 rl.peerListChanged(peers);
188 }
189 }
190 }
191
192
193
194
195
196
197
198 private List<String> refreshPeersList(String path) {
199 if (!path.startsWith(getPeersZNode())) {
200 return null;
201 }
202 return this.replicationPeers.getAllPeerIds();
203 }
204
205 private String getPeersZNode() {
206 return this.peersZNode;
207 }
208
209
210
211
212
213
214 private String getZNodeName(String fullPath) {
215 String[] parts = fullPath.split("/");
216 return parts.length > 0 ? parts[parts.length - 1] : "";
217 }
218
219
220
221
222
223
224
225 private boolean refreshOtherRegionServersList() {
226 List<String> newRsList = getRegisteredRegionServers();
227 if (newRsList == null) {
228 return false;
229 } else {
230 synchronized (otherRegionServers) {
231 otherRegionServers.clear();
232 otherRegionServers.addAll(newRsList);
233 }
234 }
235 return true;
236 }
237
238
239
240
241
242 private List<String> getRegisteredRegionServers() {
243 List<String> result = null;
244 try {
245 result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.rsZNode);
246 } catch (KeeperException e) {
247 this.abortable.abort("Get list of registered region servers", e);
248 }
249 return result;
250 }
251 }