001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.util.Objects;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
030
031/**
032 * Instances of this class can be used to watch a file for changes. When a file's modification time
033 * changes, the callback provided by the user will be called from a background thread. Modification
034 * are detected by checking the file's attributes every polling interval. Some things to keep in
035 * mind:
036 * <ul>
037 * <li>The callback should be thread-safe.</li>
038 * <li>Changes that happen around the time the thread is started may be missed.</li>
039 * <li>There is a delay between a file changing and the callback firing.</li>
040 * </ul>
041 * <p/>
042 * This file was originally copied from the Apache ZooKeeper project, and then modified.
043 * @see <a href=
044 *      "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base
045 *      revision</a>
046 */
047@InterfaceAudience.Private
048public final class FileChangeWatcher {
049
050  public interface FileChangeWatcherCallback {
051    void callback(Path path);
052  }
053
054  private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);
055
056  enum State {
057    NEW, // object created but start() not called yet
058    STARTING, // start() called but background thread has not entered main loop
059    RUNNING, // background thread is running
060    STOPPING, // stop() called but background thread has not exited main loop
061    STOPPED // stop() called and background thread has exited, or background thread crashed
062  }
063
064  private final WatcherThread watcherThread;
065  private State state; // protected by synchronized(this)
066  private FileTime lastModifiedTime;
067  private final Object lastModifiedTimeLock;
068  private final Path filePath;
069  private final Duration pollInterval;
070
071  /**
072   * Creates a watcher that watches <code>filePath</code> and invokes <code>callback</code> on
073   * changes.
074   * @param filePath the file to watch.
075   * @param callback the callback to invoke with events. <code>event.kind()</code> will return the
076   *                 type of event, and <code>event.context()</code> will return the filename
077   *                 relative to <code>dirPath</code>.
078   * @throws IOException if there is an error creating the WatchService.
079   */
080  public FileChangeWatcher(Path filePath, String threadNameSuffix, Duration pollInterval,
081    FileChangeWatcherCallback callback) throws IOException {
082    this.filePath = filePath;
083    this.pollInterval = pollInterval;
084
085    state = State.NEW;
086    lastModifiedTimeLock = new Object();
087    lastModifiedTime = Files.readAttributes(filePath, BasicFileAttributes.class).lastModifiedTime();
088    this.watcherThread = new WatcherThread(threadNameSuffix, callback);
089    this.watcherThread.setDaemon(true);
090  }
091
092  /**
093   * Returns the current {@link FileChangeWatcher.State}.
094   * @return the current state.
095   */
096  private synchronized State getState() {
097    return state;
098  }
099
100  /**
101   * Blocks until the current state becomes <code>desiredState</code>. Currently only used by tests,
102   * thus package-private.
103   * @param desiredState the desired state.
104   * @throws InterruptedException if the current thread gets interrupted.
105   */
106  synchronized void waitForState(State desiredState) throws InterruptedException {
107    while (this.state != desiredState) {
108      this.wait();
109    }
110  }
111
112  /**
113   * Sets the state to <code>newState</code>.
114   * @param newState the new state.
115   */
116  private synchronized void setState(State newState) {
117    state = newState;
118    this.notifyAll();
119  }
120
121  /**
122   * Atomically sets the state to <code>update</code> if and only if the state is currently
123   * <code>expected</code>.
124   * @param expected the expected state.
125   * @param update   the new state.
126   * @return true if the update succeeds, or false if the current state does not equal
127   *         <code>expected</code>.
128   */
129  private synchronized boolean compareAndSetState(State expected, State update) {
130    if (state == expected) {
131      setState(update);
132      return true;
133    } else {
134      return false;
135    }
136  }
137
138  /**
139   * Atomically sets the state to <code>update</code> if and only if the state is currently one of
140   * <code>expectedStates</code>.
141   * @param expectedStates the expected states.
142   * @param update         the new state.
143   * @return true if the update succeeds, or false if the current state does not equal any of the
144   *         <code>expectedStates</code>.
145   */
146  private synchronized boolean compareAndSetState(State[] expectedStates, State update) {
147    for (State expected : expectedStates) {
148      if (state == expected) {
149        setState(update);
150        return true;
151      }
152    }
153    return false;
154  }
155
156  /**
157   * Tells the background thread to start. Does not wait for it to be running. Calling this method
158   * more than once has no effect.
159   */
160  public void start() {
161    if (!compareAndSetState(State.NEW, State.STARTING)) {
162      // If previous state was not NEW, start() has already been called.
163      return;
164    }
165    this.watcherThread.start();
166  }
167
168  /**
169   * Tells the background thread to stop. Does not wait for it to exit.
170   */
171  public void stop() {
172    if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) {
173      watcherThread.interrupt();
174    }
175  }
176
177  @RestrictedApi(explanation = "Should only be called in tests", link = "",
178      allowedOnPath = ".*/src/test/.*")
179  Thread getWatcherThread() {
180    return watcherThread;
181  }
182
183  private static void handleException(Thread thread, Throwable e) {
184    LOG.warn("Exception occurred from thread {}", thread.getName(), e);
185  }
186
187  /**
188   * Inner class that implements the watcher thread logic.
189   */
190  private class WatcherThread extends Thread {
191
192    private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-";
193
194    final FileChangeWatcherCallback callback;
195
196    WatcherThread(String threadNameSuffix, FileChangeWatcherCallback callback) {
197      super(THREAD_NAME_PREFIX + threadNameSuffix);
198      this.callback = callback;
199      setUncaughtExceptionHandler(FileChangeWatcher::handleException);
200    }
201
202    @Override
203    public void run() {
204      try {
205        LOG.debug("{} thread started", getName());
206        if (
207          !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING)
208        ) {
209          // stop() called shortly after start(), before
210          // this thread started running.
211          FileChangeWatcher.State state = FileChangeWatcher.this.getState();
212          if (state != FileChangeWatcher.State.STOPPING) {
213            throw new IllegalStateException("Unexpected state: " + state);
214          }
215          return;
216        }
217        runLoop();
218      } catch (Exception e) {
219        LOG.warn("Error in runLoop()", e);
220        throw new RuntimeException(e);
221      } finally {
222        LOG.debug("{} thread finished", getName());
223        FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED);
224      }
225    }
226
227    private void runLoop() throws IOException {
228      while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) {
229        BasicFileAttributes attributes = Files.readAttributes(filePath, BasicFileAttributes.class);
230        boolean modified = false;
231        synchronized (lastModifiedTimeLock) {
232          FileTime maybeNewLastModifiedTime = attributes.lastModifiedTime();
233          if (!lastModifiedTime.equals(maybeNewLastModifiedTime)) {
234            modified = true;
235            lastModifiedTime = maybeNewLastModifiedTime;
236          }
237        }
238
239        // avoid calling callback while holding lock
240        if (modified) {
241          try {
242            callback.callback(filePath);
243          } catch (Throwable e) {
244            LOG.error("Error from callback", e);
245          }
246        }
247
248        try {
249          Thread.sleep(pollInterval.toMillis());
250        } catch (InterruptedException e) {
251          LOG.debug("Interrupted", e);
252          Thread.currentThread().interrupt();
253          return;
254        }
255      }
256    }
257  }
258}