001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.mockito.Mockito.doThrow; 024import static org.mockito.Mockito.spy; 025 026import java.io.IOException; 027import java.net.URLEncoder; 028import java.nio.charset.StandardCharsets; 029import java.util.Arrays; 030import java.util.Iterator; 031import java.util.List; 032import java.util.concurrent.ThreadLocalRandom; 033import org.apache.commons.io.FileUtils; 034import org.apache.commons.lang3.RandomUtils; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FSDataOutputStream; 037import org.apache.hadoop.fs.FileStatus; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Abortable; 041import org.apache.hadoop.hbase.ChoreService; 042import org.apache.hadoop.hbase.CoordinatedStateManager; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.Server; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.Waiter; 049import org.apache.hadoop.hbase.ZooKeeperConnectionException; 050import org.apache.hadoop.hbase.client.ClusterConnection; 051import org.apache.hadoop.hbase.client.Connection; 052import org.apache.hadoop.hbase.master.HMaster; 053import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 054import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 055import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; 056import org.apache.hadoop.hbase.testclassification.MasterTests; 057import org.apache.hadoop.hbase.testclassification.MediumTests; 058import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; 059import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 060import org.apache.zookeeper.KeeperException; 061import org.junit.AfterClass; 062import org.junit.Before; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070@Category({MasterTests.class, MediumTests.class}) 071public class TestLogsCleaner { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestLogsCleaner.class); 076 077 private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class); 078 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 079 080 private static final Path OLD_WALS_DIR = 081 new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); 082 083 private static final Path OLD_PROCEDURE_WALS_DIR = 084 new Path(OLD_WALS_DIR, "masterProcedureWALs"); 085 086 private static Configuration conf; 087 088 private static DirScanPool POOL; 089 090 @BeforeClass 091 public static void setUpBeforeClass() throws Exception { 092 TEST_UTIL.startMiniZKCluster(); 093 TEST_UTIL.startMiniDFSCluster(1); 094 POOL = new DirScanPool(TEST_UTIL.getConfiguration()); 095 } 096 097 @AfterClass 098 public static void tearDownAfterClass() throws Exception { 099 TEST_UTIL.shutdownMiniZKCluster(); 100 TEST_UTIL.shutdownMiniDFSCluster(); 101 POOL.shutdownNow(); 102 } 103 104 @Before 105 public void beforeTest() throws IOException { 106 conf = TEST_UTIL.getConfiguration(); 107 108 FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); 109 110 fs.delete(OLD_WALS_DIR, true); 111 112 // root directory 113 fs.mkdirs(OLD_WALS_DIR); 114 } 115 116 /** 117 * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located 118 * in the same oldWALs directory. 119 * Created files: 120 * - 2 invalid files 121 * - 5 old Procedure WALs 122 * - 30 old WALs from which 3 are in replication 123 * - 5 recent Procedure WALs 124 * - 1 recent WAL 125 * - 1 very new WAL (timestamp in future) 126 * - masterProcedureWALs subdirectory 127 * Files which should stay: 128 * - 3 replication WALs 129 * - 2 new WALs 130 * - 5 latest Procedure WALs 131 * - masterProcedureWALs subdirectory 132 */ 133 @Test 134 public void testLogCleaning() throws Exception { 135 // set TTLs 136 long ttlWAL = 2000; 137 long ttlProcedureWAL = 4000; 138 conf.setLong("hbase.master.logcleaner.ttl", ttlWAL); 139 conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL); 140 141 HMaster.decorateMasterConfiguration(conf); 142 Server server = new DummyServer(); 143 ReplicationQueueStorage queueStorage = 144 ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); 145 146 String fakeMachineName = URLEncoder.encode( 147 server.getServerName().toString(), StandardCharsets.UTF_8.name()); 148 149 final FileSystem fs = FileSystem.get(conf); 150 fs.mkdirs(OLD_PROCEDURE_WALS_DIR); 151 152 final long now = System.currentTimeMillis(); 153 154 // Case 1: 2 invalid files, which would be deleted directly 155 fs.createNewFile(new Path(OLD_WALS_DIR, "a")); 156 fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a")); 157 158 // Case 2: 5 Procedure WALs that are old which would be deleted 159 for (int i = 1; i <= 5; i++) { 160 final Path fileName = 161 new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i)); 162 fs.createNewFile(fileName); 163 } 164 165 // Sleep for sometime to get old procedure WALs 166 Thread.sleep(ttlProcedureWAL - ttlWAL); 167 168 // Case 3: old WALs which would be deletable 169 for (int i = 1; i <= 30; i++) { 170 Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i)); 171 fs.createNewFile(fileName); 172 // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these 173 // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner 174 if (i % (30 / 3) == 0) { 175 queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName()); 176 LOG.info("Replication log file: " + fileName); 177 } 178 } 179 180 // Case 5: 5 Procedure WALs that are new, will stay 181 for (int i = 6; i <= 10; i++) { 182 Path fileName = 183 new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i)); 184 fs.createNewFile(fileName); 185 } 186 187 // Sleep for sometime to get newer modification time 188 Thread.sleep(ttlWAL); 189 fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now)); 190 191 // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner, 192 // so we are not going down the chain 193 fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL))); 194 195 FileStatus[] status = fs.listStatus(OLD_WALS_DIR); 196 LOG.info("File status: {}", Arrays.toString(status)); 197 198 // There should be 34 files and 1 masterProcedureWALs directory 199 assertEquals(35, fs.listStatus(OLD_WALS_DIR).length); 200 // 10 procedure WALs 201 assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); 202 203 LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL); 204 cleaner.chore(); 205 206 // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which 207 // are scheduled for replication and masterProcedureWALs directory 208 TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 6 == fs 209 .listStatus(OLD_WALS_DIR).length); 210 // In masterProcedureWALs we end up with 5 newer Procedure WALs 211 TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 5 == fs 212 .listStatus(OLD_PROCEDURE_WALS_DIR).length); 213 214 if (LOG.isDebugEnabled()) { 215 FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR); 216 FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR); 217 LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs)); 218 LOG.debug("Kept log file for masterProcedureWALs: {}", 219 Arrays.toString(statusProcedureWALs)); 220 } 221 } 222 223 /** 224 * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting. 225 */ 226 @Test 227 public void testZooKeeperAbort() throws Exception { 228 Configuration conf = TEST_UTIL.getConfiguration(); 229 ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); 230 231 List<FileStatus> dummyFiles = Arrays.asList( 232 new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), 233 new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) 234 ); 235 236 try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf, 237 "testZooKeeperAbort-faulty", null)) { 238 faultyZK.init(); 239 cleaner.setConf(conf, faultyZK); 240 cleaner.preClean(); 241 // should keep all files due to a ConnectionLossException getting the queues znodes 242 Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles); 243 assertFalse(toDelete.iterator().hasNext()); 244 assertFalse(cleaner.isStopped()); 245 } 246 247 // when zk is working both files should be returned 248 cleaner = new ReplicationLogCleaner(); 249 try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) { 250 cleaner.setConf(conf, zkw); 251 cleaner.preClean(); 252 Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); 253 Iterator<FileStatus> iter = filesToDelete.iterator(); 254 assertTrue(iter.hasNext()); 255 assertEquals(new Path("log1"), iter.next().getPath()); 256 assertTrue(iter.hasNext()); 257 assertEquals(new Path("log2"), iter.next().getPath()); 258 assertFalse(iter.hasNext()); 259 } 260 } 261 262 /** 263 * When zk is working both files should be returned 264 * @throws Exception from ZK watcher 265 */ 266 @Test 267 public void testZooKeeperNormal() throws Exception { 268 ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); 269 270 // Subtract 1000 from current time so modtime is for sure older 271 // than 'now'. 272 long modTime = System.currentTimeMillis() - 1000; 273 List<FileStatus> dummyFiles = Arrays.asList( 274 new FileStatus(100, false, 3, 100, modTime, new Path("log1")), 275 new FileStatus(100, false, 3, 100, modTime, new Path("log2")) 276 ); 277 278 ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); 279 try { 280 cleaner.setConf(conf, zkw); 281 cleaner.preClean(); 282 Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); 283 Iterator<FileStatus> iter = filesToDelete.iterator(); 284 assertTrue(iter.hasNext()); 285 assertEquals(new Path("log1"), iter.next().getPath()); 286 assertTrue(iter.hasNext()); 287 assertEquals(new Path("log2"), iter.next().getPath()); 288 assertFalse(iter.hasNext()); 289 } finally { 290 zkw.close(); 291 } 292 } 293 294 @Test 295 public void testOnConfigurationChange() throws Exception { 296 // Prepare environments 297 Server server = new DummyServer(); 298 299 FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); 300 LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL); 301 int size = cleaner.getSizeOfCleaners(); 302 assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 303 cleaner.getCleanerThreadTimeoutMsec()); 304 // Create dir and files for test 305 int numOfFiles = 10; 306 createFiles(fs, OLD_WALS_DIR, numOfFiles); 307 FileStatus[] status = fs.listStatus(OLD_WALS_DIR); 308 assertEquals(numOfFiles, status.length); 309 // Start cleaner chore 310 Thread thread = new Thread(() -> cleaner.chore()); 311 thread.setDaemon(true); 312 thread.start(); 313 // change size of cleaners dynamically 314 int sizeToChange = 4; 315 long threadTimeoutToChange = 30 * 1000L; 316 conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange); 317 conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange); 318 cleaner.onConfigurationChange(conf); 319 assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners()); 320 assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec()); 321 // Stop chore 322 thread.join(); 323 status = fs.listStatus(OLD_WALS_DIR); 324 assertEquals(0, status.length); 325 } 326 327 private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { 328 for (int i = 0; i < numOfFiles; i++) { 329 // size of each file is 1M, 2M, or 3M 330 int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4); 331 try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { 332 byte[] M = RandomUtils.nextBytes(Math.toIntExact(FileUtils.ONE_MB * xMega)); 333 fsdos.write(M); 334 } 335 } 336 } 337 338 static class DummyServer implements Server { 339 340 @Override 341 public Configuration getConfiguration() { 342 return TEST_UTIL.getConfiguration(); 343 } 344 345 @Override 346 public ZKWatcher getZooKeeper() { 347 try { 348 return new ZKWatcher(getConfiguration(), "dummy server", this); 349 } catch (IOException e) { 350 e.printStackTrace(); 351 } 352 return null; 353 } 354 355 @Override 356 public CoordinatedStateManager getCoordinatedStateManager() { 357 return null; 358 } 359 360 @Override 361 public ClusterConnection getConnection() { 362 return null; 363 } 364 365 @Override 366 public ServerName getServerName() { 367 return ServerName.valueOf("regionserver,60020,000000"); 368 } 369 370 @Override 371 public void abort(String why, Throwable e) {} 372 373 @Override 374 public boolean isAborted() { 375 return false; 376 } 377 378 @Override 379 public void stop(String why) {} 380 381 @Override 382 public boolean isStopped() { 383 return false; 384 } 385 386 @Override 387 public ChoreService getChoreService() { 388 return null; 389 } 390 391 @Override 392 public ClusterConnection getClusterConnection() { 393 return null; 394 } 395 396 @Override 397 public FileSystem getFileSystem() { 398 return null; 399 } 400 401 @Override 402 public boolean isStopping() { 403 return false; 404 } 405 406 @Override 407 public Connection createConnection(Configuration conf) throws IOException { 408 return null; 409 } 410 } 411 412 static class FaultyZooKeeperWatcher extends ZKWatcher { 413 private RecoverableZooKeeper zk; 414 415 public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable) 416 throws ZooKeeperConnectionException, IOException { 417 super(conf, identifier, abortable); 418 } 419 420 public void init() throws Exception { 421 this.zk = spy(super.getRecoverableZooKeeper()); 422 doThrow(new KeeperException.ConnectionLossException()) 423 .when(zk).getChildren("/hbase/replication/rs", null); 424 } 425 426 @Override 427 public RecoverableZooKeeper getRecoverableZooKeeper() { 428 return zk; 429 } 430 } 431}