001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026 027import java.io.File; 028import java.io.IOException; 029import java.nio.charset.StandardCharsets; 030import java.nio.file.Path; 031import java.time.Duration; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.concurrent.atomic.AtomicReference; 036import org.apache.commons.io.FileUtils; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 040import org.apache.hadoop.hbase.io.crypto.tls.X509Util; 041import org.apache.hadoop.hbase.testclassification.IOTests; 042import org.apache.hadoop.hbase.testclassification.SmallTests; 043import org.hamcrest.Matchers; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * This file was originally copied from the Apache ZooKeeper project, but has been modified 054 * @see <a href= 055 * "https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base 056 * revision</a> 057 */ 058@Category({ IOTests.class, SmallTests.class }) 059public class TestFileChangeWatcher { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestFileChangeWatcher.class); 064 065 private static File tempFile; 066 067 private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class); 068 private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); 069 070 private static final long FS_TIMEOUT = 30000L; 071 private static final Duration POLL_INTERVAL = Duration.ofMillis(100); 072 073 @BeforeClass 074 public static void createTempFile() throws IOException { 075 tempFile = File.createTempFile("zk_test_", ""); 076 } 077 078 @AfterClass 079 public static void cleanupTempDir() { 080 UTIL.cleanupTestDir(); 081 } 082 083 @Test 084 public void testEnableCertFileReloading() throws IOException { 085 Configuration myConf = new Configuration(); 086 String sharedPath = File.createTempFile("foo", "foo.jks").getAbsolutePath(); 087 myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath); 088 myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath); 089 AtomicReference<FileChangeWatcher> keystoreWatcher = new AtomicReference<>(); 090 AtomicReference<FileChangeWatcher> truststoreWatcher = new AtomicReference<>(); 091 X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> { 092 }); 093 assertNotNull(keystoreWatcher.get()); 094 assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("foo.jks")); 095 assertNull(truststoreWatcher.get()); 096 097 keystoreWatcher.getAndSet(null).stop(); 098 truststoreWatcher.set(null); 099 100 String truststorePath = File.createTempFile("bar", "bar.jks").getAbsolutePath(); 101 myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath); 102 X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> { 103 }); 104 105 assertNotNull(keystoreWatcher.get()); 106 assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("foo.jks")); 107 assertNotNull(truststoreWatcher.get()); 108 assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("bar.jks")); 109 110 keystoreWatcher.getAndSet(null).stop(); 111 truststoreWatcher.getAndSet(null).stop(); 112 } 113 114 @Test 115 public void testNoFalseNotifications() throws IOException, InterruptedException { 116 FileChangeWatcher watcher = null; 117 try { 118 final List<Path> notifiedPaths = new ArrayList<>(); 119 watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { 120 LOG.info("Got an update on path {}", path); 121 synchronized (notifiedPaths) { 122 notifiedPaths.add(path); 123 notifiedPaths.notifyAll(); 124 } 125 }); 126 watcher.start(); 127 watcher.waitForState(FileChangeWatcher.State.RUNNING); 128 Thread.sleep(1000L); // TODO hack 129 assertEquals("Should not have been notified", 0, notifiedPaths.size()); 130 } finally { 131 if (watcher != null) { 132 watcher.stop(); 133 watcher.waitForState(FileChangeWatcher.State.STOPPED); 134 } 135 } 136 } 137 138 @Test 139 public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { 140 FileChangeWatcher watcher = null; 141 try { 142 final List<Path> notifiedPaths = new ArrayList<>(); 143 watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { 144 LOG.info("Got an update on path {}", path); 145 synchronized (notifiedPaths) { 146 notifiedPaths.add(path); 147 notifiedPaths.notifyAll(); 148 } 149 }); 150 watcher.start(); 151 watcher.waitForState(FileChangeWatcher.State.RUNNING); 152 Thread.sleep(1000L); // TODO hack 153 for (int i = 0; i < 3; i++) { 154 LOG.info("Modifying file, attempt {}", (i + 1)); 155 FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8, 156 true); 157 synchronized (notifiedPaths) { 158 if (notifiedPaths.size() < i + 1) { 159 notifiedPaths.wait(FS_TIMEOUT); 160 } 161 assertEquals("Wrong number of notifications", i + 1, notifiedPaths.size()); 162 Path path = notifiedPaths.get(i); 163 assertEquals(tempFile.getPath(), path.toString()); 164 } 165 } 166 } finally { 167 if (watcher != null) { 168 watcher.stop(); 169 watcher.waitForState(FileChangeWatcher.State.STOPPED); 170 } 171 } 172 } 173 174 @Test 175 public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException { 176 FileChangeWatcher watcher = null; 177 try { 178 final List<Path> notifiedPaths = new ArrayList<>(); 179 watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { 180 LOG.info("Got an update on path {}", path); 181 synchronized (notifiedPaths) { 182 notifiedPaths.add(path); 183 notifiedPaths.notifyAll(); 184 } 185 }); 186 watcher.start(); 187 watcher.waitForState(FileChangeWatcher.State.RUNNING); 188 Thread.sleep(1000L); // TODO hack 189 LOG.info("Touching file"); 190 FileUtils.touch(tempFile); 191 synchronized (notifiedPaths) { 192 if (notifiedPaths.isEmpty()) { 193 notifiedPaths.wait(FS_TIMEOUT); 194 } 195 assertFalse(notifiedPaths.isEmpty()); 196 Path path = notifiedPaths.get(0); 197 assertEquals(tempFile.getPath(), path.toString()); 198 } 199 } finally { 200 if (watcher != null) { 201 watcher.stop(); 202 watcher.waitForState(FileChangeWatcher.State.STOPPED); 203 } 204 } 205 } 206 207 @Test 208 public void testCallbackErrorDoesNotCrashWatcherThread() 209 throws IOException, InterruptedException { 210 FileChangeWatcher watcher = null; 211 try { 212 final AtomicInteger callCount = new AtomicInteger(0); 213 watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, path -> { 214 LOG.info("Got an update for path {}", path); 215 int oldValue; 216 synchronized (callCount) { 217 oldValue = callCount.getAndIncrement(); 218 callCount.notifyAll(); 219 } 220 if (oldValue == 0) { 221 throw new RuntimeException("This error should not crash the watcher thread"); 222 } 223 }); 224 watcher.start(); 225 watcher.waitForState(FileChangeWatcher.State.RUNNING); 226 Thread.sleep(1000L); // TODO hack 227 LOG.info("Modifying file"); 228 FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); 229 synchronized (callCount) { 230 while (callCount.get() == 0) { 231 callCount.wait(FS_TIMEOUT); 232 } 233 } 234 LOG.info("Modifying file again"); 235 FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); 236 synchronized (callCount) { 237 if (callCount.get() == 1) { 238 callCount.wait(FS_TIMEOUT); 239 } 240 } 241 // The value of callCount can exceed 1 only if the callback thread 242 // survives the exception thrown by the first callback. 243 assertTrue(callCount.get() > 1); 244 } finally { 245 if (watcher != null) { 246 watcher.stop(); 247 watcher.waitForState(FileChangeWatcher.State.STOPPED); 248 } 249 } 250 } 251}