001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.IOException; 021import java.nio.file.ClosedWatchServiceException; 022import java.nio.file.FileSystem; 023import java.nio.file.Path; 024import java.nio.file.StandardWatchEventKinds; 025import java.nio.file.WatchEvent; 026import java.nio.file.WatchKey; 027import java.nio.file.WatchService; 028import java.util.function.Consumer; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Instances of this class can be used to watch a directory for file changes. When a file is added 035 * to, deleted from, or is modified in the given directory, the callback provided by the user will 036 * be called from a background thread. Some things to keep in mind: 037 * <ul> 038 * <li>The callback should be thread-safe.</li> 039 * <li>Changes that happen around the time the thread is started may be missed.</li> 040 * <li>There is a delay between a file changing and the callback firing.</li> 041 * <li>The watch is not recursive - changes to subdirectories will not trigger a callback.</li> 042 * </ul> 043 * <p/> 044 * This file has been copied from the Apache ZooKeeper project. 045 * @see <a href= 046 * "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base 047 * revision</a> 048 */ 049@InterfaceAudience.Private 050public final class FileChangeWatcher { 051 052 private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class); 053 054 public enum State { 055 NEW, // object created but start() not called yet 056 STARTING, // start() called but background thread has not entered main loop 057 RUNNING, // background thread is running 058 STOPPING, // stop() called but background thread has not exited main loop 059 STOPPED // stop() called and background thread has exited, or background thread crashed 060 } 061 062 private final WatcherThread watcherThread; 063 private State state; // protected by synchronized(this) 064 065 /** 066 * Creates a watcher that watches <code>dirPath</code> and invokes <code>callback</code> on 067 * changes. 068 * @param dirPath the directory to watch. 069 * @param callback the callback to invoke with events. <code>event.kind()</code> will return the 070 * type of event, and <code>event.context()</code> will return the filename 071 * relative to <code>dirPath</code>. 072 * @throws IOException if there is an error creating the WatchService. 073 */ 074 public FileChangeWatcher(Path dirPath, String threadNameSuffix, Consumer<WatchEvent<?>> callback) 075 throws IOException { 076 FileSystem fs = dirPath.getFileSystem(); 077 WatchService watchService = fs.newWatchService(); 078 079 LOG.debug("Registering with watch service: {}", dirPath); 080 081 dirPath.register(watchService, 082 new WatchEvent.Kind<?>[] { StandardWatchEventKinds.ENTRY_CREATE, 083 StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY, 084 StandardWatchEventKinds.OVERFLOW }); 085 state = State.NEW; 086 this.watcherThread = new WatcherThread(threadNameSuffix, watchService, callback); 087 this.watcherThread.setDaemon(true); 088 } 089 090 /** 091 * Returns the current {@link FileChangeWatcher.State}. 092 * @return the current state. 093 */ 094 public synchronized State getState() { 095 return state; 096 } 097 098 /** 099 * Blocks until the current state becomes <code>desiredState</code>. Currently only used by tests, 100 * thus package-private. 101 * @param desiredState the desired state. 102 * @throws InterruptedException if the current thread gets interrupted. 103 */ 104 synchronized void waitForState(State desiredState) throws InterruptedException { 105 while (this.state != desiredState) { 106 this.wait(); 107 } 108 } 109 110 /** 111 * Sets the state to <code>newState</code>. 112 * @param newState the new state. 113 */ 114 private synchronized void setState(State newState) { 115 state = newState; 116 this.notifyAll(); 117 } 118 119 /** 120 * Atomically sets the state to <code>update</code> if and only if the state is currently 121 * <code>expected</code>. 122 * @param expected the expected state. 123 * @param update the new state. 124 * @return true if the update succeeds, or false if the current state does not equal 125 * <code>expected</code>. 126 */ 127 private synchronized boolean compareAndSetState(State expected, State update) { 128 if (state == expected) { 129 setState(update); 130 return true; 131 } else { 132 return false; 133 } 134 } 135 136 /** 137 * Atomically sets the state to <code>update</code> if and only if the state is currently one of 138 * <code>expectedStates</code>. 139 * @param expectedStates the expected states. 140 * @param update the new state. 141 * @return true if the update succeeds, or false if the current state does not equal any of the 142 * <code>expectedStates</code>. 143 */ 144 private synchronized boolean compareAndSetState(State[] expectedStates, State update) { 145 for (State expected : expectedStates) { 146 if (state == expected) { 147 setState(update); 148 return true; 149 } 150 } 151 return false; 152 } 153 154 /** 155 * Tells the background thread to start. Does not wait for it to be running. Calling this method 156 * more than once has no effect. 157 */ 158 public void start() { 159 if (!compareAndSetState(State.NEW, State.STARTING)) { 160 // If previous state was not NEW, start() has already been called. 161 return; 162 } 163 this.watcherThread.start(); 164 } 165 166 /** 167 * Tells the background thread to stop. Does not wait for it to exit. 168 */ 169 public void stop() { 170 if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) { 171 watcherThread.interrupt(); 172 } 173 } 174 175 String getWatcherThreadName() { 176 return watcherThread.getName(); 177 } 178 179 private static void handleException(Thread thread, Throwable e) { 180 LOG.warn("Exception occurred from thread {}", thread.getName(), e); 181 } 182 183 /** 184 * Inner class that implements the watcher thread logic. 185 */ 186 private class WatcherThread extends Thread { 187 188 private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-"; 189 190 final WatchService watchService; 191 final Consumer<WatchEvent<?>> callback; 192 193 WatcherThread(String threadNameSuffix, WatchService watchService, 194 Consumer<WatchEvent<?>> callback) { 195 super(THREAD_NAME_PREFIX + threadNameSuffix); 196 this.watchService = watchService; 197 this.callback = callback; 198 setUncaughtExceptionHandler(FileChangeWatcher::handleException); 199 } 200 201 @Override 202 public void run() { 203 try { 204 LOG.info("{} thread started", getName()); 205 if ( 206 !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING) 207 ) { 208 // stop() called shortly after start(), before 209 // this thread started running. 210 FileChangeWatcher.State state = FileChangeWatcher.this.getState(); 211 if (state != FileChangeWatcher.State.STOPPING) { 212 throw new IllegalStateException("Unexpected state: " + state); 213 } 214 return; 215 } 216 runLoop(); 217 } catch (Exception e) { 218 LOG.warn("Error in runLoop()", e); 219 throw e; 220 } finally { 221 try { 222 watchService.close(); 223 } catch (IOException e) { 224 LOG.warn("Error closing watch service", e); 225 } 226 LOG.info("{} thread finished", getName()); 227 FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED); 228 } 229 } 230 231 private void runLoop() { 232 while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) { 233 WatchKey key; 234 try { 235 key = watchService.take(); 236 } catch (InterruptedException | ClosedWatchServiceException e) { 237 LOG.debug("{} was interrupted and is shutting down...", getName()); 238 break; 239 } 240 for (WatchEvent<?> event : key.pollEvents()) { 241 LOG.debug("Got file changed event: {} with context: {}", event.kind(), event.context()); 242 try { 243 callback.accept(event); 244 } catch (Throwable e) { 245 LOG.error("Error from callback", e); 246 } 247 } 248 boolean isKeyValid = key.reset(); 249 if (!isKeyValid) { 250 // This is likely a problem, it means that file reloading is broken, probably because the 251 // directory we are watching was deleted or otherwise became inaccessible (unmounted, 252 // permissions 253 // changed, ???). 254 // For now, we log an error and exit the watcher thread. 255 LOG.error("Watch key no longer valid, maybe the directory is inaccessible?"); 256 break; 257 } 258 } 259 } 260 } 261}