1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.Abortable;
25  import org.apache.zookeeper.KeeperException;
26  
27  
28  
29  
30  
31  
32  
33  
34  
35  
36  @InterfaceAudience.Private
37  public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
38    
39    protected static final Log LOG = LogFactory.getLog(ZooKeeperNodeTracker.class);
40    
41    protected final String node;
42  
43    
44    private byte [] data;
45  
46    
47    protected final Abortable abortable;
48  
49    private boolean stopped = false;
50  
51    
52  
53  
54  
55  
56  
57  
58  
59  
60    public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node,
61        Abortable abortable) {
62      super(watcher);
63      this.node = node;
64      this.abortable = abortable;
65      this.data = null;
66    }
67  
68    
69  
70  
71  
72  
73  
74    public synchronized void start() {
75      this.watcher.registerListener(this);
76      try {
77        if(ZKUtil.watchAndCheckExists(watcher, node)) {
78          byte [] data = ZKUtil.getDataAndWatch(watcher, node);
79          if(data != null) {
80            this.data = data;
81          } else {
82            
83            LOG.debug("Try starting again because there is no data from " + node);
84            start();
85          }
86        }
87      } catch (KeeperException e) {
88        abortable.abort("Unexpected exception during initialization, aborting", e);
89      }
90    }
91  
92    public synchronized void stop() {
93      this.stopped = true;
94      notifyAll();
95    }
96  
97    
98  
99  
100 
101 
102 
103   public synchronized byte [] blockUntilAvailable()
104   throws InterruptedException {
105     return blockUntilAvailable(0, false);
106   }
107 
108   
109 
110 
111 
112 
113 
114 
115 
116 
117   public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh)
118   throws InterruptedException {
119     if (timeout < 0) throw new IllegalArgumentException();
120     boolean notimeout = timeout == 0;
121     long startTime = System.currentTimeMillis();
122     long remaining = timeout;
123     if (refresh) {
124       try {
125         
126         this.data = ZKUtil.getDataAndWatch(watcher, node);
127       } catch(KeeperException e) {
128         
129         
130         LOG.warn("Unexpected exception handling blockUntilAvailable", e);
131         abortable.abort("Unexpected exception handling blockUntilAvailable", e);
132       }
133     }
134     boolean nodeExistsChecked = (!refresh ||data!=null);
135     while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
136       if (!nodeExistsChecked) {
137         try {
138           nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
139         } catch (KeeperException e) {
140           LOG.warn(
141             "Got exception while trying to check existence in  ZooKeeper" +
142             " of the node: "+node+", retrying if timeout not reached",e );
143         }
144 
145         
146         if (nodeExistsChecked){
147           LOG.debug("Node " + node + " now exists, resetting a watcher");
148           try {
149             
150             this.data = ZKUtil.getDataAndWatch(watcher, node);
151           } catch (KeeperException e) {
152             LOG.warn("Unexpected exception handling blockUntilAvailable", e);
153             abortable.abort("Unexpected exception handling blockUntilAvailable", e);
154           }
155         }
156       }
157       
158       
159       wait(100);
160       remaining = timeout - (System.currentTimeMillis() - startTime);
161     }
162     return this.data;
163   }
164 
165   
166 
167 
168 
169 
170 
171 
172 
173 
174   public synchronized byte [] getData(boolean refresh) {
175     if (refresh) {
176       try {
177         this.data = ZKUtil.getDataAndWatch(watcher, node);
178       } catch(KeeperException e) {
179         abortable.abort("Unexpected exception handling getData", e);
180       }
181     }
182     return this.data;
183   }
184 
185   public String getNode() {
186     return this.node;
187   }
188 
189   @Override
190   public synchronized void nodeCreated(String path) {
191     if (!path.equals(node)) return;
192     try {
193       byte [] data = ZKUtil.getDataAndWatch(watcher, node);
194       if (data != null) {
195         this.data = data;
196         notifyAll();
197       } else {
198         nodeDeleted(path);
199       }
200     } catch(KeeperException e) {
201       abortable.abort("Unexpected exception handling nodeCreated event", e);
202     }
203   }
204 
205   @Override
206   public synchronized void nodeDeleted(String path) {
207     if(path.equals(node)) {
208       try {
209         if(ZKUtil.watchAndCheckExists(watcher, node)) {
210           nodeCreated(path);
211         } else {
212           this.data = null;
213         }
214       } catch(KeeperException e) {
215         abortable.abort("Unexpected exception handling nodeDeleted event", e);
216       }
217     }
218   }
219 
220   @Override
221   public synchronized void nodeDataChanged(String path) {
222     if(path.equals(node)) {
223       nodeCreated(path);
224     }
225   }
226   
227   
228 
229 
230 
231 
232 
233   public boolean checkIfBaseNodeAvailable() {
234     try {
235       if (ZKUtil.checkExists(watcher, watcher.baseZNode) == -1) {
236         return false;
237       }
238     } catch (KeeperException e) {
239       abortable
240           .abort(
241               "Exception while checking if basenode ("+watcher.baseZNode+
242                 ") exists in ZooKeeper.",
243               e);
244     }
245     return true;
246   }
247 
248   @Override
249   public String toString() {
250     return "ZooKeeperNodeTracker{" +
251         "node='" + node + ", stopped=" + stopped + '}';
252   }
253 }