View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.Abortable;
25  import org.apache.zookeeper.KeeperException;
26  
27  /**
28   * Tracks the availability and value of a single ZooKeeper node.
29   *
30   * <p>Utilizes the {@link ZooKeeperListener} interface to get the necessary
31   * ZooKeeper events related to the node.
32   *
33   * <p>This is the base class used by trackers in both the Master and
34   * RegionServers.
35   */
36  @InterfaceAudience.Private
37  public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
38    // LOG is being used in subclasses, hence keeping it protected
39    protected static final Log LOG = LogFactory.getLog(ZooKeeperNodeTracker.class);
40    /** Path of node being tracked */
41    protected final String node;
42  
43    /** Data of the node being tracked */
44    private byte [] data;
45  
46    /** Used to abort if a fatal error occurs */
47    protected final Abortable abortable;
48  
49    private boolean stopped = false;
50  
51    /**
52     * Constructs a new ZK node tracker.
53     *
54     * <p>After construction, use {@link #start} to kick off tracking.
55     *
56     * @param watcher
57     * @param node
58     * @param abortable
59     */
60    public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node,
61        Abortable abortable) {
62      super(watcher);
63      this.node = node;
64      this.abortable = abortable;
65      this.data = null;
66    }
67  
68    /**
69     * Starts the tracking of the node in ZooKeeper.
70     *
71     * <p>Use {@link #blockUntilAvailable()} to block until the node is available
72     * or {@link #getData(boolean)} to get the data of the node if it is available.
73     */
74    public synchronized void start() {
75      this.watcher.registerListener(this);
76      try {
77        if(ZKUtil.watchAndCheckExists(watcher, node)) {
78          byte [] data = ZKUtil.getDataAndWatch(watcher, node);
79          if(data != null) {
80            this.data = data;
81          } else {
82            // It existed but now does not, try again to ensure a watch is set
83            LOG.debug("Try starting again because there is no data from " + node);
84            start();
85          }
86        }
87      } catch (KeeperException e) {
88        abortable.abort("Unexpected exception during initialization, aborting", e);
89      }
90    }
91  
92    public synchronized void stop() {
93      this.stopped = true;
94      notifyAll();
95    }
96  
97    /**
98     * Gets the data of the node, blocking until the node is available.
99     *
100    * @return data of the node
101    * @throws InterruptedException if the waiting thread is interrupted
102    */
103   public synchronized byte [] blockUntilAvailable()
104   throws InterruptedException {
105     return blockUntilAvailable(0, false);
106   }
107 
108   /**
109    * Gets the data of the node, blocking until the node is available or the
110    * specified timeout has elapsed.
111    *
112    * @param timeout maximum time to wait for the node data to be available,
113    * n milliseconds.  Pass 0 for no timeout.
114    * @return data of the node
115    * @throws InterruptedException if the waiting thread is interrupted
116    */
117   public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh)
118   throws InterruptedException {
119     if (timeout < 0) throw new IllegalArgumentException();
120     boolean notimeout = timeout == 0;
121     long startTime = System.currentTimeMillis();
122     long remaining = timeout;
123     if (refresh) {
124       try {
125         // This does not create a watch if the node does not exists
126         this.data = ZKUtil.getDataAndWatch(watcher, node);
127       } catch(KeeperException e) {
128         // We use to abort here, but in some cases the abort is ignored (
129         //  (empty Abortable), so it's better to log...
130         LOG.warn("Unexpected exception handling blockUntilAvailable", e);
131         abortable.abort("Unexpected exception handling blockUntilAvailable", e);
132       }
133     }
134     boolean nodeExistsChecked = (!refresh ||data!=null);
135     while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
136       if (!nodeExistsChecked) {
137         try {
138           nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
139         } catch (KeeperException e) {
140           LOG.warn(
141             "Got exception while trying to check existence in  ZooKeeper" +
142             " of the node: "+node+", retrying if timeout not reached",e );
143         }
144 
145         // It did not exists, and now it does.
146         if (nodeExistsChecked){
147           LOG.debug("Node " + node + " now exists, resetting a watcher");
148           try {
149             // This does not create a watch if the node does not exists
150             this.data = ZKUtil.getDataAndWatch(watcher, node);
151           } catch (KeeperException e) {
152             LOG.warn("Unexpected exception handling blockUntilAvailable", e);
153             abortable.abort("Unexpected exception handling blockUntilAvailable", e);
154           }
155         }
156       }
157       // We expect a notification; but we wait with a
158       //  a timeout to lower the impact of a race condition if any
159       wait(100);
160       remaining = timeout - (System.currentTimeMillis() - startTime);
161     }
162     return this.data;
163   }
164 
165   /**
166    * Gets the data of the node.
167    *
168    * <p>If the node is currently available, the most up-to-date known version of
169    * the data is returned.  If the node is not currently available, null is
170    * returned.
171    * @param refresh whether to refresh the data by calling ZK directly.
172    * @return data of the node, null if unavailable
173    */
174   public synchronized byte [] getData(boolean refresh) {
175     if (refresh) {
176       try {
177         this.data = ZKUtil.getDataAndWatch(watcher, node);
178       } catch(KeeperException e) {
179         abortable.abort("Unexpected exception handling getData", e);
180       }
181     }
182     return this.data;
183   }
184 
185   public String getNode() {
186     return this.node;
187   }
188 
189   @Override
190   public synchronized void nodeCreated(String path) {
191     if (!path.equals(node)) return;
192     try {
193       byte [] data = ZKUtil.getDataAndWatch(watcher, node);
194       if (data != null) {
195         this.data = data;
196         notifyAll();
197       } else {
198         nodeDeleted(path);
199       }
200     } catch(KeeperException e) {
201       abortable.abort("Unexpected exception handling nodeCreated event", e);
202     }
203   }
204 
205   @Override
206   public synchronized void nodeDeleted(String path) {
207     if(path.equals(node)) {
208       try {
209         if(ZKUtil.watchAndCheckExists(watcher, node)) {
210           nodeCreated(path);
211         } else {
212           this.data = null;
213         }
214       } catch(KeeperException e) {
215         abortable.abort("Unexpected exception handling nodeDeleted event", e);
216       }
217     }
218   }
219 
220   @Override
221   public synchronized void nodeDataChanged(String path) {
222     if(path.equals(node)) {
223       nodeCreated(path);
224     }
225   }
226   
227   /**
228    * Checks if the baseznode set as per the property 'zookeeper.znode.parent'
229    * exists.
230    * @return true if baseznode exists.
231    *         false if doesnot exists.
232    */
233   public boolean checkIfBaseNodeAvailable() {
234     try {
235       if (ZKUtil.checkExists(watcher, watcher.baseZNode) == -1) {
236         return false;
237       }
238     } catch (KeeperException e) {
239       abortable
240           .abort(
241               "Exception while checking if basenode ("+watcher.baseZNode+
242                 ") exists in ZooKeeper.",
243               e);
244     }
245     return true;
246   }
247 
248   @Override
249   public String toString() {
250     return "ZooKeeperNodeTracker{" +
251         "node='" + node + ", stopped=" + stopped + '}';
252   }
253 }