View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.Abortable;
25  import org.apache.zookeeper.KeeperException;
26  
27  /**
28   * Tracks the availability and value of a single ZooKeeper node.
29   *
30   * <p>Utilizes the {@link ZooKeeperListener} interface to get the necessary
31   * ZooKeeper events related to the node.
32   *
33   * <p>This is the base class used by trackers in both the Master and
34   * RegionServers.
35   */
36  @InterfaceAudience.Private
37  public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
38    static final Log LOG = LogFactory.getLog(ZooKeeperNodeTracker.class);
39    /** Path of node being tracked */
40    protected final String node;
41  
42    /** Data of the node being tracked */
43    private byte [] data;
44  
45    /** Used to abort if a fatal error occurs */
46    protected final Abortable abortable;
47  
48    private boolean stopped = false;
49  
50    /**
51     * Constructs a new ZK node tracker.
52     *
53     * <p>After construction, use {@link #start} to kick off tracking.
54     *
55     * @param watcher
56     * @param node
57     * @param abortable
58     */
59    public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node,
60        Abortable abortable) {
61      super(watcher);
62      this.node = node;
63      this.abortable = abortable;
64      this.data = null;
65    }
66  
67    /**
68     * Starts the tracking of the node in ZooKeeper.
69     *
70     * <p>Use {@link #blockUntilAvailable()} to block until the node is available
71     * or {@link #getData(boolean)} to get the data of the node if it is available.
72     */
73    public synchronized void start() {
74      this.watcher.registerListener(this);
75      try {
76        if(ZKUtil.watchAndCheckExists(watcher, node)) {
77          byte [] data = ZKUtil.getDataAndWatch(watcher, node);
78          if(data != null) {
79            this.data = data;
80          } else {
81            // It existed but now does not, try again to ensure a watch is set
82            LOG.debug("Try starting again because there is no data from " + node);
83            start();
84          }
85        }
86      } catch (KeeperException e) {
87        abortable.abort("Unexpected exception during initialization, aborting", e);
88      }
89    }
90  
91    public synchronized void stop() {
92      this.stopped = true;
93      notifyAll();
94    }
95  
96    /**
97     * Gets the data of the node, blocking until the node is available.
98     *
99     * @return data of the node
100    * @throws InterruptedException if the waiting thread is interrupted
101    */
102   public synchronized byte [] blockUntilAvailable()
103   throws InterruptedException {
104     return blockUntilAvailable(0, false);
105   }
106 
107   /**
108    * Gets the data of the node, blocking until the node is available or the
109    * specified timeout has elapsed.
110    *
111    * @param timeout maximum time to wait for the node data to be available,
112    * n milliseconds.  Pass 0 for no timeout.
113    * @return data of the node
114    * @throws InterruptedException if the waiting thread is interrupted
115    */
116   public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh)
117   throws InterruptedException {
118     if (timeout < 0) throw new IllegalArgumentException();
119     boolean notimeout = timeout == 0;
120     long startTime = System.currentTimeMillis();
121     long remaining = timeout;
122     if (refresh) {
123       try {
124         // This does not create a watch if the node does not exists
125         this.data = ZKUtil.getDataAndWatch(watcher, node);
126       } catch(KeeperException e) {
127         // We use to abort here, but in some cases the abort is ignored (
128         //  (empty Abortable), so it's better to log...
129         LOG.warn("Unexpected exception handling blockUntilAvailable", e);
130         abortable.abort("Unexpected exception handling blockUntilAvailable", e);
131       }
132     }
133     boolean nodeExistsChecked = (!refresh ||data!=null);
134     while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
135       if (!nodeExistsChecked) {
136         try {
137           nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
138         } catch (KeeperException e) {
139           LOG.warn(
140             "Got exception while trying to check existence in  ZooKeeper" +
141             " of the node: "+node+", retrying if timeout not reached",e );
142         }
143 
144         // It did not exists, and now it does.
145         if (nodeExistsChecked){
146           LOG.debug("Node " + node + " now exists, resetting a watcher");
147           try {
148             // This does not create a watch if the node does not exists
149             this.data = ZKUtil.getDataAndWatch(watcher, node);
150           } catch (KeeperException e) {
151             LOG.warn("Unexpected exception handling blockUntilAvailable", e);
152             abortable.abort("Unexpected exception handling blockUntilAvailable", e);
153           }
154         }
155       }
156       // We expect a notification; but we wait with a
157       //  a timeout to lower the impact of a race condition if any
158       wait(100);
159       remaining = timeout - (System.currentTimeMillis() - startTime);
160     }
161     return this.data;
162   }
163 
164   /**
165    * Gets the data of the node.
166    *
167    * <p>If the node is currently available, the most up-to-date known version of
168    * the data is returned.  If the node is not currently available, null is
169    * returned.
170    * @param refresh whether to refresh the data by calling ZK directly.
171    * @return data of the node, null if unavailable
172    */
173   public synchronized byte [] getData(boolean refresh) {
174     if (refresh) {
175       try {
176         this.data = ZKUtil.getDataAndWatch(watcher, node);
177       } catch(KeeperException e) {
178         abortable.abort("Unexpected exception handling getData", e);
179       }
180     }
181     return this.data;
182   }
183 
184   public String getNode() {
185     return this.node;
186   }
187 
188   @Override
189   public synchronized void nodeCreated(String path) {
190     if (!path.equals(node)) return;
191     try {
192       byte [] data = ZKUtil.getDataAndWatch(watcher, node);
193       if (data != null) {
194         this.data = data;
195         notifyAll();
196       } else {
197         nodeDeleted(path);
198       }
199     } catch(KeeperException e) {
200       abortable.abort("Unexpected exception handling nodeCreated event", e);
201     }
202   }
203 
204   @Override
205   public synchronized void nodeDeleted(String path) {
206     if(path.equals(node)) {
207       try {
208         if(ZKUtil.watchAndCheckExists(watcher, node)) {
209           nodeCreated(path);
210         } else {
211           this.data = null;
212         }
213       } catch(KeeperException e) {
214         abortable.abort("Unexpected exception handling nodeDeleted event", e);
215       }
216     }
217   }
218 
219   @Override
220   public synchronized void nodeDataChanged(String path) {
221     if(path.equals(node)) {
222       nodeCreated(path);
223     }
224   }
225   
226   /**
227    * Checks if the baseznode set as per the property 'zookeeper.znode.parent'
228    * exists.
229    * @return true if baseznode exists.
230    *         false if doesnot exists.
231    */
232   public boolean checkIfBaseNodeAvailable() {
233     try {
234       if (ZKUtil.checkExists(watcher, watcher.baseZNode) == -1) {
235         return false;
236       }
237     } catch (KeeperException e) {
238       abortable
239           .abort(
240               "Exception while checking if basenode ("+watcher.baseZNode+
241                 ") exists in ZooKeeper.",
242               e);
243     }
244     return true;
245   }
246 
247   @Override
248   public String toString() {
249     return "ZooKeeperNodeTracker{" +
250         "node='" + node + ", stopped=" + stopped + '}';
251   }
252 }