/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Tests for {@link CleanerChore}, driven through the {@link AllValidPaths} subclass together with
 * the simple cleaner delegates declared at the bottom of this file.
 * <p>
 * All tests share the static {@link #UTIL} (and therefore its configuration and data test
 * directory), the {@link #POOL} scan pool and the {@link #SERVICE} chore service.
 */
@Tag(MasterTests.TAG)
@Tag(SmallTests.TAG)
public class TestCleanerChore {

  private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static DirScanPool POOL;
  private static ChoreService SERVICE;

  /** Creates the scan pool and the chore service shared by all tests. */
  @BeforeAll
  public static void setup() {
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
    SERVICE = new ChoreService("cleaner", 2, true);
  }

  /**
   * Releases the shared resources.
   * @throws Exception on failure
   */
  @AfterAll
  public static void cleanup() throws Exception {
    try {
      SERVICE.shutdown();
      UTIL.cleanupTestDir();
    } finally {
      // always shut the scan pool down, even if one of the steps above fails
      POOL.shutdownNow();
    }
  }

  /**
   * With a delegate that never allows deletion, a chore run must keep both the file and its
   * parent directory.
   * @throws Exception on failure
   */
  @Test
  public void testSavesFilesOnRequest() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, NeverDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path file = new Path(parent, "someFile");
    fs.mkdirs(parent);
    // touch a new file
    fs.create(file).close();
    assertTrue(fs.exists(file), "Test file didn't get created.");

    // run the chore
    chore.chore();

    // verify all the files were preserved
    assertTrue(fs.exists(file), "File shouldn't have been deleted");
    assertTrue(fs.exists(parent), "directory shouldn't have been deleted");
  }

  /**
   * An {@link IOException} thrown while listing the directory must be reported through the
   * future returned by {@code triggerCleanerNow()}, and a later run, once the filesystem works
   * again, must succeed and remove the files.
   * @throws Exception on failure
   */
  @Test
  public void retriesIOExceptionInStatus() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    // The configuration is shared with the other tests in this class, some of which install
    // NeverDelete under the same key. Set the delegate explicitly so that the "everything is
    // gone" assertions below do not depend on test execution order.
    conf.set(confKey, AlwaysDelete.class.getName());

    Path child = new Path(testDir, "child");
    Path file = new Path(child, "file");
    fs.mkdirs(child);
    fs.create(file).close();
    assertTrue(fs.exists(file), "test file didn't get created.");
    final AtomicBoolean fails = new AtomicBoolean(true);
    // filesystem wrapper whose directory listing fails for as long as 'fails' is set
    FilterFileSystem filtered = new FilterFileSystem(fs) {
      @Override
      public FileStatus[] listStatus(Path f) throws IOException {
        if (fails.get()) {
          throw new IOException("whomp whomp.");
        }
        return fs.listStatus(f);
      }
    };

    AllValidPaths chore =
      new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey, POOL);
    SERVICE.scheduleChore(chore);
    try {
      // trigger a run while there is trouble talking to the filesystem
      // and verify that it accurately reported the failure.
      CompletableFuture<Boolean> errorFuture = chore.triggerCleanerNow();
      ExecutionException e = assertThrows(ExecutionException.class, () -> errorFuture.get());
      assertThat(e.getCause(), instanceOf(IOException.class));
      assertThat(e.getCause().getMessage(), containsString("whomp"));

      // verify that it couldn't clean the files.
      assertTrue(fs.exists(file), "test rig failed to inject failure.");
      assertTrue(fs.exists(child), "test rig failed to inject failure.");

      // filesystem is back
      fails.set(false);
      for (;;) {
        CompletableFuture<Boolean> succFuture = chore.triggerCleanerNow();
        // the reset of the future is async, so it is possible that we get the previous future
        // again.
        if (succFuture != errorFuture) {
          // verify that it accurately reported success.
          assertTrue(succFuture.get(), "chore should claim it succeeded.");
          break;
        }
      }
      // verify everything is gone.
      assertFalse(fs.exists(file), "file should have been destroyed.");
      assertFalse(fs.exists(child), "directory should have been destroyed.");

    } finally {
      chore.cancel();
    }
  }

  /**
   * With a delegate that always allows deletion, a chore run must remove the files and the
   * directories that become empty as a result.
   * @throws Exception on failure
   */
  @Test
  public void testDeletesEmptyDirectories() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path child = new Path(parent, "child");
    Path emptyChild = new Path(parent, "emptyChild");
    Path file = new Path(child, "someFile");
    fs.mkdirs(child);
    fs.mkdirs(emptyChild);
    // touch a new file
    fs.create(file).close();
    // also create a file in the top level directory
    Path topFile = new Path(testDir, "topFile");
    fs.create(topFile).close();
    assertTrue(fs.exists(file), "Test file didn't get created.");
    assertTrue(fs.exists(topFile), "Test file didn't get created.");

    // run the chore
    chore.chore();

    // verify all the files got deleted
    assertFalse(fs.exists(topFile), "File didn't get deleted");
    assertFalse(fs.exists(file), "File didn't get deleted");
    assertFalse(fs.exists(child), "Empty directory didn't get deleted");
    assertFalse(fs.exists(parent), "Empty directory didn't get deleted");
  }

  /**
   * Test to make sure that we don't attempt to ask the delegate whether or not we should preserve
   * a directory.
   * @throws Exception on failure
   */
  @Test
  public void testDoesNotCheckDirectories() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    // spy on the delegate to ensure that we don't check for directories
    AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0);
    AlwaysDelete spy = Mockito.spy(delegate);
    chore.cleanersChain.set(0, spy);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path file = new Path(parent, "someFile");
    fs.mkdirs(parent);
    assertTrue(fs.exists(parent), "Test parent didn't get created.");
    // touch a new file
    fs.create(file).close();
    assertTrue(fs.exists(file), "Test file didn't get created.");

    FileStatus fStat = fs.getFileStatus(parent);
    chore.chore();
    // make sure we never checked the directory
    Mockito.verify(spy, Mockito.never()).isFileDeletable(fStat);
    Mockito.reset(spy);
  }

  /**
   * Once the {@link Stoppable} handed to the chore has been stopped, a chore run must not delete
   * anything.
   * @throws Exception on failure
   */
  @Test
  public void testStoppedCleanerDoesNotDeleteFiles() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // also create a file in the top level directory
    Path topFile = new Path(testDir, "topFile");
    fs.create(topFile).close();
    assertTrue(fs.exists(topFile), "Test file didn't get created.");

    // stop the chore
    stop.stop("testing stop");

    // run the chore
    chore.chore();

    // test that the file still exists
    assertTrue(fs.exists(topFile), "File got deleted while chore was stopped");
  }

  /**
   * While cleaning a directory, all the files in the directory may be deleted, but there may be
   * another file added, in which case the directory shouldn't be deleted.
   * @throws IOException on failure
   */
  @Test
  public void testCleanerDoesNotDeleteDirectoryWithLateAddedFiles() throws IOException {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    final Path testDir = UTIL.getDataTestDir();
    final FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    // spy on the delegate so that we can hook into the deletable check below
    AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0);
    AlwaysDelete spy = Mockito.spy(delegate);
    chore.cleanersChain.set(0, spy);

    // create the directory layout in the directory to clean
    final Path parent = new Path(testDir, "parent");
    Path file = new Path(parent, "someFile");
    fs.mkdirs(parent);
    // touch a new file
    fs.create(file).close();
    assertTrue(fs.exists(file), "Test file didn't get created.");
    final Path addedFile = new Path(parent, "addedFile");

    // when we attempt to delete the original file, add another file in the same directory
    Mockito.doAnswer(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        fs.create(addedFile).close();
        CommonFSUtils.logFileSystemState(fs, testDir, LOG);
        return (Boolean) invocation.callRealMethod();
      }
    }).when(spy).isFileDeletable(Mockito.any());

    // run the chore
    chore.chore();

    // make sure all the directories + added file exist, but the original file is deleted
    assertTrue(fs.exists(addedFile), "Added file unexpectedly deleted");
    assertTrue(fs.exists(parent), "Parent directory deleted unexpectedly");
    assertFalse(fs.exists(file), "Original file unexpectedly retained");
    Mockito.verify(spy, Mockito.times(1)).isFileDeletable(Mockito.any());
    Mockito.reset(spy);
  }

  /**
   * The cleaner runs in a loop, where it first checks to see if all the files under a directory
   * can be deleted. If they all can, then we try to delete the directory. However, a file may be
   * added to that directory after the original check. This ensures that we don't accidentally
   * delete that directory and don't get spurious IOExceptions.
   * <p>
   * This was from HBASE-7465.
   * @throws Exception on failure
   */
  @Test
  public void testNoExceptionFromDirectoryWithRacyChildren() throws Exception {
    UTIL.cleanupTestDir();
    Stoppable stop = new StoppableImplementation();
    // need to use a localutil to not break the rest of the test that runs on the local FS, which
    // gets hosed when we start to use a minicluster.
    HBaseTestingUtil localUtil = new HBaseTestingUtil();
    Configuration conf = localUtil.getConfiguration();
    final Path testDir = UTIL.getDataTestDir();
    final FileSystem fs = UTIL.getTestFileSystem();
    LOG.debug("Writing test data to: {}", testDir);
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    // spy on the delegate so that we can hook into the deletable check below
    AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0);
    AlwaysDelete spy = Mockito.spy(delegate);
    chore.cleanersChain.set(0, spy);

    // create the directory layout in the directory to clean
    final Path parent = new Path(testDir, "parent");
    Path file = new Path(parent, "someFile");
    fs.mkdirs(parent);
    // touch a new file
    fs.create(file).close();
    assertTrue(fs.exists(file), "Test file didn't get created.");
    final Path racyFile = new Path(parent, "addedFile");

    // when we attempt to delete the original file, add another file in the same directory
    Mockito.doAnswer(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        fs.create(racyFile).close();
        CommonFSUtils.logFileSystemState(fs, testDir, LOG);
        return (Boolean) invocation.callRealMethod();
      }
    }).when(spy).isFileDeletable(Mockito.any());

    // run the chore
    chore.chore();

    // make sure all the directories + added file exist, but the original file is deleted
    assertTrue(fs.exists(racyFile), "Added file unexpectedly deleted");
    assertTrue(fs.exists(parent), "Parent directory deleted unexpectedly");
    assertFalse(fs.exists(file), "Original file unexpectedly retained");
    Mockito.verify(spy, Mockito.times(1)).isFileDeletable(Mockito.any());
  }

  /**
   * With the cleaner explicitly enabled, a chore run must delete the file and the directories
   * above it.
   * @throws Exception on failure
   */
  @Test
  public void testDeleteFileWithCleanerEnabled() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // Enable cleaner
    chore.setEnabled(true);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path child = new Path(parent, "child");
    Path file = new Path(child, "someFile");
    fs.mkdirs(child);

    // touch a new file
    fs.create(file).close();
    assertTrue(fs.exists(file), "Test file didn't get created.");

    // run the chore
    chore.chore();

    // verify all the files got deleted
    assertFalse(fs.exists(file), "File didn't get deleted");
    assertFalse(fs.exists(child), "Empty directory didn't get deleted");
    assertFalse(fs.exists(parent), "Empty directory didn't get deleted");
  }

  /**
   * With the cleaner disabled, a chore run must leave the file and the directories above it in
   * place.
   * @throws Exception on failure
   */
  @Test
  public void testDeleteFileWithCleanerDisabled() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // Disable cleaner
    chore.setEnabled(false);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path child = new Path(parent, "child");
    Path file = new Path(child, "someFile");
    fs.mkdirs(child);

    // touch a new file
    fs.create(file).close();
    assertTrue(fs.exists(file), "Test file didn't get created.");

    // run the chore
    chore.chore();

    // verify all the files exist
    assertTrue(fs.exists(file), "File got deleted with cleaner disabled");
    assertTrue(fs.exists(child), "Directory got deleted");
    assertTrue(fs.exists(parent), "Directory got deleted");
  }

  /**
   * Changing {@link CleanerChore#CHORE_POOL_SIZE} and notifying the shared pool while a chore
   * run is in progress must be reflected by {@code getChorePoolSize()}.
   * @throws Exception on failure
   */
  @Test
  public void testOnConfigurationChange() throws Exception {
    int availableProcessorNum = Runtime.getRuntime().availableProcessors();
    if (availableProcessorNum == 1) { // no need to run this test
      return;
    }

    // have at least 2 available processors/cores
    int initPoolSize = availableProcessorNum / 2;
    int changedPoolSize = availableProcessorNum;

    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());
    conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(initPoolSize));
    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    chore.setEnabled(true);
    // Create subdirs under testDir
    int dirNums = 6;
    Path[] subdirs = new Path[dirNums];
    for (int i = 0; i < dirNums; i++) {
      subdirs[i] = new Path(testDir, "subdir-" + i);
      fs.mkdirs(subdirs[i]);
    }
    // Under each subdirs create 6 files
    for (Path subdir : subdirs) {
      createFiles(fs, subdir, 6);
    }
    // Start chore
    Thread t = new Thread(() -> chore.chore());
    t.setDaemon(true);
    t.start();
    // Change size of chore's pool
    conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(changedPoolSize));
    POOL.onConfigurationChange(conf);
    assertEquals(changedPoolSize, chore.getChorePoolSize());
    // Stop chore
    t.join();
  }

  /**
   * Same as {@link #testOnConfigurationChange()}, but against a dedicated log cleaner scan pool
   * sized through {@link CleanerChore#LOG_CLEANER_CHORE_SIZE}.
   * @throws Exception on failure
   */
  @Test
  public void testOnConfigurationChangeLogCleaner() throws Exception {
    int availableProcessorNum = Runtime.getRuntime().availableProcessors();
    if (availableProcessorNum == 1) { // no need to run this test
      return;
    }

    DirScanPool pool = DirScanPool.getLogCleanerScanPool(UTIL.getConfiguration());
    try {
      // have at least 2 available processors/cores
      int initPoolSize = availableProcessorNum / 2;
      int changedPoolSize = availableProcessorNum;

      Stoppable stop = new StoppableImplementation();
      Configuration conf = UTIL.getConfiguration();
      Path testDir = UTIL.getDataTestDir();
      FileSystem fs = UTIL.getTestFileSystem();
      String confKey = "hbase.test.cleaner.delegates";
      conf.set(confKey, AlwaysDelete.class.getName());
      conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize));
      final AllValidPaths chore =
        new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool);
      chore.setEnabled(true);
      // Create subdirs under testDir
      int dirNums = 6;
      Path[] subdirs = new Path[dirNums];
      for (int i = 0; i < dirNums; i++) {
        subdirs[i] = new Path(testDir, "subdir-" + i);
        fs.mkdirs(subdirs[i]);
      }
      // Under each subdirs create 6 files
      for (Path subdir : subdirs) {
        createFiles(fs, subdir, 6);
      }
      // Start chore
      Thread t = new Thread(() -> chore.chore());
      t.setDaemon(true);
      t.start();
      // Change size of chore's pool
      conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize));
      pool.onConfigurationChange(conf);
      assertEquals(changedPoolSize, chore.getChorePoolSize());
      // Stop chore
      t.join();
    } finally {
      // this pool is private to the test, so nobody else will shut it down for us
      pool.shutdownNow();
    }
  }

  /**
   * Checks {@code CleanerChore.calculatePoolSize}: the result is capped at the number of
   * available processors and never drops below one thread.
   * @throws Exception on failure
   */
  @Test
  public void testMinimumNumberOfThreads() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());
    conf.set(CleanerChore.CHORE_POOL_SIZE, "2");
    int numProcs = Runtime.getRuntime().availableProcessors();
    // Sanity
    assertEquals(numProcs, CleanerChore.calculatePoolSize(Integer.toString(numProcs)));
    // The implementation does not allow us to set more threads than we have processors
    assertEquals(numProcs, CleanerChore.calculatePoolSize(Integer.toString(numProcs + 2)));
    // Force us into the branch that is multiplying 0.0 against the number of processors
    assertEquals(1, CleanerChore.calculatePoolSize("0.0"));
  }

  /**
   * {@code triggerCleanerNow()} must complete successfully both while the cleaner is enabled and
   * after it has been disabled.
   * @throws Exception on failure
   */
  @Test
  public void testTriggerCleaner() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    fs.mkdirs(testDir);
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());
    final AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    try {
      SERVICE.scheduleChore(chore);
      assertTrue(chore.triggerCleanerNow().get());
      chore.setEnabled(false);
      // should still be runnable
      assertTrue(chore.triggerCleanerNow().get());
    } finally {
      chore.cancel();
    }
  }

  /**
   * Repeatedly re-triggering the chore while a (deliberately slow) run is in progress must never
   * result in two cleaner runs executing at the same time.
   * @throws Exception on failure
   */
  @Test
  public void testRescheduleNoConcurrencyRun() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    fs.mkdirs(testDir);
    fs.createNewFile(new Path(testDir, "test"));
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, GetConcurrency.class.getName());
    // filled in by the GetConcurrency delegate with the highest number of overlapping runs
    AtomicInteger maxConcurrency = new AtomicInteger();
    final AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir,
      confKey, POOL, ImmutableMap.of("maxConcurrency", maxConcurrency));
    try {
      SERVICE.scheduleChore(chore);
      for (int i = 0; i < 100; i++) {
        chore.triggerNow();
        Thread.sleep(5 + ThreadLocalRandom.current().nextInt(5));
      }
      Thread.sleep(1000);
      // set a barrier here to make sure that the previous runs are also finished
      assertFalse(chore.triggerCleanerNow().get());
      // make sure we do not have multiple cleaner runs at the same time
      assertEquals(1, maxConcurrency.get());
    } finally {
      chore.cancel();
    }
  }

  /**
   * Creates {@code numOfFiles} files named {@code file-0}, {@code file-1}, ... under
   * {@code parentDir}, each filled with between 1 and 3 MiB of random bytes.
   * @param fs         filesystem to create the files on
   * @param parentDir  directory that receives the files
   * @param numOfFiles number of files to create
   * @throws IOException if creating or writing a file fails
   */
  private static void createFiles(FileSystem fs, Path parentDir, int numOfFiles)
    throws IOException {
    // one scratch buffer is enough; it is re-randomized before every write
    byte[] buffer = new byte[1024 * 1024];
    for (int i = 0; i < numOfFiles; i++) {
      int xMega = 1 + ThreadLocalRandom.current().nextInt(3); // size of each file is between 1~3M
      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
        for (int m = 0; m < xMega; m++) {
          Bytes.random(buffer);
          fsdos.write(buffer);
        }
      }
    }
  }

  /** Cleaner chore under test; it accepts every path as valid. */
  private static class AllValidPaths extends CleanerChore<BaseHFileCleanerDelegate> {

    public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs,
      Path oldFileDir, String confkey, DirScanPool pool) {
      super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool);
    }

    public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs,
      Path oldFileDir, String confkey, DirScanPool pool, Map<String, Object> params) {
      super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool, params, null);
    }

    // all paths are valid
    @Override
    protected boolean validate(Path file) {
      return true;
    }
  }

  /** Delegate that reports every file as deletable. */
  public static class AlwaysDelete extends BaseHFileCleanerDelegate {
    @Override
    public boolean isFileDeletable(FileStatus fStat) {
      return true;
    }
  }

  /** Delegate that reports no file as deletable. */
  public static class NeverDelete extends BaseHFileCleanerDelegate {
    @Override
    public boolean isFileDeletable(FileStatus fStat) {
      return false;
    }
  }

  /**
   * Delegate that counts how many cleaner runs are between {@link #preClean()} and
   * {@link #postClean()} at the same time and records the highest value seen in the
   * {@code "maxConcurrency"} {@link AtomicInteger} passed in through {@link #init(Map)}.
   */
  public static class GetConcurrency extends BaseHFileCleanerDelegate {

    private final AtomicInteger concurrency = new AtomicInteger();

    private AtomicInteger maxConcurrency;

    @Override
    public void init(Map<String, Object> params) {
      maxConcurrency = (AtomicInteger) params.get("maxConcurrency");
    }

    @Override
    public void preClean() {
      int c = concurrency.incrementAndGet();
      // raise maxConcurrency to c unless it is already at least c; retry if the CAS loses a race
      while (true) {
        int cur = maxConcurrency.get();
        if (c <= cur) {
          break;
        }

        if (maxConcurrency.compareAndSet(cur, c)) {
          break;
        }
      }
    }

    @Override
    public void postClean() {
      concurrency.decrementAndGet();
    }

    @Override
    protected boolean isFileDeletable(FileStatus fStat) {
      // sleep a while to slow down the process
      Threads.sleepWithoutInterrupt(10 + ThreadLocalRandom.current().nextInt(10));
      return false;
    }
  }
}