001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.File; 025import java.io.IOException; 026import java.nio.charset.StandardCharsets; 027import java.nio.file.StandardWatchEventKinds; 028import java.nio.file.WatchEvent; 029import java.util.ArrayList; 030import java.util.List; 031import java.util.concurrent.atomic.AtomicInteger; 032import org.apache.commons.io.FileUtils; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 035import org.apache.hadoop.hbase.testclassification.IOTests; 036import org.apache.hadoop.hbase.testclassification.SmallTests; 037import org.junit.AfterClass; 038import org.junit.BeforeClass; 039import org.junit.ClassRule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * This file has been copied from the Apache ZooKeeper project. 047 * @see <a href= 048 * "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base 049 * revision</a> 050 */ 051@Category({ IOTests.class, SmallTests.class }) 052public class TestFileChangeWatcher { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestFileChangeWatcher.class); 057 058 private static File tempDir; 059 private static File tempFile; 060 061 private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class); 062 private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); 063 064 private static final long FS_TIMEOUT = 30000L; 065 066 @BeforeClass 067 public static void createTempFile() throws IOException { 068 tempDir = new File(UTIL.getDataTestDir(TestFileChangeWatcher.class.getSimpleName()).toString()) 069 .getCanonicalFile(); 070 FileUtils.forceMkdir(tempDir); 071 tempFile = File.createTempFile("zk_test_", "", tempDir); 072 } 073 074 @AfterClass 075 public static void cleanupTempDir() { 076 UTIL.cleanupTestDir(); 077 } 078 079 @Test 080 public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { 081 FileChangeWatcher watcher = null; 082 try { 083 final List<WatchEvent<?>> events = new ArrayList<>(); 084 watcher = new FileChangeWatcher(tempDir.toPath(), event -> { 085 LOG.info("Got an update: {} {}", event.kind(), event.context()); 086 // Filter out the extra ENTRY_CREATE events that are 087 // sometimes seen at the start. Even though we create the watcher 088 // after the file exists, sometimes we still get a create event. 089 if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { 090 return; 091 } 092 synchronized (events) { 093 events.add(event); 094 events.notifyAll(); 095 } 096 }); 097 watcher.start(); 098 watcher.waitForState(FileChangeWatcher.State.RUNNING); 099 Thread.sleep(1000L); // TODO hack 100 for (int i = 0; i < 3; i++) { 101 LOG.info("Modifying file, attempt {}", (i + 1)); 102 FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8, 103 true); 104 synchronized (events) { 105 if (events.size() < i + 1) { 106 events.wait(FS_TIMEOUT); 107 } 108 assertEquals("Wrong number of events", i + 1, events.size()); 109 WatchEvent<?> event = events.get(i); 110 assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); 111 assertEquals(tempFile.getName(), event.context().toString()); 112 } 113 } 114 } finally { 115 if (watcher != null) { 116 watcher.stop(); 117 watcher.waitForState(FileChangeWatcher.State.STOPPED); 118 } 119 } 120 } 121 122 @Test 123 public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException { 124 FileChangeWatcher watcher = null; 125 try { 126 final List<WatchEvent<?>> events = new ArrayList<>(); 127 watcher = new FileChangeWatcher(tempDir.toPath(), event -> { 128 LOG.info("Got an update: {} {}", event.kind(), event.context()); 129 // Filter out the extra ENTRY_CREATE events that are 130 // sometimes seen at the start. Even though we create the watcher 131 // after the file exists, sometimes we still get a create event. 132 if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { 133 return; 134 } 135 synchronized (events) { 136 events.add(event); 137 events.notifyAll(); 138 } 139 }); 140 watcher.start(); 141 watcher.waitForState(FileChangeWatcher.State.RUNNING); 142 Thread.sleep(1000L); // TODO hack 143 LOG.info("Touching file"); 144 FileUtils.touch(tempFile); 145 synchronized (events) { 146 if (events.isEmpty()) { 147 events.wait(FS_TIMEOUT); 148 } 149 assertFalse(events.isEmpty()); 150 WatchEvent<?> event = events.get(0); 151 assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); 152 assertEquals(tempFile.getName(), event.context().toString()); 153 } 154 } finally { 155 if (watcher != null) { 156 watcher.stop(); 157 watcher.waitForState(FileChangeWatcher.State.STOPPED); 158 } 159 } 160 } 161 162 @Test 163 public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException { 164 FileChangeWatcher watcher = null; 165 try { 166 final List<WatchEvent<?>> events = new ArrayList<>(); 167 watcher = new FileChangeWatcher(tempDir.toPath(), event -> { 168 LOG.info("Got an update: {} {}", event.kind(), event.context()); 169 synchronized (events) { 170 events.add(event); 171 events.notifyAll(); 172 } 173 }); 174 watcher.start(); 175 watcher.waitForState(FileChangeWatcher.State.RUNNING); 176 Thread.sleep(1000L); // TODO hack 177 File tempFile2 = File.createTempFile("zk_test_", "", tempDir); 178 tempFile2.deleteOnExit(); 179 synchronized (events) { 180 if (events.isEmpty()) { 181 events.wait(FS_TIMEOUT); 182 } 183 assertFalse(events.isEmpty()); 184 WatchEvent<?> event = events.get(0); 185 assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind()); 186 assertEquals(tempFile2.getName(), event.context().toString()); 187 } 188 } finally { 189 if (watcher != null) { 190 watcher.stop(); 191 watcher.waitForState(FileChangeWatcher.State.STOPPED); 192 } 193 } 194 } 195 196 @Test 197 public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException { 198 FileChangeWatcher watcher = null; 199 try { 200 final List<WatchEvent<?>> events = new ArrayList<>(); 201 watcher = new FileChangeWatcher(tempDir.toPath(), event -> { 202 LOG.info("Got an update: {} {}", event.kind(), event.context()); 203 // Filter out the extra ENTRY_CREATE events that are 204 // sometimes seen at the start. Even though we create the watcher 205 // after the file exists, sometimes we still get a create event. 206 if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { 207 return; 208 } 209 synchronized (events) { 210 events.add(event); 211 events.notifyAll(); 212 } 213 }); 214 watcher.start(); 215 watcher.waitForState(FileChangeWatcher.State.RUNNING); 216 Thread.sleep(1000L); // TODO hack 217 tempFile.delete(); 218 synchronized (events) { 219 if (events.isEmpty()) { 220 events.wait(FS_TIMEOUT); 221 } 222 assertFalse(events.isEmpty()); 223 WatchEvent<?> event = events.get(0); 224 assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind()); 225 assertEquals(tempFile.getName(), event.context().toString()); 226 } 227 } finally { 228 if (watcher != null) { 229 watcher.stop(); 230 watcher.waitForState(FileChangeWatcher.State.STOPPED); 231 } 232 } 233 } 234 235 @Test 236 public void testCallbackErrorDoesNotCrashWatcherThread() 237 throws IOException, InterruptedException { 238 FileChangeWatcher watcher = null; 239 try { 240 final AtomicInteger callCount = new AtomicInteger(0); 241 watcher = new FileChangeWatcher(tempDir.toPath(), event -> { 242 LOG.info("Got an update: {} {}", event.kind(), event.context()); 243 int oldValue; 244 synchronized (callCount) { 245 oldValue = callCount.getAndIncrement(); 246 callCount.notifyAll(); 247 } 248 if (oldValue == 0) { 249 throw new RuntimeException("This error should not crash the watcher thread"); 250 } 251 }); 252 watcher.start(); 253 watcher.waitForState(FileChangeWatcher.State.RUNNING); 254 Thread.sleep(1000L); // TODO hack 255 LOG.info("Modifying file"); 256 FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); 257 synchronized (callCount) { 258 while (callCount.get() == 0) { 259 callCount.wait(FS_TIMEOUT); 260 } 261 } 262 LOG.info("Modifying file again"); 263 FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); 264 synchronized (callCount) { 265 if (callCount.get() == 1) { 266 callCount.wait(FS_TIMEOUT); 267 } 268 } 269 // The value of callCount can exceed 1 only if the callback thread 270 // survives the exception thrown by the first callback. 271 assertTrue(callCount.get() > 1); 272 } finally { 273 if (watcher != null) { 274 watcher.stop(); 275 watcher.waitForState(FileChangeWatcher.State.STOPPED); 276 } 277 } 278 } 279}