001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026
027import java.io.File;
028import java.io.IOException;
029import java.nio.charset.StandardCharsets;
030import java.nio.file.StandardWatchEventKinds;
031import java.nio.file.WatchEvent;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicReference;
036import org.apache.commons.io.FileUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
040import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
041import org.apache.hadoop.hbase.testclassification.IOTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.hamcrest.Matchers;
044import org.junit.AfterClass;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * This file has been copied from the Apache ZooKeeper project.
054 * @see <a href=
055 *      "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base
056 *      revision</a>
057 */
058@Category({ IOTests.class, SmallTests.class })
059public class TestFileChangeWatcher {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestFileChangeWatcher.class);
064
065  private static File tempDir;
066  private static File tempFile;
067
068  private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class);
069  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
070
071  private static final long FS_TIMEOUT = 30000L;
072
073  @BeforeClass
074  public static void createTempFile() throws IOException {
075    tempDir = new File(UTIL.getDataTestDir(TestFileChangeWatcher.class.getSimpleName()).toString())
076      .getCanonicalFile();
077    FileUtils.forceMkdir(tempDir);
078    tempFile = File.createTempFile("zk_test_", "", tempDir);
079  }
080
081  @AfterClass
082  public static void cleanupTempDir() {
083    UTIL.cleanupTestDir();
084  }
085
086  @Test
087  public void testEnableCertFileReloading() throws IOException {
088    Configuration myConf = new Configuration();
089    String sharedPath = "/tmp/foo.jks";
090    myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath);
091    myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath);
092    AtomicReference<FileChangeWatcher> keystoreWatcher = new AtomicReference<>();
093    AtomicReference<FileChangeWatcher> truststoreWatcher = new AtomicReference<>();
094    X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
095    });
096    assertNotNull(keystoreWatcher.get());
097    assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
098    assertNull(truststoreWatcher.get());
099
100    keystoreWatcher.getAndSet(null).stop();
101    truststoreWatcher.set(null);
102
103    String truststorePath = "/tmp/bar.jks";
104    myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath);
105    X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
106    });
107
108    assertNotNull(keystoreWatcher.get());
109    assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
110    assertNotNull(truststoreWatcher.get());
111    assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-bar.jks"));
112
113    keystoreWatcher.getAndSet(null).stop();
114    truststoreWatcher.getAndSet(null).stop();
115  }
116
117  @Test
118  public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
119    FileChangeWatcher watcher = null;
120    try {
121      final List<WatchEvent<?>> events = new ArrayList<>();
122      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
123        LOG.info("Got an update: {} {}", event.kind(), event.context());
124        // Filter out the extra ENTRY_CREATE events that are
125        // sometimes seen at the start. Even though we create the watcher
126        // after the file exists, sometimes we still get a create event.
127        if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
128          return;
129        }
130        synchronized (events) {
131          events.add(event);
132          events.notifyAll();
133        }
134      });
135      watcher.start();
136      watcher.waitForState(FileChangeWatcher.State.RUNNING);
137      Thread.sleep(1000L); // TODO hack
138      for (int i = 0; i < 3; i++) {
139        LOG.info("Modifying file, attempt {}", (i + 1));
140        FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8,
141          true);
142        synchronized (events) {
143          if (events.size() < i + 1) {
144            events.wait(FS_TIMEOUT);
145          }
146          assertEquals("Wrong number of events", i + 1, events.size());
147          WatchEvent<?> event = events.get(i);
148          assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
149          assertEquals(tempFile.getName(), event.context().toString());
150        }
151      }
152    } finally {
153      if (watcher != null) {
154        watcher.stop();
155        watcher.waitForState(FileChangeWatcher.State.STOPPED);
156      }
157    }
158  }
159
160  @Test
161  public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException {
162    FileChangeWatcher watcher = null;
163    try {
164      final List<WatchEvent<?>> events = new ArrayList<>();
165      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
166        LOG.info("Got an update: {} {}", event.kind(), event.context());
167        // Filter out the extra ENTRY_CREATE events that are
168        // sometimes seen at the start. Even though we create the watcher
169        // after the file exists, sometimes we still get a create event.
170        if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
171          return;
172        }
173        synchronized (events) {
174          events.add(event);
175          events.notifyAll();
176        }
177      });
178      watcher.start();
179      watcher.waitForState(FileChangeWatcher.State.RUNNING);
180      Thread.sleep(1000L); // TODO hack
181      LOG.info("Touching file");
182      FileUtils.touch(tempFile);
183      synchronized (events) {
184        if (events.isEmpty()) {
185          events.wait(FS_TIMEOUT);
186        }
187        assertFalse(events.isEmpty());
188        WatchEvent<?> event = events.get(0);
189        assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind());
190        assertEquals(tempFile.getName(), event.context().toString());
191      }
192    } finally {
193      if (watcher != null) {
194        watcher.stop();
195        watcher.waitForState(FileChangeWatcher.State.STOPPED);
196      }
197    }
198  }
199
200  @Test
201  public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException {
202    FileChangeWatcher watcher = null;
203    try {
204      final List<WatchEvent<?>> events = new ArrayList<>();
205      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
206        LOG.info("Got an update: {} {}", event.kind(), event.context());
207        synchronized (events) {
208          events.add(event);
209          events.notifyAll();
210        }
211      });
212      watcher.start();
213      watcher.waitForState(FileChangeWatcher.State.RUNNING);
214      Thread.sleep(1000L); // TODO hack
215      File tempFile2 = File.createTempFile("zk_test_", "", tempDir);
216      tempFile2.deleteOnExit();
217      synchronized (events) {
218        if (events.isEmpty()) {
219          events.wait(FS_TIMEOUT);
220        }
221        assertFalse(events.isEmpty());
222        WatchEvent<?> event = events.get(0);
223        assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind());
224        assertEquals(tempFile2.getName(), event.context().toString());
225      }
226    } finally {
227      if (watcher != null) {
228        watcher.stop();
229        watcher.waitForState(FileChangeWatcher.State.STOPPED);
230      }
231    }
232  }
233
234  @Test
235  public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException {
236    FileChangeWatcher watcher = null;
237    try {
238      final List<WatchEvent<?>> events = new ArrayList<>();
239      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
240        LOG.info("Got an update: {} {}", event.kind(), event.context());
241        // Filter out the extra ENTRY_CREATE events that are
242        // sometimes seen at the start. Even though we create the watcher
243        // after the file exists, sometimes we still get a create event.
244        if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
245          return;
246        }
247        synchronized (events) {
248          events.add(event);
249          events.notifyAll();
250        }
251      });
252      watcher.start();
253      watcher.waitForState(FileChangeWatcher.State.RUNNING);
254      Thread.sleep(1000L); // TODO hack
255      tempFile.delete();
256      synchronized (events) {
257        if (events.isEmpty()) {
258          events.wait(FS_TIMEOUT);
259        }
260        assertFalse(events.isEmpty());
261        WatchEvent<?> event = events.get(0);
262        assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind());
263        assertEquals(tempFile.getName(), event.context().toString());
264      }
265    } finally {
266      if (watcher != null) {
267        watcher.stop();
268        watcher.waitForState(FileChangeWatcher.State.STOPPED);
269      }
270    }
271  }
272
273  @Test
274  public void testCallbackErrorDoesNotCrashWatcherThread()
275    throws IOException, InterruptedException {
276    FileChangeWatcher watcher = null;
277    try {
278      final AtomicInteger callCount = new AtomicInteger(0);
279      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
280        LOG.info("Got an update: {} {}", event.kind(), event.context());
281        int oldValue;
282        synchronized (callCount) {
283          oldValue = callCount.getAndIncrement();
284          callCount.notifyAll();
285        }
286        if (oldValue == 0) {
287          throw new RuntimeException("This error should not crash the watcher thread");
288        }
289      });
290      watcher.start();
291      watcher.waitForState(FileChangeWatcher.State.RUNNING);
292      Thread.sleep(1000L); // TODO hack
293      LOG.info("Modifying file");
294      FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true);
295      synchronized (callCount) {
296        while (callCount.get() == 0) {
297          callCount.wait(FS_TIMEOUT);
298        }
299      }
300      LOG.info("Modifying file again");
301      FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true);
302      synchronized (callCount) {
303        if (callCount.get() == 1) {
304          callCount.wait(FS_TIMEOUT);
305        }
306      }
307      // The value of callCount can exceed 1 only if the callback thread
308      // survives the exception thrown by the first callback.
309      assertTrue(callCount.get() > 1);
310    } finally {
311      if (watcher != null) {
312        watcher.stop();
313        watcher.waitForState(FileChangeWatcher.State.STOPPED);
314      }
315    }
316  }
317}