001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.mockito.Mockito.doThrow; 024import static org.mockito.Mockito.spy; 025 026import java.io.IOException; 027import java.net.URLEncoder; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Random; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FSDataOutputStream; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Abortable; 037import org.apache.hadoop.hbase.ChoreService; 038import org.apache.hadoop.hbase.CoordinatedStateManager; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.Server; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.Waiter; 045import org.apache.hadoop.hbase.ZooKeeperConnectionException; 046import org.apache.hadoop.hbase.client.ClusterConnection; 047import org.apache.hadoop.hbase.client.Connection; 048import org.apache.hadoop.hbase.master.HMaster; 049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 051import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; 052import org.apache.hadoop.hbase.testclassification.MasterTests; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 055import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; 056import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 057import org.apache.zookeeper.KeeperException; 058import org.junit.AfterClass; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 067 068@Category({MasterTests.class, MediumTests.class}) 069public class TestLogsCleaner { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestLogsCleaner.class); 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class); 076 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 077 078 private static DirScanPool POOL; 079 080 @BeforeClass 081 public static void setUpBeforeClass() throws Exception { 082 TEST_UTIL.startMiniZKCluster(); 083 TEST_UTIL.startMiniDFSCluster(1); 084 POOL = new DirScanPool(TEST_UTIL.getConfiguration()); 085 } 086 087 @AfterClass 088 public static void tearDownAfterClass() throws Exception { 089 TEST_UTIL.shutdownMiniZKCluster(); 090 TEST_UTIL.shutdownMiniDFSCluster(); 091 POOL.shutdownNow(); 092 } 093 094 /** 095 * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located 096 * in the same oldWALs directory. 097 * Created files: 098 * - 2 invalid files 099 * - 5 old Procedure WALs 100 * - 30 old WALs from which 3 are in replication 101 * - 5 recent Procedure WALs 102 * - 1 recent WAL 103 * - 1 very new WAL (timestamp in future) 104 * - masterProcedureWALs subdirectory 105 * Files which should stay: 106 * - 3 replication WALs 107 * - 2 new WALs 108 * - 5 latest Procedure WALs 109 * - masterProcedureWALs subdirectory 110 */ 111 @Test 112 public void testLogCleaning() throws Exception { 113 Configuration conf = TEST_UTIL.getConfiguration(); 114 // set TTLs 115 long ttlWAL = 2000; 116 long ttlProcedureWAL = 4000; 117 conf.setLong("hbase.master.logcleaner.ttl", ttlWAL); 118 conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL); 119 120 HMaster.decorateMasterConfiguration(conf); 121 Server server = new DummyServer(); 122 ReplicationQueueStorage queueStorage = 123 ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); 124 final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); 125 final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs"); 126 String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8"); 127 128 final FileSystem fs = FileSystem.get(conf); 129 130 long now = System.currentTimeMillis(); 131 fs.delete(oldLogDir, true); 132 fs.mkdirs(oldLogDir); 133 134 // Case 1: 2 invalid files, which would be deleted directly 135 fs.createNewFile(new Path(oldLogDir, "a")); 136 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a")); 137 138 // Case 2: 5 Procedure WALs that are old which would be deleted 139 for (int i = 1; i < 6; i++) { 140 Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i)); 141 fs.createNewFile(fileName); 142 } 143 144 // Sleep for sometime to get old procedure WALs 145 Thread.sleep(ttlProcedureWAL - ttlWAL); 146 147 // Case 3: old WALs which would be deletable 148 for (int i = 1; i < 31; i++) { 149 Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i)); 150 fs.createNewFile(fileName); 151 // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these 152 // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner 153 if (i % (30 / 3) == 1) { 154 queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName()); 155 LOG.info("Replication log file: " + fileName); 156 } 157 } 158 159 // Case 5: 5 Procedure WALs that are new, will stay 160 for (int i = 6; i < 11; i++) { 161 Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i)); 162 fs.createNewFile(fileName); 163 } 164 165 // Sleep for sometime to get newer modification time 166 Thread.sleep(ttlWAL); 167 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now)); 168 169 // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner, 170 // so we are not going down the chain 171 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + ttlWAL))); 172 173 for (FileStatus stat : fs.listStatus(oldLogDir)) { 174 LOG.info(stat.getPath().toString()); 175 } 176 177 // There should be 34 files and masterProcedureWALs directory 178 assertEquals(35, fs.listStatus(oldLogDir).length); 179 // 10 procedure WALs 180 assertEquals(10, fs.listStatus(oldProcedureWALDir).length); 181 182 LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir, POOL); 183 cleaner.chore(); 184 185 // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which 186 // are scheduled for replication and masterProcedureWALs directory 187 TEST_UTIL.waitFor(1000, 188 (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(oldLogDir).length); 189 // In masterProcedureWALs we end up with 5 newer Procedure WALs 190 TEST_UTIL.waitFor(1000, 191 (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(oldProcedureWALDir).length); 192 193 for (FileStatus file : fs.listStatus(oldLogDir)) { 194 LOG.debug("Kept log file in oldWALs: " + file.getPath().getName()); 195 } 196 for (FileStatus file : fs.listStatus(oldProcedureWALDir)) { 197 LOG.debug("Kept log file in masterProcedureWALs: " + file.getPath().getName()); 198 } 199 } 200 201 /** 202 * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting. 203 */ 204 @Test 205 public void testZooKeeperAbort() throws Exception { 206 Configuration conf = TEST_UTIL.getConfiguration(); 207 ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); 208 209 List<FileStatus> dummyFiles = Lists.newArrayList( 210 new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), 211 new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) 212 ); 213 214 try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf, 215 "testZooKeeperAbort-faulty", null)) { 216 faultyZK.init(); 217 cleaner.setConf(conf, faultyZK); 218 cleaner.preClean(); 219 // should keep all files due to a ConnectionLossException getting the queues znodes 220 Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles); 221 assertFalse(toDelete.iterator().hasNext()); 222 assertFalse(cleaner.isStopped()); 223 } 224 225 // when zk is working both files should be returned 226 cleaner = new ReplicationLogCleaner(); 227 try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) { 228 cleaner.setConf(conf, zkw); 229 cleaner.preClean(); 230 Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); 231 Iterator<FileStatus> iter = filesToDelete.iterator(); 232 assertTrue(iter.hasNext()); 233 assertEquals(new Path("log1"), iter.next().getPath()); 234 assertTrue(iter.hasNext()); 235 assertEquals(new Path("log2"), iter.next().getPath()); 236 assertFalse(iter.hasNext()); 237 } 238 } 239 240 /** 241 * When zk is working both files should be returned 242 * @throws Exception 243 */ 244 @Test 245 public void testZooKeeperNormal() throws Exception { 246 Configuration conf = TEST_UTIL.getConfiguration(); 247 ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); 248 249 List<FileStatus> dummyFiles = Lists.newArrayList( 250 new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), 251 new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) 252 ); 253 254 ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); 255 try { 256 cleaner.setConf(conf, zkw); 257 cleaner.preClean(); 258 Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); 259 Iterator<FileStatus> iter = filesToDelete.iterator(); 260 assertTrue(iter.hasNext()); 261 assertEquals(new Path("log1"), iter.next().getPath()); 262 assertTrue(iter.hasNext()); 263 assertEquals(new Path("log2"), iter.next().getPath()); 264 assertFalse(iter.hasNext()); 265 } finally { 266 zkw.close(); 267 } 268 } 269 270 @Test 271 public void testOnConfigurationChange() throws Exception { 272 Configuration conf = TEST_UTIL.getConfiguration(); 273 conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, 274 LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); 275 conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 276 LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); 277 conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, 278 LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC); 279 // Prepare environments 280 Server server = new DummyServer(); 281 Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(), 282 HConstants.HREGION_OLDLOGDIR_NAME); 283 FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); 284 LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir, POOL); 285 assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners()); 286 assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 287 cleaner.getCleanerThreadTimeoutMsec()); 288 assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, 289 cleaner.getCleanerThreadCheckIntervalMsec()); 290 // Create dir and files for test 291 fs.delete(oldWALsDir, true); 292 fs.mkdirs(oldWALsDir); 293 int numOfFiles = 10; 294 createFiles(fs, oldWALsDir, numOfFiles); 295 FileStatus[] status = fs.listStatus(oldWALsDir); 296 assertEquals(numOfFiles, status.length); 297 // Start cleaner chore 298 Thread thread = new Thread(() -> cleaner.chore()); 299 thread.setDaemon(true); 300 thread.start(); 301 // change size of cleaners dynamically 302 int sizeToChange = 4; 303 long threadTimeoutToChange = 30 * 1000L; 304 long threadCheckIntervalToChange = 250L; 305 conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange); 306 conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange); 307 conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, 308 threadCheckIntervalToChange); 309 cleaner.onConfigurationChange(conf); 310 assertEquals(sizeToChange, cleaner.getSizeOfCleaners()); 311 assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec()); 312 assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec()); 313 // Stop chore 314 thread.join(); 315 status = fs.listStatus(oldWALsDir); 316 assertEquals(0, status.length); 317 } 318 319 private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { 320 Random random = new Random(); 321 for (int i = 0; i < numOfFiles; i++) { 322 int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M 323 try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { 324 for (int m = 0; m < xMega; m++) { 325 byte[] M = new byte[1024 * 1024]; 326 random.nextBytes(M); 327 fsdos.write(M); 328 } 329 } 330 } 331 } 332 333 static class DummyServer implements Server { 334 335 @Override 336 public Configuration getConfiguration() { 337 return TEST_UTIL.getConfiguration(); 338 } 339 340 @Override 341 public ZKWatcher getZooKeeper() { 342 try { 343 return new ZKWatcher(getConfiguration(), "dummy server", this); 344 } catch (IOException e) { 345 e.printStackTrace(); 346 } 347 return null; 348 } 349 350 @Override 351 public CoordinatedStateManager getCoordinatedStateManager() { 352 return null; 353 } 354 355 @Override 356 public ClusterConnection getConnection() { 357 return null; 358 } 359 360 @Override 361 public MetaTableLocator getMetaTableLocator() { 362 return null; 363 } 364 365 @Override 366 public ServerName getServerName() { 367 return ServerName.valueOf("regionserver,60020,000000"); 368 } 369 370 @Override 371 public void abort(String why, Throwable e) {} 372 373 @Override 374 public boolean isAborted() { 375 return false; 376 } 377 378 @Override 379 public void stop(String why) {} 380 381 @Override 382 public boolean isStopped() { 383 return false; 384 } 385 386 @Override 387 public ChoreService getChoreService() { 388 return null; 389 } 390 391 @Override 392 public ClusterConnection getClusterConnection() { 393 return null; 394 } 395 396 @Override 397 public FileSystem getFileSystem() { 398 return null; 399 } 400 401 @Override 402 public boolean isStopping() { 403 return false; 404 } 405 406 @Override 407 public Connection createConnection(Configuration conf) throws IOException { 408 return null; 409 } 410 } 411 412 static class FaultyZooKeeperWatcher extends ZKWatcher { 413 private RecoverableZooKeeper zk; 414 415 public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable) 416 throws ZooKeeperConnectionException, IOException { 417 super(conf, identifier, abortable); 418 } 419 420 public void init() throws Exception { 421 this.zk = spy(super.getRecoverableZooKeeper()); 422 doThrow(new KeeperException.ConnectionLossException()) 423 .when(zk).getChildren("/hbase/replication/rs", null); 424 } 425 426 @Override 427 public RecoverableZooKeeper getRecoverableZooKeeper() { 428 return zk; 429 } 430 } 431}