001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.nio.file.Files; 023import java.nio.file.Path; 024import java.nio.file.attribute.BasicFileAttributes; 025import java.nio.file.attribute.FileTime; 026import java.time.Duration; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * Instances of this class can be used to watch a file for changes. When a file's modification time 033 * changes, the callback provided by the user will be called from a background thread. Modification 034 * are detected by checking the file's attributes every polling interval. Some things to keep in 035 * mind: 036 * <ul> 037 * <li>The callback should be thread-safe.</li> 038 * <li>Changes that happen around the time the thread is started may be missed.</li> 039 * <li>There is a delay between a file changing and the callback firing.</li> 040 * </ul> 041 * <p/> 042 * This file was originally copied from the Apache ZooKeeper project, and then modified. 043 * @see <a href= 044 * "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base 045 * revision</a> 046 */ 047@InterfaceAudience.Private 048public final class FileChangeWatcher { 049 050 public interface FileChangeWatcherCallback { 051 void callback(Path path); 052 } 053 054 private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class); 055 056 enum State { 057 NEW, // object created but start() not called yet 058 STARTING, // start() called but background thread has not entered main loop 059 RUNNING, // background thread is running 060 STOPPING, // stop() called but background thread has not exited main loop 061 STOPPED // stop() called and background thread has exited, or background thread crashed 062 } 063 064 private final WatcherThread watcherThread; 065 private State state; // protected by synchronized(this) 066 private FileTime lastModifiedTime; 067 private final Object lastModifiedTimeLock; 068 private final Path filePath; 069 private final Duration pollInterval; 070 071 /** 072 * Creates a watcher that watches <code>filePath</code> and invokes <code>callback</code> on 073 * changes. 074 * @param filePath the file to watch. 075 * @param callback the callback to invoke with events. <code>event.kind()</code> will return the 076 * type of event, and <code>event.context()</code> will return the filename 077 * relative to <code>dirPath</code>. 078 * @throws IOException if there is an error creating the WatchService. 079 */ 080 public FileChangeWatcher(Path filePath, String threadNameSuffix, Duration pollInterval, 081 FileChangeWatcherCallback callback) throws IOException { 082 this.filePath = filePath; 083 this.pollInterval = pollInterval; 084 085 state = State.NEW; 086 lastModifiedTimeLock = new Object(); 087 lastModifiedTime = Files.readAttributes(filePath, BasicFileAttributes.class).lastModifiedTime(); 088 this.watcherThread = new WatcherThread(threadNameSuffix, callback); 089 this.watcherThread.setDaemon(true); 090 } 091 092 /** 093 * Returns the current {@link FileChangeWatcher.State}. 094 * @return the current state. 095 */ 096 private synchronized State getState() { 097 return state; 098 } 099 100 /** 101 * Blocks until the current state becomes <code>desiredState</code>. Currently only used by tests, 102 * thus package-private. 103 * @param desiredState the desired state. 104 * @throws InterruptedException if the current thread gets interrupted. 105 */ 106 synchronized void waitForState(State desiredState) throws InterruptedException { 107 while (this.state != desiredState) { 108 this.wait(); 109 } 110 } 111 112 /** 113 * Sets the state to <code>newState</code>. 114 * @param newState the new state. 115 */ 116 private synchronized void setState(State newState) { 117 state = newState; 118 this.notifyAll(); 119 } 120 121 /** 122 * Atomically sets the state to <code>update</code> if and only if the state is currently 123 * <code>expected</code>. 124 * @param expected the expected state. 125 * @param update the new state. 126 * @return true if the update succeeds, or false if the current state does not equal 127 * <code>expected</code>. 128 */ 129 private synchronized boolean compareAndSetState(State expected, State update) { 130 if (state == expected) { 131 setState(update); 132 return true; 133 } else { 134 return false; 135 } 136 } 137 138 /** 139 * Atomically sets the state to <code>update</code> if and only if the state is currently one of 140 * <code>expectedStates</code>. 141 * @param expectedStates the expected states. 142 * @param update the new state. 143 * @return true if the update succeeds, or false if the current state does not equal any of the 144 * <code>expectedStates</code>. 145 */ 146 private synchronized boolean compareAndSetState(State[] expectedStates, State update) { 147 for (State expected : expectedStates) { 148 if (state == expected) { 149 setState(update); 150 return true; 151 } 152 } 153 return false; 154 } 155 156 /** 157 * Tells the background thread to start. Does not wait for it to be running. Calling this method 158 * more than once has no effect. 159 */ 160 public void start() { 161 if (!compareAndSetState(State.NEW, State.STARTING)) { 162 // If previous state was not NEW, start() has already been called. 163 return; 164 } 165 this.watcherThread.start(); 166 } 167 168 /** 169 * Tells the background thread to stop. Does not wait for it to exit. 170 */ 171 public void stop() { 172 if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) { 173 watcherThread.interrupt(); 174 } 175 } 176 177 @RestrictedApi(explanation = "Should only be called in tests", link = "", 178 allowedOnPath = ".*/src/test/.*") 179 Thread getWatcherThread() { 180 return watcherThread; 181 } 182 183 private static void handleException(Thread thread, Throwable e) { 184 LOG.warn("Exception occurred from thread {}", thread.getName(), e); 185 } 186 187 /** 188 * Inner class that implements the watcher thread logic. 189 */ 190 private class WatcherThread extends Thread { 191 192 private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-"; 193 194 final FileChangeWatcherCallback callback; 195 196 WatcherThread(String threadNameSuffix, FileChangeWatcherCallback callback) { 197 super(THREAD_NAME_PREFIX + threadNameSuffix); 198 this.callback = callback; 199 setUncaughtExceptionHandler(FileChangeWatcher::handleException); 200 } 201 202 @Override 203 public void run() { 204 try { 205 LOG.debug("{} thread started", getName()); 206 if ( 207 !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING) 208 ) { 209 // stop() called shortly after start(), before 210 // this thread started running. 211 FileChangeWatcher.State state = FileChangeWatcher.this.getState(); 212 if (state != FileChangeWatcher.State.STOPPING) { 213 throw new IllegalStateException("Unexpected state: " + state); 214 } 215 return; 216 } 217 runLoop(); 218 } catch (Exception e) { 219 LOG.warn("Error in runLoop()", e); 220 throw new RuntimeException(e); 221 } finally { 222 LOG.debug("{} thread finished", getName()); 223 FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED); 224 } 225 } 226 227 private void runLoop() throws IOException { 228 while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) { 229 BasicFileAttributes attributes = Files.readAttributes(filePath, BasicFileAttributes.class); 230 boolean modified = false; 231 synchronized (lastModifiedTimeLock) { 232 FileTime maybeNewLastModifiedTime = attributes.lastModifiedTime(); 233 if (!lastModifiedTime.equals(maybeNewLastModifiedTime)) { 234 modified = true; 235 lastModifiedTime = maybeNewLastModifiedTime; 236 } 237 } 238 239 // avoid calling callback while holding lock 240 if (modified) { 241 try { 242 callback.callback(filePath); 243 } catch (Throwable e) { 244 LOG.error("Error from callback", e); 245 } 246 } 247 248 try { 249 Thread.sleep(pollInterval.toMillis()); 250 } catch (InterruptedException e) { 251 LOG.debug("Interrupted", e); 252 Thread.currentThread().interrupt(); 253 return; 254 } 255 } 256 } 257 } 258}