/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doAnswer;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category({MasterTests.class, MediumTests.class})
public class TestLogsCleaner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestLogsCleaner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniZKCluster();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  /**
   * This test verifies that LogCleaner works correctly with WALs and Procedure WALs located
   * in the same oldWALs directory.
   * Created files:
   * - 2 invalid files
   * - 5 old Procedure WALs
   * - 30 old WALs, of which 3 are scheduled for replication
   * - 5 recent Procedure WALs
   * - 1 recent WAL
   * - 1 very new WAL (timestamp in the future)
   * - masterProcedureWALs subdirectory
   * Files which should stay:
   * - 3 replication WALs
   * - 2 new WALs
   * - 5 latest Procedure WALs
   * - masterProcedureWALs subdirectory
   */
  @Test
  public void testLogCleaning() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // set TTLs
    long ttlWAL = 2000;
    long ttlProcedureWAL = 4000;
    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);

    HMaster.decorateMasterConfiguration(conf);
    Server server = new DummyServer();
    ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(
        new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
    repQueues.init(server.getServerName().toString());
    final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
    final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs");
    String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8");

    final FileSystem fs = FileSystem.get(conf);

    long now = System.currentTimeMillis();
    fs.delete(oldLogDir, true);
    fs.mkdirs(oldLogDir);

    // Case 1: 2 invalid files, which would be deleted directly
    fs.createNewFile(new Path(oldLogDir, "a"));
    fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));

    // Case 2: 5 Procedure WALs that are old, which would be deleted
    for (int i = 1; i < 6; i++) {
      Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i));
      fs.createNewFile(fileName);
    }

    // Sleep for some time so the Procedure WALs above become old
    Thread.sleep(ttlProcedureWAL - ttlWAL);

    // Case 3: old WALs which would be deletable
    for (int i = 1; i < 31; i++) {
      Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i));
      fs.createNewFile(fileName);
      // Case 4: put 3 WALs in ZK, indicating that they are scheduled for replication, so these
      // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
      if (i % (30 / 3) == 1) {
        repQueues.addLog(fakeMachineName, fileName.getName());
        LOG.info("Replication log file: " + fileName);
      }
    }

    // Case 5: 5 Procedure WALs that are new, will stay
    for (int i = 6; i < 11; i++) {
      Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i));
      fs.createNewFile(fileName);
    }

    // Sleep for some time to get a newer modification time
    Thread.sleep(ttlWAL);
    fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));

    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
    // so we are not going down the chain
    fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + ttlWAL)));

    for (FileStatus stat : fs.listStatus(oldLogDir)) {
      LOG.info(stat.getPath().toString());
    }

    // There should be 34 files and the masterProcedureWALs directory
    assertEquals(35, fs.listStatus(oldLogDir).length);
    // 10 procedure WALs
    assertEquals(10, fs.listStatus(oldProcedureWALDir).length);

    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir);
    cleaner.chore();

    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
    // are scheduled for replication and the masterProcedureWALs directory
    TEST_UTIL.waitFor(1000,
        (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(oldLogDir).length);
    // In masterProcedureWALs we end up with the 5 newer Procedure WALs
    TEST_UTIL.waitFor(1000,
        (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(oldProcedureWALDir).length);

    for (FileStatus file : fs.listStatus(oldLogDir)) {
      LOG.debug("Kept log file in oldWALs: " + file.getPath().getName());
    }
    for (FileStatus file : fs.listStatus(oldProcedureWALDir)) {
      LOG.debug("Kept log file in masterProcedureWALs: " + file.getPath().getName());
    }
  }

  /**
   * getDeletableFiles() should eventually return even if the replication queues znode cversion
   * changes between consecutive reads.
   */
  @Test
  public void testZnodeCversionChange() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
    cleaner.setConf(conf);

    ReplicationQueuesClientZKImpl rqcMock = Mockito.mock(ReplicationQueuesClientZKImpl.class);
    Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);

    Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
    rqc.setAccessible(true);

    rqc.set(cleaner, rqcMock);

    // This should return eventually when the cversion stabilizes
    cleaner.getDeletableFiles(new LinkedList<>());
  }

  /**
   * ReplicationLogCleaner should keep all files (and should not be stopped) when listing the
   * replicators fails with a ZooKeeper connection loss.
   */
  @Test(timeout=10000)
  public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();

    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();

    List<FileStatus> dummyFiles = Lists.newArrayList(
        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
    );

    FaultyZooKeeperWatcher faultyZK =
        new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
    final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);

    try {
      faultyZK.init();
      ReplicationQueuesClient replicationQueuesClient =
          spy(ReplicationFactory.getReplicationQueuesClient(
              new ReplicationQueuesClientArguments(conf,
                  new ReplicationLogCleaner.WarnOnlyAbortable(), faultyZK)));
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          try {
            return invocation.callRealMethod();
          } catch (KeeperException.ConnectionLossException e) {
            getListOfReplicatorsFailed.set(true);
            throw e;
          }
        }
      }).when(replicationQueuesClient).getListOfReplicators();
      replicationQueuesClient.init();

      cleaner.setConf(conf, faultyZK, replicationQueuesClient);
      // should keep all files due to a ConnectionLossException getting the queues znodes
      cleaner.preClean();
      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);

      assertTrue(getListOfReplicatorsFailed.get());
      assertFalse(toDelete.iterator().hasNext());
      assertFalse(cleaner.isStopped());
    } finally {
      faultyZK.close();
    }
  }

  /**
   * When ZooKeeper is working normally, both files should be returned as deletable.
   */
  @Test(timeout=10000)
  public void testZooKeeperNormal() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();

    List<FileStatus> dummyFiles = Lists.newArrayList(
        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
    );

    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
    try {
      cleaner.setConf(conf, zkw);
      cleaner.preClean();
      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
      Iterator<FileStatus> iter = filesToDelete.iterator();
      assertTrue(iter.hasNext());
      assertEquals(new Path("log1"), iter.next().getPath());
      assertTrue(iter.hasNext());
      assertEquals(new Path("log2"), iter.next().getPath());
      assertFalse(iter.hasNext());
    } finally {
      zkw.close();
    }
  }

  /**
   * Verifies that the size, timeout and check interval of the old-WALs cleaner thread pool can
   * be changed on the fly via onConfigurationChange().
   */
  @Test
  public void testOnConfigurationChange() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE,
        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
    // Prepare environments
    Server server = new DummyServer();
    Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
        HConstants.HREGION_OLDLOGDIR_NAME);
    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners());
    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
        cleaner.getCleanerThreadTimeoutMsec());
    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
        cleaner.getCleanerThreadCheckIntervalMsec());
    // Create dir and files for test
    fs.delete(oldWALsDir, true);
    fs.mkdirs(oldWALsDir);
    int numOfFiles = 10;
    createFiles(fs, oldWALsDir, numOfFiles);
    FileStatus[] status = fs.listStatus(oldWALsDir);
    assertEquals(numOfFiles, status.length);
    // Start cleaner chore
    Thread thread = new Thread(() -> cleaner.chore());
    thread.setDaemon(true);
    thread.start();
    // Change the size of the cleaners dynamically
    int sizeToChange = 4;
    long threadTimeoutToChange = 30 * 1000L;
    long threadCheckIntervalToChange = 250L;
    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange);
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
        threadCheckIntervalToChange);
    cleaner.onConfigurationChange(conf);
    assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
    assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec());
    // Stop chore
    thread.join();
    status = fs.listStatus(oldWALsDir);
    assertEquals(0, status.length);
  }

  /**
   * Creates {@code numOfFiles} files of 1-3 MB of random data under {@code parentDir}.
   */
  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
    Random random = new Random();
    for (int i = 0; i < numOfFiles; i++) {
      int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M
      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
        for (int m = 0; m < xMega; m++) {
          byte[] M = new byte[1024 * 1024];
          random.nextBytes(M);
          fsdos.write(M);
        }
      }
    }
  }

  /**
   * Minimal Server implementation used by these tests; only the configuration and the ZooKeeper
   * watcher are backed by real objects, most other methods return null.
   */
  static class DummyServer implements Server {

    @Override
    public Configuration getConfiguration() {
      return TEST_UTIL.getConfiguration();
    }

    @Override
    public ZKWatcher getZooKeeper() {
      try {
        return new ZKWatcher(getConfiguration(), "dummy server", this);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return null;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return null;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public MetaTableLocator getMetaTableLocator() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf("regionserver,60020,000000");
    }

    @Override
    public void abort(String why, Throwable e) {}

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {}

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }

    @Override
    public ClusterConnection getClusterConnection() {
      return null;
    }

    @Override
    public FileSystem getFileSystem() {
      return null;
    }

    @Override
    public boolean isStopping() {
      return false;
    }

    @Override
    public Connection createConnection(Configuration conf) throws IOException {
      return null;
    }
  }

  /**
   * ZKWatcher whose RecoverableZooKeeper is a spy that throws a ConnectionLossException when the
   * replication rs znode is listed.
   */
  static class FaultyZooKeeperWatcher extends ZKWatcher {
    private RecoverableZooKeeper zk;

    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
        throws ZooKeeperConnectionException, IOException {
      super(conf, identifier, abortable);
    }

    public void init() throws Exception {
      this.zk = spy(super.getRecoverableZooKeeper());
      doThrow(new KeeperException.ConnectionLossException())
          .when(zk).getChildren("/hbase/replication/rs", null);
    }

    @Override
    public RecoverableZooKeeper getRecoverableZooKeeper() {
      return zk;
    }
  }
}