001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import static org.awaitility.Awaitility.await; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.endsWith; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.jupiter.api.Assertions.assertNotEquals; 028 029import java.io.File; 030import java.io.IOException; 031import java.nio.charset.StandardCharsets; 032import java.nio.file.Path; 033import java.time.Duration; 034import java.util.ArrayList; 035import java.util.Collections; 036import java.util.List; 037import java.util.concurrent.atomic.AtomicInteger; 038import java.util.concurrent.atomic.AtomicReference; 039import org.apache.commons.io.FileUtils; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 042import org.apache.hadoop.hbase.io.crypto.tls.X509Util; 043import org.apache.hadoop.hbase.testclassification.IOTests; 044import org.apache.hadoop.hbase.testclassification.SmallTests; 045import org.junit.jupiter.api.AfterAll; 046import org.junit.jupiter.api.AfterEach; 047import org.junit.jupiter.api.BeforeAll; 048import org.junit.jupiter.api.BeforeEach; 049import org.junit.jupiter.api.Tag; 050import org.junit.jupiter.api.Test; 051import org.junit.jupiter.api.TestInfo; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * This file was originally copied from the Apache ZooKeeper project, but has been modified 057 * @see <a href= 058 * "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base 059 * revision</a> 060 */ 061@Tag(IOTests.TAG) 062@Tag(SmallTests.TAG) 063public class TestFileChangeWatcher { 064 065 private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class); 066 private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); 067 068 private static File dir; 069 070 private static final Duration POLL_INTERVAL = Duration.ofMillis(100); 071 072 private File tempFile; 073 074 private FileChangeWatcher watcher; 075 076 @BeforeAll 077 public static void setUpBeforeAll() throws IOException { 078 dir = new File(UTIL.getDataTestDir().toString()).getAbsoluteFile(); 079 if (!dir.mkdirs()) { 080 throw new IOException("can not mkdir " + dir); 081 } 082 } 083 084 @AfterAll 085 public static void cleanupTempDir() { 086 UTIL.cleanupTestDir(); 087 } 088 089 @BeforeEach 090 public void setUp(TestInfo testInfo) throws IOException { 091 tempFile = new File(dir, "file_change_test_" + testInfo.getDisplayName()); 092 if (!tempFile.createNewFile()) { 093 throw new IOException("failed to create new empty file " + tempFile); 094 } 095 } 096 097 @AfterEach 098 public void tearDown() throws InterruptedException { 099 if (watcher != null) { 100 watcher.stop(); 101 watcher.waitForState(FileChangeWatcher.State.STOPPED); 102 watcher = null; 103 } 104 } 105 106 @Test 107 public void testEnableCertFileReloading() throws IOException { 108 Configuration myConf = new Configuration(); 109 String sharedPath = File.createTempFile("foo", "foo.jks").getAbsolutePath(); 110 myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath); 111 myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath); 112 AtomicReference<FileChangeWatcher> keystoreWatcher = new AtomicReference<>(); 113 AtomicReference<FileChangeWatcher> truststoreWatcher = new AtomicReference<>(); 114 X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> { 115 }); 116 assertNotNull(keystoreWatcher.get()); 117 assertThat(keystoreWatcher.get().getWatcherThread().getName(), endsWith("foo.jks")); 118 assertNull(truststoreWatcher.get()); 119 120 keystoreWatcher.getAndSet(null).stop(); 121 truststoreWatcher.set(null); 122 123 String truststorePath = File.createTempFile("bar", "bar.jks").getAbsolutePath(); 124 myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath); 125 X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> { 126 }); 127 128 assertNotNull(keystoreWatcher.get()); 129 assertThat(keystoreWatcher.get().getWatcherThread().getName(), endsWith("foo.jks")); 130 assertNotNull(truststoreWatcher.get()); 131 assertThat(truststoreWatcher.get().getWatcherThread().getName(), endsWith("bar.jks")); 132 133 keystoreWatcher.getAndSet(null).stop(); 134 truststoreWatcher.getAndSet(null).stop(); 135 } 136 137 // wait until watcher thread finish loading the last modified time, we check this by checking 138 // whether the watcher thread has been in TIMED_WAITING state, i.e, waiting for the next runLoop 139 private void awaitWatcherThreadInitialized() throws InterruptedException { 140 watcher.waitForState(FileChangeWatcher.State.RUNNING); 141 await().atMost(Duration.ofSeconds(2)).pollInSameThread().pollInterval(Duration.ofMillis(10)) 142 .until(() -> watcher.getWatcherThread().getState() == Thread.State.TIMED_WAITING); 143 } 144 145 @Test 146 public void testNoFalseNotifications() throws Exception { 147 final List<Path> notifiedPaths = new ArrayList<>(); 148 watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { 149 LOG.info("Got an update on path {}", path); 150 synchronized (notifiedPaths) { 151 notifiedPaths.add(path); 152 notifiedPaths.notifyAll(); 153 } 154 }); 155 watcher.start(); 156 awaitWatcherThreadInitialized(); 157 await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3)) 158 .untilAsserted(() -> assertEquals("Should not have been notified", 0, notifiedPaths.size())); 159 } 160 161 @Test 162 public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { 163 final List<Path> notifiedPaths = Collections.synchronizedList(new ArrayList<>()); 164 watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { 165 LOG.info("Got an update on path {}", path); 166 notifiedPaths.add(path); 167 }); 168 watcher.start(); 169 awaitWatcherThreadInitialized(); 170 for (int i = 0; i < 3; i++) { 171 final int index = i; 172 LOG.info("Modifying file, attempt {}", (index + 1)); 173 FileUtils.writeStringToFile(tempFile, "Hello world " + index + "\n", StandardCharsets.UTF_8, 174 true); 175 await().atMost(Duration.ofSeconds(2)).untilAsserted( 176 () -> assertEquals("Wrong number of notifications", index + 1, notifiedPaths.size())); 177 Path path = notifiedPaths.get(index); 178 assertEquals(tempFile.getPath(), path.toString()); 179 } 180 } 181 182 @Test 183 public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException { 184 final List<Path> notifiedPaths = Collections.synchronizedList(new ArrayList<>()); 185 watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { 186 LOG.info("Got an update on path {}", path); 187 notifiedPaths.add(path); 188 }); 189 watcher.start(); 190 awaitWatcherThreadInitialized(); 191 LOG.info("Touching file"); 192 FileUtils.touch(tempFile); 193 await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertFalse(notifiedPaths.isEmpty())); 194 Path path = notifiedPaths.get(0); 195 assertEquals(tempFile.getPath(), path.toString()); 196 } 197 198 @Test 199 public void testCallbackErrorDoesNotCrashWatcherThread() throws Exception { 200 final AtomicInteger callCount = new AtomicInteger(0); 201 watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { 202 LOG.info("Got an update for path {}", path); 203 callCount.incrementAndGet(); 204 throw new RuntimeException("This error should not crash the watcher thread"); 205 }); 206 watcher.start(); 207 awaitWatcherThreadInitialized(); 208 209 LOG.info("Modifying file"); 210 FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); 211 await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(1, callCount.get())); 212 213 // make sure we can still receive the update event, which means the watcher thread is still 214 // alive 215 LOG.info("Modifying file again"); 216 FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); 217 await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(2, callCount.get())); 218 219 // also make sure that the thread is not terminated 220 assertNotEquals(Thread.State.TERMINATED, watcher.getWatcherThread()); 221 } 222}