001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026
027import java.io.File;
028import java.io.IOException;
029import java.nio.charset.StandardCharsets;
030import java.nio.file.Path;
031import java.time.Duration;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicReference;
036import org.apache.commons.io.FileUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
040import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
041import org.apache.hadoop.hbase.testclassification.IOTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.hamcrest.Matchers;
044import org.junit.AfterClass;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * This file was originally copied from the Apache ZooKeeper project, but has been modified
054 * @see <a href=
055 *      "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base
056 *      revision</a>
057 */
058@Category({ IOTests.class, SmallTests.class })
059public class TestFileChangeWatcher {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestFileChangeWatcher.class);
064
065  private static File tempFile;
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class);
068  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
069
070  private static final long FS_TIMEOUT = 30000L;
071  private static final Duration POLL_INTERVAL = Duration.ofMillis(100);
072
073  @BeforeClass
074  public static void createTempFile() throws IOException {
075    tempFile = File.createTempFile("zk_test_", "");
076  }
077
078  @AfterClass
079  public static void cleanupTempDir() {
080    UTIL.cleanupTestDir();
081  }
082
083  @Test
084  public void testEnableCertFileReloading() throws IOException {
085    Configuration myConf = new Configuration();
086    String sharedPath = File.createTempFile("foo", "foo.jks").getAbsolutePath();
087    myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath);
088    myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath);
089    AtomicReference<FileChangeWatcher> keystoreWatcher = new AtomicReference<>();
090    AtomicReference<FileChangeWatcher> truststoreWatcher = new AtomicReference<>();
091    X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
092    });
093    assertNotNull(keystoreWatcher.get());
094    assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("foo.jks"));
095    assertNull(truststoreWatcher.get());
096
097    keystoreWatcher.getAndSet(null).stop();
098    truststoreWatcher.set(null);
099
100    String truststorePath = File.createTempFile("bar", "bar.jks").getAbsolutePath();
101    myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath);
102    X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
103    });
104
105    assertNotNull(keystoreWatcher.get());
106    assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("foo.jks"));
107    assertNotNull(truststoreWatcher.get());
108    assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("bar.jks"));
109
110    keystoreWatcher.getAndSet(null).stop();
111    truststoreWatcher.getAndSet(null).stop();
112  }
113
114  @Test
115  public void testNoFalseNotifications() throws IOException, InterruptedException {
116    FileChangeWatcher watcher = null;
117    try {
118      final List<Path> notifiedPaths = new ArrayList<>();
119      watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
120        LOG.info("Got an update on path {}", path);
121        synchronized (notifiedPaths) {
122          notifiedPaths.add(path);
123          notifiedPaths.notifyAll();
124        }
125      });
126      watcher.start();
127      watcher.waitForState(FileChangeWatcher.State.RUNNING);
128      Thread.sleep(1000L); // TODO hack
129      assertEquals("Should not have been notified", 0, notifiedPaths.size());
130    } finally {
131      if (watcher != null) {
132        watcher.stop();
133        watcher.waitForState(FileChangeWatcher.State.STOPPED);
134      }
135    }
136  }
137
138  @Test
139  public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
140    FileChangeWatcher watcher = null;
141    try {
142      final List<Path> notifiedPaths = new ArrayList<>();
143      watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
144        LOG.info("Got an update on path {}", path);
145        synchronized (notifiedPaths) {
146          notifiedPaths.add(path);
147          notifiedPaths.notifyAll();
148        }
149      });
150      watcher.start();
151      watcher.waitForState(FileChangeWatcher.State.RUNNING);
152      Thread.sleep(1000L); // TODO hack
153      for (int i = 0; i < 3; i++) {
154        LOG.info("Modifying file, attempt {}", (i + 1));
155        FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8,
156          true);
157        synchronized (notifiedPaths) {
158          if (notifiedPaths.size() < i + 1) {
159            notifiedPaths.wait(FS_TIMEOUT);
160          }
161          assertEquals("Wrong number of notifications", i + 1, notifiedPaths.size());
162          Path path = notifiedPaths.get(i);
163          assertEquals(tempFile.getPath(), path.toString());
164        }
165      }
166    } finally {
167      if (watcher != null) {
168        watcher.stop();
169        watcher.waitForState(FileChangeWatcher.State.STOPPED);
170      }
171    }
172  }
173
174  @Test
175  public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException {
176    FileChangeWatcher watcher = null;
177    try {
178      final List<Path> notifiedPaths = new ArrayList<>();
179      watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
180        LOG.info("Got an update on path {}", path);
181        synchronized (notifiedPaths) {
182          notifiedPaths.add(path);
183          notifiedPaths.notifyAll();
184        }
185      });
186      watcher.start();
187      watcher.waitForState(FileChangeWatcher.State.RUNNING);
188      Thread.sleep(1000L); // TODO hack
189      LOG.info("Touching file");
190      FileUtils.touch(tempFile);
191      synchronized (notifiedPaths) {
192        if (notifiedPaths.isEmpty()) {
193          notifiedPaths.wait(FS_TIMEOUT);
194        }
195        assertFalse(notifiedPaths.isEmpty());
196        Path path = notifiedPaths.get(0);
197        assertEquals(tempFile.getPath(), path.toString());
198      }
199    } finally {
200      if (watcher != null) {
201        watcher.stop();
202        watcher.waitForState(FileChangeWatcher.State.STOPPED);
203      }
204    }
205  }
206
207  @Test
208  public void testCallbackErrorDoesNotCrashWatcherThread()
209    throws IOException, InterruptedException {
210    FileChangeWatcher watcher = null;
211    try {
212      final AtomicInteger callCount = new AtomicInteger(0);
213      watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
214        LOG.info("Got an update for path {}", path);
215        int oldValue;
216        synchronized (callCount) {
217          oldValue = callCount.getAndIncrement();
218          callCount.notifyAll();
219        }
220        if (oldValue == 0) {
221          throw new RuntimeException("This error should not crash the watcher thread");
222        }
223      });
224      watcher.start();
225      watcher.waitForState(FileChangeWatcher.State.RUNNING);
226      Thread.sleep(1000L); // TODO hack
227      LOG.info("Modifying file");
228      FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true);
229      synchronized (callCount) {
230        while (callCount.get() == 0) {
231          callCount.wait(FS_TIMEOUT);
232        }
233      }
234      LOG.info("Modifying file again");
235      FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true);
236      synchronized (callCount) {
237        if (callCount.get() == 1) {
238          callCount.wait(FS_TIMEOUT);
239        }
240      }
241      // The value of callCount can exceed 1 only if the callback thread
242      // survives the exception thrown by the first callback.
243      assertTrue(callCount.get() > 1);
244    } finally {
245      if (watcher != null) {
246        watcher.stop();
247        watcher.waitForState(FileChangeWatcher.State.STOPPED);
248      }
249    }
250  }
251}