/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ MasterTests.class, MediumTests.class })
public class TestLogsCleaner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLogsCleaner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
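  // Shared testing utility that hosts the mini ZK and DFS clusters started in setUpBeforeClass().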
  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final Path OLD_WALS_DIR =
    new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);

  private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs");

  private static Configuration conf;

  private static DirScanPool POOL;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    TEST_UTIL.startMiniDFSCluster(1);
    POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniZKCluster();
    TEST_UTIL.shutdownMiniDFSCluster();
    POOL.shutdownNow();
  }

  @Before
  public void beforeTest() throws IOException {
    conf = TEST_UTIL.getConfiguration();

    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();

    fs.delete(OLD_WALS_DIR, true);

    // root directory
    fs.mkdirs(OLD_WALS_DIR);
  }

  /**
   * This test verifies that LogCleaner works correctly with WALs and Procedure WALs located in
   * the same oldWALs directory.
   * Created files:
   * - 2 invalid files
   * - 5 old Procedure WALs
   * - 30 old WALs, of which 3 are in replication
   * - 5 recent Procedure WALs
   * - 1 recent WAL
   * - 1 very new WAL (timestamp in future)
   * - masterProcedureWALs subdirectory
   * Files which should stay:
   * - 3 replication WALs
   * - 2 new WALs
   * - 5 latest Procedure WALs
   * - masterProcedureWALs subdirectory
   */
  @Test
  public void testLogCleaning() throws Exception {
    // set TTLs
    long ttlWAL = 2000;
    long ttlProcedureWAL = 4000;
    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);

    HMaster.decorateMasterConfiguration(conf);
    Server server = new DummyServer();
    ReplicationQueueStorage queueStorage =
      ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);

    String fakeMachineName =
      URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());

    final FileSystem fs = FileSystem.get(conf);
    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);

    final long now = EnvironmentEdgeManager.currentTime();

    // Case 1: 2 invalid files, which would be deleted directly
    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));

    // Case 2: 5 old Procedure WALs, which would be deleted
    for (int i = 1; i <= 5; i++) {
      final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
      fs.createNewFile(fileName);
    }

    // Sleep for some time so the Procedure WALs above become old
    Thread.sleep(ttlProcedureWAL - ttlWAL);

    // Case 3: old WALs which would be deletable
    for (int i = 1; i <= 30; i++) {
      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
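      // File names follow the <encoded-server-name>.<timestamp> pattern; the decreasing
      // timestamps simply keep each name unique.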
      fs.createNewFile(fileName);
      // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication, so these
      // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
      if (i % (30 / 3) == 0) {
        queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
        LOG.info("Replication log file: " + fileName);
      }
    }

    // Case 5: 5 Procedure WALs that are new, will stay
    for (int i = 6; i <= 10; i++) {
      Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
      fs.createNewFile(fileName);
    }

    // Sleep for some time to get a newer modification time
    Thread.sleep(ttlWAL);
    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));

    // Case 6: 1 newer WAL, not even deletable by TimeToLiveLogCleaner,
    // so we are not going down the chain
    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));

    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
    LOG.info("File status: {}", Arrays.toString(status));

    // There should be 34 files and 1 masterProcedureWALs directory
    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
    // 10 procedure WALs
    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);

    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
    cleaner.chore();

    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which are
    // scheduled for replication, and the masterProcedureWALs directory
    TEST_UTIL.waitFor(1000,
      (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length);
    // In masterProcedureWALs we end up with 5 newer Procedure WALs
    TEST_UTIL.waitFor(1000,
      (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);

    if (LOG.isDebugEnabled()) {
      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
      LOG.debug("Kept log files for oldWALs: {}", Arrays.toString(statusOldWALs));
      LOG.debug("Kept log files for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs));
    }
  }

  @Test
  public void testZooKeeperRecoveryDuringGetListOfReplicators() throws Exception {
    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();

    List<FileStatus> dummyFiles = Arrays.asList(
      new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log1")),
      new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log2")));

    FaultyZooKeeperWatcher faultyZK =
      new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
    final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);

    try {
      faultyZK.init(false);
      ReplicationQueueStorage queueStorage =
        spy(ReplicationStorageFactory.getReplicationQueueStorage(faultyZK, conf));
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          try {
            return invocation.callRealMethod();
          } catch (ReplicationException e) {
            LOG.debug("Caught Exception", e);
            getListOfReplicatorsFailed.set(true);
            throw e;
          }
        }
      }).when(queueStorage).getAllWALs();

      cleaner.setConf(conf, faultyZK, queueStorage);
      // should keep all files due to a ConnectionLossException getting the queues znodes
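      // The ReplicationException surfaces through the spied getAllWALs() call, so the cleaner
      // must fail safe and report nothing as deletable.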
      cleaner.preClean();
      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);

      assertTrue(getListOfReplicatorsFailed.get());
      assertFalse(toDelete.iterator().hasNext());
      assertFalse(cleaner.isStopped());

      // zk recovery.
      faultyZK.init(true);
      cleaner.preClean();
      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
      Iterator<FileStatus> iter = filesToDelete.iterator();
      assertTrue(iter.hasNext());
      assertEquals(new Path("log1"), iter.next().getPath());
      assertTrue(iter.hasNext());
      assertEquals(new Path("log2"), iter.next().getPath());
      assertFalse(iter.hasNext());

    } finally {
      faultyZK.close();
    }
  }

  /**
   * When ZK is working, both files should be returned.
   * @throws Exception from ZK watcher
   */
  @Test
  public void testZooKeeperNormal() throws Exception {
    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();

    // Subtract 1000 from current time so modtime is for sure older
    // than 'now'.
    long modTime = EnvironmentEdgeManager.currentTime() - 1000;
    List<FileStatus> dummyFiles =
      Arrays.asList(new FileStatus(100, false, 3, 100, modTime, new Path("log1")),
        new FileStatus(100, false, 3, 100, modTime, new Path("log2")));

    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
    try {
      cleaner.setConf(conf, zkw);
      cleaner.preClean();
      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
      Iterator<FileStatus> iter = filesToDelete.iterator();
      assertTrue(iter.hasNext());
      assertEquals(new Path("log1"), iter.next().getPath());
      assertTrue(iter.hasNext());
      assertEquals(new Path("log2"), iter.next().getPath());
      assertFalse(iter.hasNext());
    } finally {
      zkw.close();
    }
  }

  @Test
  public void testOnConfigurationChange() throws Exception {
    // Prepare the environment
    Server server = new DummyServer();

    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
    int size = cleaner.getSizeOfCleaners();
    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      cleaner.getCleanerThreadTimeoutMsec());
    // Create dir and files for test
    int numOfFiles = 10;
    createFiles(fs, OLD_WALS_DIR, numOfFiles);
    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
    assertEquals(numOfFiles, status.length);
    // Start cleaner chore
    Thread thread = new Thread(() -> cleaner.chore());
    thread.setDaemon(true);
    thread.start();
    // Change the size of cleaners dynamically
    int sizeToChange = 4;
    long threadTimeoutToChange = 30 * 1000L;
    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange);
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
    cleaner.onConfigurationChange(conf);
    assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners());
    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
    // Wait for the chore to complete
    thread.join();
    status = fs.listStatus(OLD_WALS_DIR);
    assertEquals(0, status.length);
  }

  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
    for (int i = 0; i < numOfFiles; i++) {
      // size of each file is 1M, 2M, or 3M
      int xMega = ThreadLocalRandom.current().nextInt(1, 4);
      byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)];
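      // Fill the buffer with random bytes and write it out as the file contents.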
      Bytes.random(M);
      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
        fsdos.write(M);
      }
    }
  }

  static class DummyServer extends MockServer {

    @Override
    public Configuration getConfiguration() {
      return TEST_UTIL.getConfiguration();
    }

    @Override
    public ZKWatcher getZooKeeper() {
      try {
        return new ZKWatcher(getConfiguration(), "dummy server", this);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return null;
    }
  }

  static class FaultyZooKeeperWatcher extends ZKWatcher {
    private RecoverableZooKeeper zk;

    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
      throws ZooKeeperConnectionException, IOException {
      super(conf, identifier, abortable);
    }

    public void init(boolean autoRecovery) throws Exception {
      this.zk = spy(super.getRecoverableZooKeeper());
      if (!autoRecovery) {
        doThrow(new KeeperException.ConnectionLossException()).when(zk)
          .getChildren("/hbase/replication/rs", null);
      }
    }

    @Override
    public RecoverableZooKeeper getRecoverableZooKeeper() {
      return zk;
    }
  }
}