001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.IOException;
021import java.nio.file.ClosedWatchServiceException;
022import java.nio.file.FileSystem;
023import java.nio.file.Path;
024import java.nio.file.StandardWatchEventKinds;
025import java.nio.file.WatchEvent;
026import java.nio.file.WatchKey;
027import java.nio.file.WatchService;
028import java.util.function.Consumer;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Instances of this class can be used to watch a directory for file changes. When a file is added
035 * to, deleted from, or is modified in the given directory, the callback provided by the user will
036 * be called from a background thread. Some things to keep in mind:
037 * <ul>
038 * <li>The callback should be thread-safe.</li>
039 * <li>Changes that happen around the time the thread is started may be missed.</li>
040 * <li>There is a delay between a file changing and the callback firing.</li>
041 * <li>The watch is not recursive - changes to subdirectories will not trigger a callback.</li>
042 * </ul>
043 * <p/>
044 * This file has been copied from the Apache ZooKeeper project.
045 * @see <a href=
046 *      "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base
047 *      revision</a>
048 */
049@InterfaceAudience.Private
050public final class FileChangeWatcher {
051
052  private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);
053
054  public enum State {
055    NEW, // object created but start() not called yet
056    STARTING, // start() called but background thread has not entered main loop
057    RUNNING, // background thread is running
058    STOPPING, // stop() called but background thread has not exited main loop
059    STOPPED // stop() called and background thread has exited, or background thread crashed
060  }
061
062  private final WatcherThread watcherThread;
063  private State state; // protected by synchronized(this)
064
065  /**
066   * Creates a watcher that watches <code>dirPath</code> and invokes <code>callback</code> on
067   * changes.
068   * @param dirPath  the directory to watch.
069   * @param callback the callback to invoke with events. <code>event.kind()</code> will return the
070   *                 type of event, and <code>event.context()</code> will return the filename
071   *                 relative to <code>dirPath</code>.
072   * @throws IOException if there is an error creating the WatchService.
073   */
074  public FileChangeWatcher(Path dirPath, String threadNameSuffix, Consumer<WatchEvent<?>> callback)
075    throws IOException {
076    FileSystem fs = dirPath.getFileSystem();
077    WatchService watchService = fs.newWatchService();
078
079    LOG.debug("Registering with watch service: {}", dirPath);
080
081    dirPath.register(watchService,
082      new WatchEvent.Kind<?>[] { StandardWatchEventKinds.ENTRY_CREATE,
083        StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
084        StandardWatchEventKinds.OVERFLOW });
085    state = State.NEW;
086    this.watcherThread = new WatcherThread(threadNameSuffix, watchService, callback);
087    this.watcherThread.setDaemon(true);
088  }
089
090  /**
091   * Returns the current {@link FileChangeWatcher.State}.
092   * @return the current state.
093   */
094  public synchronized State getState() {
095    return state;
096  }
097
098  /**
099   * Blocks until the current state becomes <code>desiredState</code>. Currently only used by tests,
100   * thus package-private.
101   * @param desiredState the desired state.
102   * @throws InterruptedException if the current thread gets interrupted.
103   */
104  synchronized void waitForState(State desiredState) throws InterruptedException {
105    while (this.state != desiredState) {
106      this.wait();
107    }
108  }
109
110  /**
111   * Sets the state to <code>newState</code>.
112   * @param newState the new state.
113   */
114  private synchronized void setState(State newState) {
115    state = newState;
116    this.notifyAll();
117  }
118
119  /**
120   * Atomically sets the state to <code>update</code> if and only if the state is currently
121   * <code>expected</code>.
122   * @param expected the expected state.
123   * @param update   the new state.
124   * @return true if the update succeeds, or false if the current state does not equal
125   *         <code>expected</code>.
126   */
127  private synchronized boolean compareAndSetState(State expected, State update) {
128    if (state == expected) {
129      setState(update);
130      return true;
131    } else {
132      return false;
133    }
134  }
135
136  /**
137   * Atomically sets the state to <code>update</code> if and only if the state is currently one of
138   * <code>expectedStates</code>.
139   * @param expectedStates the expected states.
140   * @param update         the new state.
141   * @return true if the update succeeds, or false if the current state does not equal any of the
142   *         <code>expectedStates</code>.
143   */
144  private synchronized boolean compareAndSetState(State[] expectedStates, State update) {
145    for (State expected : expectedStates) {
146      if (state == expected) {
147        setState(update);
148        return true;
149      }
150    }
151    return false;
152  }
153
154  /**
155   * Tells the background thread to start. Does not wait for it to be running. Calling this method
156   * more than once has no effect.
157   */
158  public void start() {
159    if (!compareAndSetState(State.NEW, State.STARTING)) {
160      // If previous state was not NEW, start() has already been called.
161      return;
162    }
163    this.watcherThread.start();
164  }
165
166  /**
167   * Tells the background thread to stop. Does not wait for it to exit.
168   */
169  public void stop() {
170    if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) {
171      watcherThread.interrupt();
172    }
173  }
174
175  String getWatcherThreadName() {
176    return watcherThread.getName();
177  }
178
179  private static void handleException(Thread thread, Throwable e) {
180    LOG.warn("Exception occurred from thread {}", thread.getName(), e);
181  }
182
183  /**
184   * Inner class that implements the watcher thread logic.
185   */
186  private class WatcherThread extends Thread {
187
188    private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-";
189
190    final WatchService watchService;
191    final Consumer<WatchEvent<?>> callback;
192
193    WatcherThread(String threadNameSuffix, WatchService watchService,
194      Consumer<WatchEvent<?>> callback) {
195      super(THREAD_NAME_PREFIX + threadNameSuffix);
196      this.watchService = watchService;
197      this.callback = callback;
198      setUncaughtExceptionHandler(FileChangeWatcher::handleException);
199    }
200
201    @Override
202    public void run() {
203      try {
204        LOG.info("{} thread started", getName());
205        if (
206          !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING)
207        ) {
208          // stop() called shortly after start(), before
209          // this thread started running.
210          FileChangeWatcher.State state = FileChangeWatcher.this.getState();
211          if (state != FileChangeWatcher.State.STOPPING) {
212            throw new IllegalStateException("Unexpected state: " + state);
213          }
214          return;
215        }
216        runLoop();
217      } catch (Exception e) {
218        LOG.warn("Error in runLoop()", e);
219        throw e;
220      } finally {
221        try {
222          watchService.close();
223        } catch (IOException e) {
224          LOG.warn("Error closing watch service", e);
225        }
226        LOG.info("{} thread finished", getName());
227        FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED);
228      }
229    }
230
231    private void runLoop() {
232      while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) {
233        WatchKey key;
234        try {
235          key = watchService.take();
236        } catch (InterruptedException | ClosedWatchServiceException e) {
237          LOG.debug("{} was interrupted and is shutting down...", getName());
238          break;
239        }
240        for (WatchEvent<?> event : key.pollEvents()) {
241          LOG.debug("Got file changed event: {} with context: {}", event.kind(), event.context());
242          try {
243            callback.accept(event);
244          } catch (Throwable e) {
245            LOG.error("Error from callback", e);
246          }
247        }
248        boolean isKeyValid = key.reset();
249        if (!isKeyValid) {
250          // This is likely a problem, it means that file reloading is broken, probably because the
251          // directory we are watching was deleted or otherwise became inaccessible (unmounted,
252          // permissions
253          // changed, ???).
254          // For now, we log an error and exit the watcher thread.
255          LOG.error("Watch key no longer valid, maybe the directory is inaccessible?");
256          break;
257        }
258      }
259    }
260  }
261}