001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import org.apache.hadoop.hbase.Abortable;
022import org.apache.yetus.audience.InterfaceAudience;
023import org.apache.zookeeper.KeeperException;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * Tracks the availability and value of a single ZooKeeper node.
029 *
030 * <p>Utilizes the {@link ZKListener} interface to get the necessary
031 * ZooKeeper events related to the node.
032 *
033 * <p>This is the base class used by trackers in both the Master and
034 * RegionServers.
035 */
036@InterfaceAudience.Private
037public abstract class ZKNodeTracker extends ZKListener {
038  // LOG is being used in subclasses, hence keeping it protected
039  protected static final Logger LOG = LoggerFactory.getLogger(ZKNodeTracker.class);
040  /** Path of node being tracked */
041  protected final String node;
042
043  /** Data of the node being tracked */
044  private byte [] data;
045
046  /** Used to abort if a fatal error occurs */
047  protected final Abortable abortable;
048
049  private boolean stopped = false;
050
051  /**
052   * Constructs a new ZK node tracker.
053   *
054   * <p>After construction, use {@link #start} to kick off tracking.
055   *
056   * @param watcher reference to the {@link ZKWatcher} which also contains configuration and
057   *                constants
058   * @param node path of the node being tracked
059   * @param abortable used to abort if a fatal error occurs
060   */
061  public ZKNodeTracker(ZKWatcher watcher, String node,
062                       Abortable abortable) {
063    super(watcher);
064    this.node = node;
065    this.abortable = abortable;
066    this.data = null;
067  }
068
069  /**
070   * Starts the tracking of the node in ZooKeeper.
071   *
072   * <p>Use {@link #blockUntilAvailable()} to block until the node is available
073   * or {@link #getData(boolean)} to get the data of the node if it is available.
074   */
075  public synchronized void start() {
076    this.watcher.registerListener(this);
077    try {
078      if(ZKUtil.watchAndCheckExists(watcher, node)) {
079        byte [] data = ZKUtil.getDataAndWatch(watcher, node);
080        if(data != null) {
081          this.data = data;
082        } else {
083          // It existed but now does not, try again to ensure a watch is set
084          LOG.debug("Try starting again because there is no data from {}", node);
085          start();
086        }
087      }
088    } catch (KeeperException e) {
089      abortable.abort("Unexpected exception during initialization, aborting", e);
090    }
091  }
092
093  public synchronized void stop() {
094    this.stopped = true;
095    notifyAll();
096  }
097
098  /**
099   * Gets the data of the node, blocking until the node is available.
100   *
101   * @return data of the node
102   * @throws InterruptedException if the waiting thread is interrupted
103   */
104  public synchronized byte [] blockUntilAvailable()
105    throws InterruptedException {
106    return blockUntilAvailable(0, false);
107  }
108
109  /**
110   * Gets the data of the node, blocking until the node is available or the
111   * specified timeout has elapsed.
112   *
113   * @param timeout maximum time to wait for the node data to be available, n milliseconds. Pass 0
114   *                for no timeout.
115   * @return data of the node
116   * @throws InterruptedException if the waiting thread is interrupted
117   */
118  public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh)
119          throws InterruptedException {
120    if (timeout < 0) {
121      throw new IllegalArgumentException();
122    }
123
124    boolean notimeout = timeout == 0;
125    long startTime = System.currentTimeMillis();
126    long remaining = timeout;
127    if (refresh) {
128      try {
129        // This does not create a watch if the node does not exists
130        this.data = ZKUtil.getDataAndWatch(watcher, node);
131      } catch(KeeperException e) {
132        // We use to abort here, but in some cases the abort is ignored (
133        //  (empty Abortable), so it's better to log...
134        LOG.warn("Unexpected exception handling blockUntilAvailable", e);
135        abortable.abort("Unexpected exception handling blockUntilAvailable", e);
136      }
137    }
138    boolean nodeExistsChecked = (!refresh ||data!=null);
139    while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
140      if (!nodeExistsChecked) {
141        try {
142          nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
143        } catch (KeeperException e) {
144          LOG.warn("Got exception while trying to check existence in  ZooKeeper" +
145            " of the node: " + node + ", retrying if timeout not reached", e);
146        }
147
148        // It did not exists, and now it does.
149        if (nodeExistsChecked){
150          LOG.debug("Node {} now exists, resetting a watcher", node);
151          try {
152            // This does not create a watch if the node does not exists
153            this.data = ZKUtil.getDataAndWatch(watcher, node);
154          } catch (KeeperException e) {
155            LOG.warn("Unexpected exception handling blockUntilAvailable", e);
156            abortable.abort("Unexpected exception handling blockUntilAvailable", e);
157          }
158        }
159      }
160      // We expect a notification; but we wait with a
161      //  a timeout to lower the impact of a race condition if any
162      wait(100);
163      remaining = timeout - (System.currentTimeMillis() - startTime);
164    }
165    return this.data;
166  }
167
168  /**
169   * Gets the data of the node.
170   *
171   * <p>If the node is currently available, the most up-to-date known version of
172   * the data is returned.  If the node is not currently available, null is
173   * returned.
174   * @param refresh whether to refresh the data by calling ZK directly.
175   * @return data of the node, null if unavailable
176   */
177  public synchronized byte [] getData(boolean refresh) {
178    if (refresh) {
179      try {
180        this.data = ZKUtil.getDataAndWatch(watcher, node);
181      } catch(KeeperException e) {
182        abortable.abort("Unexpected exception handling getData", e);
183      }
184    }
185    return this.data;
186  }
187
188  public String getNode() {
189    return this.node;
190  }
191
192  @Override
193  public synchronized void nodeCreated(String path) {
194    if (!path.equals(node)) {
195      return;
196    }
197
198    try {
199      byte [] data = ZKUtil.getDataAndWatch(watcher, node);
200      if (data != null) {
201        this.data = data;
202        notifyAll();
203      } else {
204        nodeDeleted(path);
205      }
206    } catch(KeeperException e) {
207      abortable.abort("Unexpected exception handling nodeCreated event", e);
208    }
209  }
210
211  @Override
212  public synchronized void nodeDeleted(String path) {
213    if(path.equals(node)) {
214      try {
215        if(ZKUtil.watchAndCheckExists(watcher, node)) {
216          nodeCreated(path);
217        } else {
218          this.data = null;
219        }
220      } catch(KeeperException e) {
221        abortable.abort("Unexpected exception handling nodeDeleted event", e);
222      }
223    }
224  }
225
226  @Override
227  public synchronized void nodeDataChanged(String path) {
228    if(path.equals(node)) {
229      nodeCreated(path);
230    }
231  }
232
233  /**
234   * Checks if the baseznode set as per the property 'zookeeper.znode.parent'
235   * exists.
236   * @return true if baseznode exists.
237   *         false if doesnot exists.
238   */
239  public boolean checkIfBaseNodeAvailable() {
240    try {
241      if (ZKUtil.checkExists(watcher, watcher.getZNodePaths().baseZNode) == -1) {
242        return false;
243      }
244    } catch (KeeperException e) {
245      abortable.abort("Exception while checking if basenode (" + watcher.getZNodePaths().baseZNode
246          + ") exists in ZooKeeper.",
247        e);
248    }
249    return true;
250  }
251
252  @Override
253  public String toString() {
254    return "ZKNodeTracker{" +
255        "node='" + node + ", stopped=" + stopped + '}';
256  }
257}