001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.mockito.Mockito.doThrow; 024import static org.mockito.Mockito.spy; 025 026import java.io.IOException; 027import java.net.URLEncoder; 028import java.nio.charset.StandardCharsets; 029import java.util.Arrays; 030import java.util.Iterator; 031import java.util.List; 032import java.util.concurrent.ThreadLocalRandom; 033import org.apache.commons.io.FileUtils; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FSDataOutputStream; 036import org.apache.hadoop.fs.FileStatus; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.Abortable; 040import org.apache.hadoop.hbase.ChoreService; 041import org.apache.hadoop.hbase.CoordinatedStateManager; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseTestingUtility; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.Server; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.Waiter; 048import org.apache.hadoop.hbase.ZooKeeperConnectionException; 049import org.apache.hadoop.hbase.client.ClusterConnection; 050import org.apache.hadoop.hbase.client.Connection; 051import org.apache.hadoop.hbase.master.HMaster; 052import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 053import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 054import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; 055import org.apache.hadoop.hbase.testclassification.MasterTests; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 059import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; 060import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 061import org.apache.zookeeper.KeeperException; 062import org.junit.AfterClass; 063import org.junit.Before; 064import org.junit.BeforeClass; 065import org.junit.ClassRule; 066import org.junit.Test; 067import org.junit.experimental.categories.Category; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071@Category({ MasterTests.class, MediumTests.class }) 072public class TestLogsCleaner { 073 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestLogsCleaner.class); 077 078 private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class); 079 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 080 081 private static final Path OLD_WALS_DIR = 082 new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); 083 084 private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs"); 085 086 private static Configuration conf; 087 088 private static DirScanPool POOL; 089 090 @BeforeClass 091 public static void setUpBeforeClass() throws Exception { 092 TEST_UTIL.startMiniZKCluster(); 093 TEST_UTIL.startMiniDFSCluster(1); 094 POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration()); 095 } 096 097 @AfterClass 098 public static void tearDownAfterClass() throws Exception { 099 TEST_UTIL.shutdownMiniZKCluster(); 100 TEST_UTIL.shutdownMiniDFSCluster(); 101 POOL.shutdownNow(); 102 } 103 104 @Before 105 public void beforeTest() throws IOException { 106 conf = TEST_UTIL.getConfiguration(); 107 108 FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); 109 110 fs.delete(OLD_WALS_DIR, true); 111 112 // root directory 113 fs.mkdirs(OLD_WALS_DIR); 114 } 115 116 /** 117 * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same 118 * oldWALs directory. Created files: - 2 invalid files - 5 old Procedure WALs - 30 old WALs from 119 * which 3 are in replication - 5 recent Procedure WALs - 1 recent WAL - 1 very new WAL (timestamp 120 * in future) - masterProcedureWALs subdirectory Files which should stay: - 3 replication WALs - 2 121 * new WALs - 5 latest Procedure WALs - masterProcedureWALs subdirectory 122 */ 123 @Test 124 public void testLogCleaning() throws Exception { 125 // set TTLs 126 long ttlWAL = 2000; 127 long ttlProcedureWAL = 4000; 128 conf.setLong("hbase.master.logcleaner.ttl", ttlWAL); 129 conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL); 130 131 HMaster.decorateMasterConfiguration(conf); 132 Server server = new DummyServer(); 133 ReplicationQueueStorage queueStorage = 134 ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); 135 136 String fakeMachineName = 137 URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name()); 138 139 final FileSystem fs = FileSystem.get(conf); 140 fs.mkdirs(OLD_PROCEDURE_WALS_DIR); 141 142 final long now = EnvironmentEdgeManager.currentTime(); 143 144 // Case 1: 2 invalid files, which would be deleted directly 145 fs.createNewFile(new Path(OLD_WALS_DIR, "a")); 146 fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a")); 147 148 // Case 2: 5 Procedure WALs that are old which would be deleted 149 for (int i = 1; i <= 5; i++) { 150 final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i)); 151 fs.createNewFile(fileName); 152 } 153 154 // Sleep for sometime to get old procedure WALs 155 Thread.sleep(ttlProcedureWAL - ttlWAL); 156 157 // Case 3: old WALs which would be deletable 158 for (int i = 1; i <= 30; i++) { 159 Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i)); 160 fs.createNewFile(fileName); 161 // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these 162 // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner 163 if (i % (30 / 3) == 0) { 164 queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName()); 165 LOG.info("Replication log file: " + fileName); 166 } 167 } 168 169 // Case 5: 5 Procedure WALs that are new, will stay 170 for (int i = 6; i <= 10; i++) { 171 Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i)); 172 fs.createNewFile(fileName); 173 } 174 175 // Sleep for sometime to get newer modification time 176 Thread.sleep(ttlWAL); 177 fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now)); 178 179 // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner, 180 // so we are not going down the chain 181 fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL))); 182 183 FileStatus[] status = fs.listStatus(OLD_WALS_DIR); 184 LOG.info("File status: {}", Arrays.toString(status)); 185 186 // There should be 34 files and 1 masterProcedureWALs directory 187 assertEquals(35, fs.listStatus(OLD_WALS_DIR).length); 188 // 10 procedure WALs 189 assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); 190 191 LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null); 192 cleaner.chore(); 193 194 // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which 195 // are scheduled for replication and masterProcedureWALs directory 196 TEST_UTIL.waitFor(1000, 197 (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length); 198 // In masterProcedureWALs we end up with 5 newer Procedure WALs 199 TEST_UTIL.waitFor(1000, 200 (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); 201 202 if (LOG.isDebugEnabled()) { 203 FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR); 204 FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR); 205 LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs)); 206 LOG.debug("Kept log file for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs)); 207 } 208 } 209 210 /** 211 * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting. 212 */ 213 @Test 214 public void testZooKeeperAbort() throws Exception { 215 Configuration conf = TEST_UTIL.getConfiguration(); 216 ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); 217 218 List<FileStatus> dummyFiles = Arrays.asList( 219 new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log1")), 220 new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log2"))); 221 222 try (FaultyZooKeeperWatcher faultyZK = 223 new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null)) { 224 faultyZK.init(); 225 cleaner.setConf(conf, faultyZK); 226 cleaner.preClean(); 227 // should keep all files due to a ConnectionLossException getting the queues znodes 228 Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles); 229 assertFalse(toDelete.iterator().hasNext()); 230 assertFalse(cleaner.isStopped()); 231 } 232 233 // when zk is working both files should be returned 234 cleaner = new ReplicationLogCleaner(); 235 try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) { 236 cleaner.setConf(conf, zkw); 237 cleaner.preClean(); 238 Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); 239 Iterator<FileStatus> iter = filesToDelete.iterator(); 240 assertTrue(iter.hasNext()); 241 assertEquals(new Path("log1"), iter.next().getPath()); 242 assertTrue(iter.hasNext()); 243 assertEquals(new Path("log2"), iter.next().getPath()); 244 assertFalse(iter.hasNext()); 245 } 246 } 247 248 /** 249 * When zk is working both files should be returned 250 * @throws Exception from ZK watcher 251 */ 252 @Test 253 public void testZooKeeperNormal() throws Exception { 254 ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); 255 256 // Subtract 1000 from current time so modtime is for sure older 257 // than 'now'. 258 long modTime = EnvironmentEdgeManager.currentTime() - 1000; 259 List<FileStatus> dummyFiles = 260 Arrays.asList(new FileStatus(100, false, 3, 100, modTime, new Path("log1")), 261 new FileStatus(100, false, 3, 100, modTime, new Path("log2"))); 262 263 ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); 264 try { 265 cleaner.setConf(conf, zkw); 266 cleaner.preClean(); 267 Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); 268 Iterator<FileStatus> iter = filesToDelete.iterator(); 269 assertTrue(iter.hasNext()); 270 assertEquals(new Path("log1"), iter.next().getPath()); 271 assertTrue(iter.hasNext()); 272 assertEquals(new Path("log2"), iter.next().getPath()); 273 assertFalse(iter.hasNext()); 274 } finally { 275 zkw.close(); 276 } 277 } 278 279 @Test 280 public void testOnConfigurationChange() throws Exception { 281 // Prepare environments 282 Server server = new DummyServer(); 283 284 FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); 285 LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null); 286 int size = cleaner.getSizeOfCleaners(); 287 assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 288 cleaner.getCleanerThreadTimeoutMsec()); 289 // Create dir and files for test 290 int numOfFiles = 10; 291 createFiles(fs, OLD_WALS_DIR, numOfFiles); 292 FileStatus[] status = fs.listStatus(OLD_WALS_DIR); 293 assertEquals(numOfFiles, status.length); 294 // Start cleaner chore 295 Thread thread = new Thread(() -> cleaner.chore()); 296 thread.setDaemon(true); 297 thread.start(); 298 // change size of cleaners dynamically 299 int sizeToChange = 4; 300 long threadTimeoutToChange = 30 * 1000L; 301 conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange); 302 conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange); 303 cleaner.onConfigurationChange(conf); 304 assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners()); 305 assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec()); 306 // Stop chore 307 thread.join(); 308 status = fs.listStatus(OLD_WALS_DIR); 309 assertEquals(0, status.length); 310 } 311 312 private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { 313 for (int i = 0; i < numOfFiles; i++) { 314 // size of each file is 1M, 2M, or 3M 315 int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4); 316 byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)]; 317 Bytes.random(M); 318 try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { 319 fsdos.write(M); 320 } 321 } 322 } 323 324 static class DummyServer implements Server { 325 326 @Override 327 public Configuration getConfiguration() { 328 return TEST_UTIL.getConfiguration(); 329 } 330 331 @Override 332 public ZKWatcher getZooKeeper() { 333 try { 334 return new ZKWatcher(getConfiguration(), "dummy server", this); 335 } catch (IOException e) { 336 e.printStackTrace(); 337 } 338 return null; 339 } 340 341 @Override 342 public CoordinatedStateManager getCoordinatedStateManager() { 343 return null; 344 } 345 346 @Override 347 public ClusterConnection getConnection() { 348 return null; 349 } 350 351 @Override 352 public ServerName getServerName() { 353 return ServerName.valueOf("regionserver,60020,000000"); 354 } 355 356 @Override 357 public void abort(String why, Throwable e) { 358 } 359 360 @Override 361 public boolean isAborted() { 362 return false; 363 } 364 365 @Override 366 public void stop(String why) { 367 } 368 369 @Override 370 public boolean isStopped() { 371 return false; 372 } 373 374 @Override 375 public ChoreService getChoreService() { 376 return null; 377 } 378 379 @Override 380 public ClusterConnection getClusterConnection() { 381 return null; 382 } 383 384 @Override 385 public FileSystem getFileSystem() { 386 return null; 387 } 388 389 @Override 390 public boolean isStopping() { 391 return false; 392 } 393 394 @Override 395 public Connection createConnection(Configuration conf) throws IOException { 396 return null; 397 } 398 } 399 400 static class FaultyZooKeeperWatcher extends ZKWatcher { 401 private RecoverableZooKeeper zk; 402 403 public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable) 404 throws ZooKeeperConnectionException, IOException { 405 super(conf, identifier, abortable); 406 } 407 408 public void init() throws Exception { 409 this.zk = spy(super.getRecoverableZooKeeper()); 410 doThrow(new KeeperException.ConnectionLossException()).when(zk) 411 .getChildren("/hbase/replication/rs", null); 412 } 413 414 @Override 415 public RecoverableZooKeeper getRecoverableZooKeeper() { 416 return zk; 417 } 418 } 419}