1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.Abortable;
25 import org.apache.zookeeper.KeeperException;
26
27
28
29
30
31
32
33
34
35
36 @InterfaceAudience.Private
37 public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
38
39 protected static final Log LOG = LogFactory.getLog(ZooKeeperNodeTracker.class);
40
41 protected final String node;
42
43
44 private byte [] data;
45
46
47 protected final Abortable abortable;
48
49 private boolean stopped = false;
50
51
52
53
54
55
56
57
58
59
60 public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node,
61 Abortable abortable) {
62 super(watcher);
63 this.node = node;
64 this.abortable = abortable;
65 this.data = null;
66 }
67
68
69
70
71
72
73
74 public synchronized void start() {
75 this.watcher.registerListener(this);
76 try {
77 if(ZKUtil.watchAndCheckExists(watcher, node)) {
78 byte [] data = ZKUtil.getDataAndWatch(watcher, node);
79 if(data != null) {
80 this.data = data;
81 } else {
82
83 LOG.debug("Try starting again because there is no data from " + node);
84 start();
85 }
86 }
87 } catch (KeeperException e) {
88 abortable.abort("Unexpected exception during initialization, aborting", e);
89 }
90 }
91
92 public synchronized void stop() {
93 this.stopped = true;
94 notifyAll();
95 }
96
97
98
99
100
101
102
103 public synchronized byte [] blockUntilAvailable()
104 throws InterruptedException {
105 return blockUntilAvailable(0, false);
106 }
107
108
109
110
111
112
113
114
115
116
117 public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh)
118 throws InterruptedException {
119 if (timeout < 0) throw new IllegalArgumentException();
120 boolean notimeout = timeout == 0;
121 long startTime = System.currentTimeMillis();
122 long remaining = timeout;
123 if (refresh) {
124 try {
125
126 this.data = ZKUtil.getDataAndWatch(watcher, node);
127 } catch(KeeperException e) {
128
129
130 LOG.warn("Unexpected exception handling blockUntilAvailable", e);
131 abortable.abort("Unexpected exception handling blockUntilAvailable", e);
132 }
133 }
134 boolean nodeExistsChecked = (!refresh ||data!=null);
135 while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
136 if (!nodeExistsChecked) {
137 try {
138 nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
139 } catch (KeeperException e) {
140 LOG.warn(
141 "Got exception while trying to check existence in ZooKeeper" +
142 " of the node: "+node+", retrying if timeout not reached",e );
143 }
144
145
146 if (nodeExistsChecked){
147 LOG.debug("Node " + node + " now exists, resetting a watcher");
148 try {
149
150 this.data = ZKUtil.getDataAndWatch(watcher, node);
151 } catch (KeeperException e) {
152 LOG.warn("Unexpected exception handling blockUntilAvailable", e);
153 abortable.abort("Unexpected exception handling blockUntilAvailable", e);
154 }
155 }
156 }
157
158
159 wait(100);
160 remaining = timeout - (System.currentTimeMillis() - startTime);
161 }
162 return this.data;
163 }
164
165
166
167
168
169
170
171
172
173
174 public synchronized byte [] getData(boolean refresh) {
175 if (refresh) {
176 try {
177 this.data = ZKUtil.getDataAndWatch(watcher, node);
178 } catch(KeeperException e) {
179 abortable.abort("Unexpected exception handling getData", e);
180 }
181 }
182 return this.data;
183 }
184
185 public String getNode() {
186 return this.node;
187 }
188
189 @Override
190 public synchronized void nodeCreated(String path) {
191 if (!path.equals(node)) return;
192 try {
193 byte [] data = ZKUtil.getDataAndWatch(watcher, node);
194 if (data != null) {
195 this.data = data;
196 notifyAll();
197 } else {
198 nodeDeleted(path);
199 }
200 } catch(KeeperException e) {
201 abortable.abort("Unexpected exception handling nodeCreated event", e);
202 }
203 }
204
205 @Override
206 public synchronized void nodeDeleted(String path) {
207 if(path.equals(node)) {
208 try {
209 if(ZKUtil.watchAndCheckExists(watcher, node)) {
210 nodeCreated(path);
211 } else {
212 this.data = null;
213 }
214 } catch(KeeperException e) {
215 abortable.abort("Unexpected exception handling nodeDeleted event", e);
216 }
217 }
218 }
219
220 @Override
221 public synchronized void nodeDataChanged(String path) {
222 if(path.equals(node)) {
223 nodeCreated(path);
224 }
225 }
226
227
228
229
230
231
232
233 public boolean checkIfBaseNodeAvailable() {
234 try {
235 if (ZKUtil.checkExists(watcher, watcher.baseZNode) == -1) {
236 return false;
237 }
238 } catch (KeeperException e) {
239 abortable
240 .abort(
241 "Exception while checking if basenode ("+watcher.baseZNode+
242 ") exists in ZooKeeper.",
243 e);
244 }
245 return true;
246 }
247
248 @Override
249 public String toString() {
250 return "ZooKeeperNodeTracker{" +
251 "node='" + node + ", stopped=" + stopped + '}';
252 }
253 }