1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.Abortable;
25 import org.apache.zookeeper.KeeperException;
26
27
28
29
30
31
32
33
34
35
36 @InterfaceAudience.Private
37 public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
38 static final Log LOG = LogFactory.getLog(ZooKeeperNodeTracker.class);
39
40 protected final String node;
41
42
43 private byte [] data;
44
45
46 protected final Abortable abortable;
47
48 private boolean stopped = false;
49
50
51
52
53
54
55
56
57
58
59 public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node,
60 Abortable abortable) {
61 super(watcher);
62 this.node = node;
63 this.abortable = abortable;
64 this.data = null;
65 }
66
67
68
69
70
71
72
73 public synchronized void start() {
74 this.watcher.registerListener(this);
75 try {
76 if(ZKUtil.watchAndCheckExists(watcher, node)) {
77 byte [] data = ZKUtil.getDataAndWatch(watcher, node);
78 if(data != null) {
79 this.data = data;
80 } else {
81
82 LOG.debug("Try starting again because there is no data from " + node);
83 start();
84 }
85 }
86 } catch (KeeperException e) {
87 abortable.abort("Unexpected exception during initialization, aborting", e);
88 }
89 }
90
91 public synchronized void stop() {
92 this.stopped = true;
93 notifyAll();
94 }
95
96
97
98
99
100
101
102 public synchronized byte [] blockUntilAvailable()
103 throws InterruptedException {
104 return blockUntilAvailable(0, false);
105 }
106
107
108
109
110
111
112
113
114
115
116 public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh)
117 throws InterruptedException {
118 if (timeout < 0) throw new IllegalArgumentException();
119 boolean notimeout = timeout == 0;
120 long startTime = System.currentTimeMillis();
121 long remaining = timeout;
122 if (refresh) {
123 try {
124
125 this.data = ZKUtil.getDataAndWatch(watcher, node);
126 } catch(KeeperException e) {
127
128
129 LOG.warn("Unexpected exception handling blockUntilAvailable", e);
130 abortable.abort("Unexpected exception handling blockUntilAvailable", e);
131 }
132 }
133 boolean nodeExistsChecked = (!refresh ||data!=null);
134 while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
135 if (!nodeExistsChecked) {
136 try {
137 nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
138 } catch (KeeperException e) {
139 LOG.warn(
140 "Got exception while trying to check existence in ZooKeeper" +
141 " of the node: "+node+", retrying if timeout not reached",e );
142 }
143
144
145 if (nodeExistsChecked){
146 LOG.debug("Node " + node + " now exists, resetting a watcher");
147 try {
148
149 this.data = ZKUtil.getDataAndWatch(watcher, node);
150 } catch (KeeperException e) {
151 LOG.warn("Unexpected exception handling blockUntilAvailable", e);
152 abortable.abort("Unexpected exception handling blockUntilAvailable", e);
153 }
154 }
155 }
156
157
158 wait(100);
159 remaining = timeout - (System.currentTimeMillis() - startTime);
160 }
161 return this.data;
162 }
163
164
165
166
167
168
169
170
171
172
173 public synchronized byte [] getData(boolean refresh) {
174 if (refresh) {
175 try {
176 this.data = ZKUtil.getDataAndWatch(watcher, node);
177 } catch(KeeperException e) {
178 abortable.abort("Unexpected exception handling getData", e);
179 }
180 }
181 return this.data;
182 }
183
184 public String getNode() {
185 return this.node;
186 }
187
188 @Override
189 public synchronized void nodeCreated(String path) {
190 if (!path.equals(node)) return;
191 try {
192 byte [] data = ZKUtil.getDataAndWatch(watcher, node);
193 if (data != null) {
194 this.data = data;
195 notifyAll();
196 } else {
197 nodeDeleted(path);
198 }
199 } catch(KeeperException e) {
200 abortable.abort("Unexpected exception handling nodeCreated event", e);
201 }
202 }
203
204 @Override
205 public synchronized void nodeDeleted(String path) {
206 if(path.equals(node)) {
207 try {
208 if(ZKUtil.watchAndCheckExists(watcher, node)) {
209 nodeCreated(path);
210 } else {
211 this.data = null;
212 }
213 } catch(KeeperException e) {
214 abortable.abort("Unexpected exception handling nodeDeleted event", e);
215 }
216 }
217 }
218
219 @Override
220 public synchronized void nodeDataChanged(String path) {
221 if(path.equals(node)) {
222 nodeCreated(path);
223 }
224 }
225
226
227
228
229
230
231
232 public boolean checkIfBaseNodeAvailable() {
233 try {
234 if (ZKUtil.checkExists(watcher, watcher.baseZNode) == -1) {
235 return false;
236 }
237 } catch (KeeperException e) {
238 abortable
239 .abort(
240 "Exception while checking if basenode ("+watcher.baseZNode+
241 ") exists in ZooKeeper.",
242 e);
243 }
244 return true;
245 }
246
247 @Override
248 public String toString() {
249 return "ZooKeeperNodeTracker{" +
250 "node='" + node + ", stopped=" + stopped + '}';
251 }
252 }