001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import org.apache.hadoop.hbase.Abortable;
021import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
022import org.apache.yetus.audience.InterfaceAudience;
023import org.apache.zookeeper.KeeperException;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * Tracks the availability and value of a single ZooKeeper node.
029 * <p>
030 * Utilizes the {@link ZKListener} interface to get the necessary ZooKeeper events related to the
031 * node.
032 * <p>
033 * This is the base class used by trackers in both the Master and RegionServers.
034 */
035@InterfaceAudience.Private
036public abstract class ZKNodeTracker extends ZKListener {
037  // LOG is being used in subclasses, hence keeping it protected
038  protected static final Logger LOG = LoggerFactory.getLogger(ZKNodeTracker.class);
039  /** Path of node being tracked */
040  protected final String node;
041
042  /** Data of the node being tracked */
043  private byte[] data;
044
045  /** Used to abort if a fatal error occurs */
046  protected final Abortable abortable;
047
048  private boolean stopped = false;
049
050  /**
051   * Constructs a new ZK node tracker.
052   * <p/>
053   * After construction, use {@link #start} to kick off tracking.
054   * @param watcher   reference to the {@link ZKWatcher} which also contains configuration and
055   *                  constants
056   * @param node      path of the node being tracked
057   * @param abortable used to abort if a fatal error occurs
058   */
059  public ZKNodeTracker(ZKWatcher watcher, String node, Abortable abortable) {
060    super(watcher);
061    this.node = node;
062    this.abortable = abortable;
063    this.data = null;
064  }
065
066  /**
067   * Starts the tracking of the node in ZooKeeper.
068   * <p/>
069   * Use {@link #blockUntilAvailable()} to block until the node is available or
070   * {@link #getData(boolean)} to get the data of the node if it is available.
071   */
072  public synchronized void start() {
073    this.watcher.registerListener(this);
074    try {
075      if (ZKUtil.watchAndCheckExists(watcher, node)) {
076        byte[] data = ZKUtil.getDataAndWatch(watcher, node);
077        if (data != null) {
078          this.data = data;
079        } else {
080          // It existed but now does not, try again to ensure a watch is set
081          LOG.debug("Try starting again because there is no data from {}", node);
082          start();
083        }
084      }
085    } catch (KeeperException e) {
086      abortable.abort("Unexpected exception during initialization, aborting", e);
087    }
088    postStart();
089  }
090
091  /**
092   * Called after start is called. Sub classes could implement this method to load more data on zk.
093   */
094  protected void postStart() {
095  }
096
097  public synchronized void stop() {
098    this.stopped = true;
099    notifyAll();
100  }
101
102  /**
103   * Gets the data of the node, blocking until the node is available.
104   * @return data of the node
105   * @throws InterruptedException if the waiting thread is interrupted
106   */
107  public synchronized byte[] blockUntilAvailable() throws InterruptedException {
108    return blockUntilAvailable(0, false);
109  }
110
111  /**
112   * Gets the data of the node, blocking until the node is available or the specified timeout has
113   * elapsed.
114   * @param timeout maximum time to wait for the node data to be available, n milliseconds. Pass 0
115   *                for no timeout.
116   * @return data of the node
117   * @throws InterruptedException if the waiting thread is interrupted
118   */
119  public synchronized byte[] blockUntilAvailable(long timeout, boolean refresh)
120    throws InterruptedException {
121    if (timeout < 0) {
122      throw new IllegalArgumentException();
123    }
124
125    boolean notimeout = timeout == 0;
126    long startTime = EnvironmentEdgeManager.currentTime();
127    long remaining = timeout;
128    if (refresh) {
129      try {
130        // This does not create a watch if the node does not exists
131        this.data = ZKUtil.getDataAndWatch(watcher, node);
132      } catch (KeeperException e) {
133        // We use to abort here, but in some cases the abort is ignored (
134        // (empty Abortable), so it's better to log...
135        LOG.warn("Unexpected exception handling blockUntilAvailable", e);
136        abortable.abort("Unexpected exception handling blockUntilAvailable", e);
137      }
138    }
139    boolean nodeExistsChecked = (!refresh || data != null);
140    while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
141      if (!nodeExistsChecked) {
142        try {
143          nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
144        } catch (KeeperException e) {
145          LOG.warn("Got exception while trying to check existence in  ZooKeeper" + " of the node: "
146            + node + ", retrying if timeout not reached", e);
147        }
148
149        // It did not exists, and now it does.
150        if (nodeExistsChecked) {
151          LOG.debug("Node {} now exists, resetting a watcher", node);
152          try {
153            // This does not create a watch if the node does not exists
154            this.data = ZKUtil.getDataAndWatch(watcher, node);
155          } catch (KeeperException e) {
156            LOG.warn("Unexpected exception handling blockUntilAvailable", e);
157            abortable.abort("Unexpected exception handling blockUntilAvailable", e);
158          }
159        }
160      }
161      // We expect a notification; but we wait with a
162      // a timeout to lower the impact of a race condition if any
163      wait(100);
164      remaining = timeout - (EnvironmentEdgeManager.currentTime() - startTime);
165    }
166    return this.data;
167  }
168
169  /**
170   * Gets the data of the node.
171   * <p>
172   * If the node is currently available, the most up-to-date known version of the data is returned.
173   * If the node is not currently available, null is returned.
174   * @param refresh whether to refresh the data by calling ZK directly.
175   * @return data of the node, null if unavailable
176   */
177  public synchronized byte[] getData(boolean refresh) {
178    if (refresh) {
179      try {
180        this.data = ZKUtil.getDataAndWatch(watcher, node);
181      } catch (KeeperException e) {
182        abortable.abort("Unexpected exception handling getData", e);
183      }
184    }
185    return this.data;
186  }
187
188  public String getNode() {
189    return this.node;
190  }
191
192  @Override
193  public synchronized void nodeCreated(String path) {
194    if (!path.equals(node)) {
195      return;
196    }
197
198    try {
199      byte[] data = ZKUtil.getDataAndWatch(watcher, node);
200      if (data != null) {
201        this.data = data;
202        notifyAll();
203      } else {
204        nodeDeleted(path);
205      }
206    } catch (KeeperException e) {
207      abortable.abort("Unexpected exception handling nodeCreated event", e);
208    }
209  }
210
211  @Override
212  public synchronized void nodeDeleted(String path) {
213    if (path.equals(node)) {
214      try {
215        if (ZKUtil.watchAndCheckExists(watcher, node)) {
216          nodeCreated(path);
217        } else {
218          this.data = null;
219        }
220      } catch (KeeperException e) {
221        abortable.abort("Unexpected exception handling nodeDeleted event", e);
222      }
223    }
224  }
225
226  @Override
227  public synchronized void nodeDataChanged(String path) {
228    if (path.equals(node)) {
229      nodeCreated(path);
230    }
231  }
232
233  /**
234   * Checks if the baseznode set as per the property 'zookeeper.znode.parent' exists.
235   * @return true if baseznode exists. false if doesnot exists.
236   */
237  public boolean checkIfBaseNodeAvailable() {
238    try {
239      if (ZKUtil.checkExists(watcher, watcher.getZNodePaths().baseZNode) == -1) {
240        return false;
241      }
242    } catch (KeeperException e) {
243      abortable.abort("Exception while checking if basenode (" + watcher.getZNodePaths().baseZNode
244        + ") exists in ZooKeeper.", e);
245    }
246    return true;
247  }
248
249  @Override
250  public String toString() {
251    return "ZKNodeTracker{" + "node='" + node + ", stopped=" + stopped + '}';
252  }
253}