001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.File;
025import java.io.IOException;
026import java.nio.charset.StandardCharsets;
027import java.nio.file.StandardWatchEventKinds;
028import java.nio.file.WatchEvent;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.concurrent.atomic.AtomicInteger;
032import org.apache.commons.io.FileUtils;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
035import org.apache.hadoop.hbase.testclassification.IOTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.junit.AfterClass;
038import org.junit.BeforeClass;
039import org.junit.ClassRule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * This file has been copied from the Apache ZooKeeper project.
047 * @see <a href=
048 *      "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base
049 *      revision</a>
050 */
051@Category({ IOTests.class, SmallTests.class })
052public class TestFileChangeWatcher {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestFileChangeWatcher.class);
057
058  private static File tempDir;
059  private static File tempFile;
060
061  private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class);
062  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
063
064  private static final long FS_TIMEOUT = 30000L;
065
066  @BeforeClass
067  public static void createTempFile() throws IOException {
068    tempDir = new File(UTIL.getDataTestDir(TestFileChangeWatcher.class.getSimpleName()).toString())
069      .getCanonicalFile();
070    FileUtils.forceMkdir(tempDir);
071    tempFile = File.createTempFile("zk_test_", "", tempDir);
072  }
073
074  @AfterClass
075  public static void cleanupTempDir() {
076    UTIL.cleanupTestDir();
077  }
078
079  @Test
080  public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
081    FileChangeWatcher watcher = null;
082    try {
083      final List<WatchEvent<?>> events = new ArrayList<>();
084      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
085        LOG.info("Got an update: {} {}", event.kind(), event.context());
086        // Filter out the extra ENTRY_CREATE events that are
087        // sometimes seen at the start. Even though we create the watcher
088        // after the file exists, sometimes we still get a create event.
089        if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
090          return;
091        }
092        synchronized (events) {
093          events.add(event);
094          events.notifyAll();
095        }
096      });
097      watcher.start();
098      watcher.waitForState(FileChangeWatcher.State.RUNNING);
099      Thread.sleep(1000L); // TODO hack
100      for (int i = 0; i < 3; i++) {
101        LOG.info("Modifying file, attempt {}", (i + 1));
102        FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8,
103          true);
104        synchronized (events) {
105          if (events.size() < i + 1) {
106            events.wait(FS_TIMEOUT);
107          }
108          assertEquals("Wrong number of events", i + 1, events.size());
109          WatchEvent<?> event = events.get(i);
110          assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
111          assertEquals(tempFile.getName(), event.context().toString());
112        }
113      }
114    } finally {
115      if (watcher != null) {
116        watcher.stop();
117        watcher.waitForState(FileChangeWatcher.State.STOPPED);
118      }
119    }
120  }
121
122  @Test
123  public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException {
124    FileChangeWatcher watcher = null;
125    try {
126      final List<WatchEvent<?>> events = new ArrayList<>();
127      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
128        LOG.info("Got an update: {} {}", event.kind(), event.context());
129        // Filter out the extra ENTRY_CREATE events that are
130        // sometimes seen at the start. Even though we create the watcher
131        // after the file exists, sometimes we still get a create event.
132        if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
133          return;
134        }
135        synchronized (events) {
136          events.add(event);
137          events.notifyAll();
138        }
139      });
140      watcher.start();
141      watcher.waitForState(FileChangeWatcher.State.RUNNING);
142      Thread.sleep(1000L); // TODO hack
143      LOG.info("Touching file");
144      FileUtils.touch(tempFile);
145      synchronized (events) {
146        if (events.isEmpty()) {
147          events.wait(FS_TIMEOUT);
148        }
149        assertFalse(events.isEmpty());
150        WatchEvent<?> event = events.get(0);
151        assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
152        assertEquals(tempFile.getName(), event.context().toString());
153      }
154    } finally {
155      if (watcher != null) {
156        watcher.stop();
157        watcher.waitForState(FileChangeWatcher.State.STOPPED);
158      }
159    }
160  }
161
162  @Test
163  public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException {
164    FileChangeWatcher watcher = null;
165    try {
166      final List<WatchEvent<?>> events = new ArrayList<>();
167      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
168        LOG.info("Got an update: {} {}", event.kind(), event.context());
169        synchronized (events) {
170          events.add(event);
171          events.notifyAll();
172        }
173      });
174      watcher.start();
175      watcher.waitForState(FileChangeWatcher.State.RUNNING);
176      Thread.sleep(1000L); // TODO hack
177      File tempFile2 = File.createTempFile("zk_test_", "", tempDir);
178      tempFile2.deleteOnExit();
179      synchronized (events) {
180        if (events.isEmpty()) {
181          events.wait(FS_TIMEOUT);
182        }
183        assertFalse(events.isEmpty());
184        WatchEvent<?> event = events.get(0);
185        assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind());
186        assertEquals(tempFile2.getName(), event.context().toString());
187      }
188    } finally {
189      if (watcher != null) {
190        watcher.stop();
191        watcher.waitForState(FileChangeWatcher.State.STOPPED);
192      }
193    }
194  }
195
196  @Test
197  public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException {
198    FileChangeWatcher watcher = null;
199    try {
200      final List<WatchEvent<?>> events = new ArrayList<>();
201      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
202        LOG.info("Got an update: {} {}", event.kind(), event.context());
203        // Filter out the extra ENTRY_CREATE events that are
204        // sometimes seen at the start. Even though we create the watcher
205        // after the file exists, sometimes we still get a create event.
206        if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
207          return;
208        }
209        synchronized (events) {
210          events.add(event);
211          events.notifyAll();
212        }
213      });
214      watcher.start();
215      watcher.waitForState(FileChangeWatcher.State.RUNNING);
216      Thread.sleep(1000L); // TODO hack
217      tempFile.delete();
218      synchronized (events) {
219        if (events.isEmpty()) {
220          events.wait(FS_TIMEOUT);
221        }
222        assertFalse(events.isEmpty());
223        WatchEvent<?> event = events.get(0);
224        assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind());
225        assertEquals(tempFile.getName(), event.context().toString());
226      }
227    } finally {
228      if (watcher != null) {
229        watcher.stop();
230        watcher.waitForState(FileChangeWatcher.State.STOPPED);
231      }
232    }
233  }
234
235  @Test
236  public void testCallbackErrorDoesNotCrashWatcherThread()
237    throws IOException, InterruptedException {
238    FileChangeWatcher watcher = null;
239    try {
240      final AtomicInteger callCount = new AtomicInteger(0);
241      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
242        LOG.info("Got an update: {} {}", event.kind(), event.context());
243        int oldValue;
244        synchronized (callCount) {
245          oldValue = callCount.getAndIncrement();
246          callCount.notifyAll();
247        }
248        if (oldValue == 0) {
249          throw new RuntimeException("This error should not crash the watcher thread");
250        }
251      });
252      watcher.start();
253      watcher.waitForState(FileChangeWatcher.State.RUNNING);
254      Thread.sleep(1000L); // TODO hack
255      LOG.info("Modifying file");
256      FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true);
257      synchronized (callCount) {
258        while (callCount.get() == 0) {
259          callCount.wait(FS_TIMEOUT);
260        }
261      }
262      LOG.info("Modifying file again");
263      FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true);
264      synchronized (callCount) {
265        if (callCount.get() == 1) {
266          callCount.wait(FS_TIMEOUT);
267        }
268      }
269      // The value of callCount can exceed 1 only if the callback thread
270      // survives the exception thrown by the first callback.
271      assertTrue(callCount.get() > 1);
272    } finally {
273      if (watcher != null) {
274        watcher.stop();
275        watcher.waitForState(FileChangeWatcher.State.STOPPED);
276      }
277    }
278  }
279}