001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import static org.awaitility.Awaitility.await;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.endsWith;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.jupiter.api.Assertions.assertNotEquals;
028
029import java.io.File;
030import java.io.IOException;
031import java.nio.charset.StandardCharsets;
032import java.nio.file.Path;
033import java.time.Duration;
034import java.util.ArrayList;
035import java.util.Collections;
036import java.util.List;
037import java.util.concurrent.atomic.AtomicInteger;
038import java.util.concurrent.atomic.AtomicReference;
039import org.apache.commons.io.FileUtils;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
042import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
043import org.apache.hadoop.hbase.testclassification.IOTests;
044import org.apache.hadoop.hbase.testclassification.SmallTests;
045import org.junit.jupiter.api.AfterAll;
046import org.junit.jupiter.api.AfterEach;
047import org.junit.jupiter.api.BeforeAll;
048import org.junit.jupiter.api.BeforeEach;
049import org.junit.jupiter.api.Tag;
050import org.junit.jupiter.api.Test;
051import org.junit.jupiter.api.TestInfo;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * This file was originally copied from the Apache ZooKeeper project, but has been modified
057 * @see <a href=
058 *      "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base
059 *      revision</a>
060 */
061@Tag(IOTests.TAG)
062@Tag(SmallTests.TAG)
063public class TestFileChangeWatcher {
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class);
066  private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
067
068  private static File dir;
069
070  private static final Duration POLL_INTERVAL = Duration.ofMillis(100);
071
072  private File tempFile;
073
074  private FileChangeWatcher watcher;
075
076  @BeforeAll
077  public static void setUpBeforeAll() throws IOException {
078    dir = new File(UTIL.getDataTestDir().toString()).getAbsoluteFile();
079    if (!dir.mkdirs()) {
080      throw new IOException("can not mkdir " + dir);
081    }
082  }
083
084  @AfterAll
085  public static void cleanupTempDir() {
086    UTIL.cleanupTestDir();
087  }
088
089  @BeforeEach
090  public void setUp(TestInfo testInfo) throws IOException {
091    tempFile = new File(dir, "file_change_test_" + testInfo.getDisplayName());
092    if (!tempFile.createNewFile()) {
093      throw new IOException("failed to create new empty file " + tempFile);
094    }
095  }
096
097  @AfterEach
098  public void tearDown() throws InterruptedException {
099    if (watcher != null) {
100      watcher.stop();
101      watcher.waitForState(FileChangeWatcher.State.STOPPED);
102      watcher = null;
103    }
104  }
105
106  @Test
107  public void testEnableCertFileReloading() throws IOException {
108    Configuration myConf = new Configuration();
109    String sharedPath = File.createTempFile("foo", "foo.jks").getAbsolutePath();
110    myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath);
111    myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath);
112    AtomicReference<FileChangeWatcher> keystoreWatcher = new AtomicReference<>();
113    AtomicReference<FileChangeWatcher> truststoreWatcher = new AtomicReference<>();
114    X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
115    });
116    assertNotNull(keystoreWatcher.get());
117    assertThat(keystoreWatcher.get().getWatcherThread().getName(), endsWith("foo.jks"));
118    assertNull(truststoreWatcher.get());
119
120    keystoreWatcher.getAndSet(null).stop();
121    truststoreWatcher.set(null);
122
123    String truststorePath = File.createTempFile("bar", "bar.jks").getAbsolutePath();
124    myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath);
125    X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
126    });
127
128    assertNotNull(keystoreWatcher.get());
129    assertThat(keystoreWatcher.get().getWatcherThread().getName(), endsWith("foo.jks"));
130    assertNotNull(truststoreWatcher.get());
131    assertThat(truststoreWatcher.get().getWatcherThread().getName(), endsWith("bar.jks"));
132
133    keystoreWatcher.getAndSet(null).stop();
134    truststoreWatcher.getAndSet(null).stop();
135  }
136
137  // wait until watcher thread finish loading the last modified time, we check this by checking
138  // whether the watcher thread has been in TIMED_WAITING state, i.e, waiting for the next runLoop
139  private void awaitWatcherThreadInitialized() throws InterruptedException {
140    watcher.waitForState(FileChangeWatcher.State.RUNNING);
141    await().atMost(Duration.ofSeconds(2)).pollInSameThread().pollInterval(Duration.ofMillis(10))
142      .until(() -> watcher.getWatcherThread().getState() == Thread.State.TIMED_WAITING);
143  }
144
145  @Test
146  public void testNoFalseNotifications() throws Exception {
147    final List<Path> notifiedPaths = new ArrayList<>();
148    watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
149      LOG.info("Got an update on path {}", path);
150      synchronized (notifiedPaths) {
151        notifiedPaths.add(path);
152        notifiedPaths.notifyAll();
153      }
154    });
155    watcher.start();
156    awaitWatcherThreadInitialized();
157    await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3))
158      .untilAsserted(() -> assertEquals("Should not have been notified", 0, notifiedPaths.size()));
159  }
160
161  @Test
162  public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
163    final List<Path> notifiedPaths = Collections.synchronizedList(new ArrayList<>());
164    watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
165      LOG.info("Got an update on path {}", path);
166      notifiedPaths.add(path);
167    });
168    watcher.start();
169    awaitWatcherThreadInitialized();
170    for (int i = 0; i < 3; i++) {
171      final int index = i;
172      LOG.info("Modifying file, attempt {}", (index + 1));
173      FileUtils.writeStringToFile(tempFile, "Hello world " + index + "\n", StandardCharsets.UTF_8,
174        true);
175      await().atMost(Duration.ofSeconds(2)).untilAsserted(
176        () -> assertEquals("Wrong number of notifications", index + 1, notifiedPaths.size()));
177      Path path = notifiedPaths.get(index);
178      assertEquals(tempFile.getPath(), path.toString());
179    }
180  }
181
182  @Test
183  public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException {
184    final List<Path> notifiedPaths = Collections.synchronizedList(new ArrayList<>());
185    watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
186      LOG.info("Got an update on path {}", path);
187      notifiedPaths.add(path);
188    });
189    watcher.start();
190    awaitWatcherThreadInitialized();
191    LOG.info("Touching file");
192    FileUtils.touch(tempFile);
193    await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertFalse(notifiedPaths.isEmpty()));
194    Path path = notifiedPaths.get(0);
195    assertEquals(tempFile.getPath(), path.toString());
196  }
197
198  @Test
199  public void testCallbackErrorDoesNotCrashWatcherThread() throws Exception {
200    final AtomicInteger callCount = new AtomicInteger(0);
201    watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> {
202      LOG.info("Got an update for path {}", path);
203      callCount.incrementAndGet();
204      throw new RuntimeException("This error should not crash the watcher thread");
205    });
206    watcher.start();
207    awaitWatcherThreadInitialized();
208
209    LOG.info("Modifying file");
210    FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true);
211    await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(1, callCount.get()));
212
213    // make sure we can still receive the update event, which means the watcher thread is still
214    // alive
215    LOG.info("Modifying file again");
216    FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true);
217    await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(2, callCount.get()));
218
219    // also make sure that the thread is not terminated
220    assertNotEquals(Thread.State.TERMINATED, watcher.getWatcherThread());
221  }
222}