/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link ChoreService} and {@link ScheduledChore}: scheduling, cancellation, forced
 * triggering, core pool resizing, stopper handling and shutdown.
 * <p>
 * Most tests are timing based: they schedule chores with a short period, sleep on the test thread
 * and then inspect the state of the chore and/or the service.
 */
@Tag(MiscTests.TAG)
@Tag(MediumTests.TAG)
public class TestChoreService {

  private static final Logger LOG = LoggerFactory.getLogger(TestChoreService.class);

  private static final Configuration CONF = HBaseConfiguration.create();

  /** Core pool size passed to the per-test {@link ChoreService} created in setUp. */
  private final int initialCorePoolSize = 3;

  /** Service under test; created fresh before each test and shut down after it. */
  private ChoreService service;

  /** Name of the currently running test method; used as the name of {@link #service}. */
  private String testName;

  /**
   * Creates a new {@link ChoreService} named after the running test method.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) {
    testName = testInfo.getTestMethod().get().getName();
    service = new ChoreService(testName, initialCorePoolSize, false);
  }

  /**
   * Shuts down the per-test service. Some tests shut the service down themselves; this call is
   * then simply repeated on the already shut down service.
   */
  @AfterEach
  public void tearDown() {
    shutdownService(service);
  }

  /**
   * Straightforward stopper implementation that is used by default when one is not provided
   */
  private static class SampleStopper implements Stoppable {
    // The flag is set on one thread and polled on another (for example a chore stops it while
    // the test thread waits on isStopped(), or the test stops it while the chore is being run by
    // the service), so it must be volatile for the update to be visible to the reader.
    private volatile boolean stopped = false;

    @Override
    public void stop(String why) {
      stopped = true;
    }

    @Override
    public boolean isStopped() {
      return stopped;
    }
  }

  /**
   * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic
   * executions
   */
  private static class SlowChore extends ScheduledChore {
    public SlowChore(String name, int period) {
      this(name, new SampleStopper(), period);
    }

    public SlowChore(String name, Stoppable stopper, int period) {
      super(name, stopper, period);
    }

    @Override
    protected boolean initialChore() {
      // Sleep for two periods so that the next scheduled execution is always late.
      Threads.sleep(getPeriod() * 2);
      return true;
    }

    @Override
    protected void chore() {
      Threads.sleep(getPeriod() * 2);
    }
  }

  /**
   * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests
   */
  private static class DoNothingChore extends ScheduledChore {

    public DoNothingChore(String name, int period) {
      super(name, new SampleStopper(), period);
    }

    public DoNothingChore(String name, Stoppable stopper, int period) {
      super(name, stopper, period);
    }

    @Override
    protected void chore() {
      // DO NOTHING
    }
  }

  /**
   * Chore whose initial and periodic executions each sleep for a caller supplied amount of time,
   * independent of the chore's period.
   */
  private static class SleepingChore extends ScheduledChore {
    private final int sleepTime;

    public SleepingChore(String name, int chorePeriod, int sleepTime) {
      this(name, new SampleStopper(), chorePeriod, sleepTime);
    }

    public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) {
      super(name, stopper, period);
      this.sleepTime = sleepTime;
    }

    @Override
    protected boolean initialChore() {
      Threads.sleep(sleepTime);
      return true;
    }

    @Override
    protected void chore() {
      Threads.sleep(sleepTime);
    }
  }

  /**
   * Chore that counts how many times it has been executed. Both the initial chore and every
   * subsequent periodic chore add one to the count.
   */
  private static class CountingChore extends ScheduledChore {
    // Incremented by whichever thread executes the chore and read by the test thread through
    // getCountOfChoreCalls(); volatile makes the latest value visible to the reader.
    private volatile int countOfChoreCalls;
    private final boolean outputOnTicks;

    public CountingChore(String name, int period) {
      this(name, new SampleStopper(), period);
    }

    public CountingChore(String name, Stoppable stopper, int period) {
      this(name, stopper, period, false);
    }

    /**
     * @param outputOnTicks when true, the running count is logged on every execution
     */
    public CountingChore(String name, Stoppable stopper, int period, final boolean outputOnTicks) {
      super(name, stopper, period);
      this.outputOnTicks = outputOnTicks;
    }

    @Override
    protected boolean initialChore() {
      tick();
      return true;
    }

    @Override
    protected void chore() {
      tick();
    }

    /** Records one execution and, if requested, logs the new count. */
    private void tick() {
      countOfChoreCalls++;
      if (outputOnTicks) {
        outputTickCount();
      }
    }

    private void outputTickCount() {
      LOG.info("Chore: {}. Count of chore calls: {}", getName(), countOfChoreCalls);
    }

    public int getCountOfChoreCalls() {
      return countOfChoreCalls;
    }
  }

  /**
   * A Chore that will try to execute the initial chore a few times before succeeding. Once the
   * initial chore is complete the chore cancels itself
   */
  public static class FailInitialChore extends ScheduledChore {
    private int numberOfFailures;
    private final int failureThreshold;

    /**
     * @param failThreshold Number of times the Chore fails when trying to execute initialChore
     *                      before succeeding.
     */
    public FailInitialChore(String name, int period, int failThreshold) {
      this(name, new SampleStopper(), period, failThreshold);
    }

    public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) {
      super(name, stopper, period);
      failureThreshold = failThreshold;
    }

    @Override
    protected boolean initialChore() {
      if (numberOfFailures < failureThreshold) {
        numberOfFailures++;
        return false;
      }
      return true;
    }

    @Override
    protected void chore() {
      // By the time the periodic chore runs, the initial chore must have failed exactly
      // failureThreshold times.
      assertEquals(failureThreshold, numberOfFailures);
      cancel(false);
    }
  }

  /**
   * Schedules a {@link FailInitialChore} and polls until either its initial chore has completed
   * or it is no longer scheduled; fails if that takes more than a bounded number of polls.
   */
  @Test
  public void testInitialChorePrecedence() throws InterruptedException {
    final int period = 100;
    final int failureThreshold = 5;
    ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold);
    service.scheduleChore(chore);

    int loopCount = 0;
    boolean brokeOutOfLoop = false;

    while (!chore.isInitialChoreComplete() && chore.isScheduled()) {
      Thread.sleep(failureThreshold * period);
      loopCount++;
      if (loopCount > 3) {
        brokeOutOfLoop = true;
        break;
      }
    }

    assertFalse(brokeOutOfLoop);
  }

  /**
   * A cancelled chore must report itself as unscheduled and must no longer be counted by the
   * service.
   */
  @Test
  public void testCancelChore() throws InterruptedException {
    final int period = 100;
    ScheduledChore chore = new DoNothingChore("chore", period);
    service.scheduleChore(chore);
    assertTrue(chore.isScheduled());

    chore.cancel(true);
    assertFalse(chore.isScheduled());
    assertEquals(0, service.getNumberOfScheduledChores());
  }

  /**
   * Checks that the constructor arguments of {@link ScheduledChore} are reflected by its getters
   * and that a negative initial delay is reported as 0.
   */
  @Test
  public void testScheduledChoreConstruction() {
    final String name = "chore";
    final int period = 100;
    final long validDelay = 0;
    final long invalidDelay = -100;
    final TimeUnit unit = TimeUnit.NANOSECONDS;

    ScheduledChore chore1 = new ScheduledChore(name, new SampleStopper(), period, validDelay, unit) {
      @Override
      protected void chore() {
        // DO NOTHING
      }
    };

    assertEquals(name, chore1.getName(), "Name construction failed");
    assertEquals(period, chore1.getPeriod(), "Period construction failed");
    assertEquals(validDelay, chore1.getInitialDelay(), "Initial Delay construction failed");
    assertEquals(unit, chore1.getTimeUnit(), "TimeUnit construction failed");

    ScheduledChore invalidDelayChore =
      new ScheduledChore(name, new SampleStopper(), period, invalidDelay, unit) {
        @Override
        protected void chore() {
          // DO NOTHING
        }
      };

    assertEquals(0, invalidDelayChore.getInitialDelay(),
      "Initial Delay should be set to 0 when invalid");
  }

  /**
   * Checks the core pool size reported by a {@link ChoreService} built with an explicit size, with
   * no size, and with a negative size. The latter two are expected to report
   * {@link ChoreService#MIN_CORE_POOL_SIZE}.
   */
  @Test
  public void testChoreServiceConstruction() throws InterruptedException {
    final int corePoolSize = 10;
    final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE;

    ChoreService customInit =
      new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false);
    try {
      assertEquals(corePoolSize, customInit.getCorePoolSize());
    } finally {
      shutdownService(customInit);
    }

    ChoreService defaultInit = new ChoreService("testChoreServiceConstruction_default");
    try {
      assertEquals(defaultCorePoolSize, defaultInit.getCorePoolSize());
    } finally {
      shutdownService(defaultInit);
    }

    ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10, false);
    try {
      assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize());
    } finally {
      shutdownService(invalidInit);
    }
  }

  /**
   * Checks that a {@link CountingChore} has been executed once per elapsed period plus once for
   * the initial chore.
   */
  @Test
  public void testFrequencyOfChores() throws InterruptedException {
    final int period = 100;
    // Small delta that acts as time buffer (allowing chores to complete if running slowly)
    final int delta = period / 5;
    CountingChore chore = new CountingChore("countingChore", period);
    service.scheduleChore(chore);

    Thread.sleep(10 * period + delta);
    assertEquals(11, chore.getCountOfChoreCalls(), "10 periods have elapsed.");

    Thread.sleep(10 * period + delta);
    assertEquals(21, chore.getCountOfChoreCalls(), "20 periods have elapsed.");
  }

  /**
   * Shuts the given service down and waits, with a timeout of 1000 passed to {@link Waiter}, for
   * it to report that it has terminated.
   */
  public void shutdownService(ChoreService service) {
    service.shutdown();
    Waiter.waitFor(CONF, 1000, () -> service.isTerminated());
  }

  /**
   * Checks that each call to {@link ScheduledChore#triggerNow()} results in one additional
   * execution of the chore and that the chore keeps running afterwards.
   */
  @Test
  public void testForceTrigger() throws InterruptedException {
    final int period = 100;
    final int delta = period / 10;
    final int forcedRuns = 5;
    final CountingChore chore = new CountingChore("countingChore", period);
    service.scheduleChore(chore);
    Thread.sleep(10 * period + delta);

    assertEquals(11, chore.getCountOfChoreCalls(), "10 periods have elapsed.");

    // Force five runs of the chore to occur, sleeping between triggers to ensure the
    // chore has time to run
    for (int i = 0; i < forcedRuns; i++) {
      chore.triggerNow();
      Thread.sleep(delta);
    }

    assertEquals(16, chore.getCountOfChoreCalls(), "Trigger was called 5 times after 10 periods.");

    Thread.sleep(10 * period + delta);

    // Be loosey-goosey. It used to be '26' but it was a bit flaky relying on timing.
    // Read the count once so the failure message reports the value that was actually tested.
    final int finalCount = chore.getCountOfChoreCalls();
    assertTrue(finalCount > 16, "Expected more than 16 invocations, instead got " + finalCount);
  }

  /**
   * Schedules slow chores one after another and checks that the core pool size follows the number
   * of scheduled chores once it exceeds the initial core pool size.
   */
  @Test
  public void testCorePoolIncrease() throws InterruptedException {
    assertEquals(initialCorePoolSize, service.getCorePoolSize(),
      "Setting core pool size gave unexpected results.");

    final int slowChorePeriod = 100;
    SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod);
    SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod);
    SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod);

    service.scheduleChore(slowChore1);
    service.scheduleChore(slowChore2);
    service.scheduleChore(slowChore3);

    Thread.sleep(slowChorePeriod * 10);
    assertEquals(3, service.getCorePoolSize(),
      "Should not create more pools than scheduled chores");

    SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod);
    service.scheduleChore(slowChore4);

    Thread.sleep(slowChorePeriod * 10);
    assertEquals(4, service.getCorePoolSize(),
      "Chores are missing their start time. Should expand core pool size");

    SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod);
    service.scheduleChore(slowChore5);

    Thread.sleep(slowChorePeriod * 10);
    assertEquals(5, service.getCorePoolSize(),
      "Chores are missing their start time. Should expand core pool size");
  }

  /**
   * Grows the core pool by scheduling five slow chores, then cancels them one at a time and checks
   * after each cancellation that the core pool size and the number of chores missing their start
   * time have dropped accordingly.
   */
  @Test
  public void testCorePoolDecrease() throws InterruptedException {
    final int chorePeriod = 100;
    final int settleTime = chorePeriod * 10;
    // Slow chores always miss their start time and thus the core pool size should be at least as
    // large as the number of running slow chores
    SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod);
    SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod);
    SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod);

    service.scheduleChore(slowChore1);
    service.scheduleChore(slowChore2);
    service.scheduleChore(slowChore3);

    Thread.sleep(settleTime);
    assertEquals(service.getNumberOfScheduledChores(), service.getCorePoolSize(),
      "Should not create more pools than scheduled chores");

    SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod);
    service.scheduleChore(slowChore4);
    Thread.sleep(settleTime);
    assertEquals(service.getNumberOfScheduledChores(), service.getCorePoolSize(),
      "Chores are missing their start time. Should expand core pool size");

    SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod);
    service.scheduleChore(slowChore5);
    Thread.sleep(settleTime);
    assertEquals(service.getNumberOfScheduledChores(), service.getCorePoolSize(),
      "Chores are missing their start time. Should expand core pool size");
    assertEquals(5, service.getNumberOfChoresMissingStartTime());

    // Now we begin to cancel the chores that caused an increase in the core thread pool of the
    // ChoreService. These cancellations should cause a decrease in the core thread pool.
    cancelAndVerifyPool(slowChore5, settleTime, 4);
    cancelAndVerifyPool(slowChore4, settleTime, 3);
    cancelAndVerifyPool(slowChore3, settleTime, 2);
    cancelAndVerifyPool(slowChore2, settleTime, 1);
    cancelAndVerifyPool(slowChore1, settleTime, 0);
  }

  /**
   * Cancels {@code chore}, sleeps for {@code settleTime} and then asserts that the core pool size
   * of {@link #service} equals the larger of {@link ChoreService#MIN_CORE_POOL_SIZE} and the
   * number of scheduled chores, and that exactly {@code expectedMissingStartTime} chores are
   * reported as missing their start time.
   */
  private void cancelAndVerifyPool(ScheduledChore chore, int settleTime,
    int expectedMissingStartTime) throws InterruptedException {
    chore.cancel();
    Thread.sleep(settleTime);
    assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
      service.getCorePoolSize());
    assertEquals(expectedMissingStartTime, service.getNumberOfChoresMissingStartTime());
  }

  /**
   * Checks that {@link ChoreService#getNumberOfScheduledChores()} tracks chores as they are
   * scheduled and cancelled.
   */
  @Test
  public void testNumberOfRunningChores() throws InterruptedException {
    final int period = 100;
    final int sleepTime = 5;
    DoNothingChore dn1 = new DoNothingChore("dn1", period);
    DoNothingChore dn2 = new DoNothingChore("dn2", period);
    DoNothingChore dn3 = new DoNothingChore("dn3", period);
    DoNothingChore dn4 = new DoNothingChore("dn4", period);
    DoNothingChore dn5 = new DoNothingChore("dn5", period);

    service.scheduleChore(dn1);
    service.scheduleChore(dn2);
    service.scheduleChore(dn3);
    service.scheduleChore(dn4);
    service.scheduleChore(dn5);

    Thread.sleep(sleepTime);
    assertEquals(5, service.getNumberOfScheduledChores(), "Scheduled chore mismatch");

    dn1.cancel();
    Thread.sleep(sleepTime);
    assertEquals(4, service.getNumberOfScheduledChores(), "Scheduled chore mismatch");

    dn2.cancel();
    dn3.cancel();
    dn4.cancel();
    Thread.sleep(sleepTime);
    assertEquals(1, service.getNumberOfScheduledChores(), "Scheduled chore mismatch");

    dn5.cancel();
    Thread.sleep(sleepTime);
    assertEquals(0, service.getNumberOfScheduledChores(), "Scheduled chore mismatch");
  }

  /**
   * Checks that {@link ChoreService#getNumberOfChoresMissingStartTime()} tracks slow chores as
   * they are scheduled and cancelled.
   */
  @Test
  public void testNumberOfChoresMissingStartTime() throws InterruptedException {
    final int period = 100;
    final int sleepTime = 20 * period;
    // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
    // ALWAYS miss their start time since their execution takes longer than their period
    SlowChore sc1 = new SlowChore("sc1", period);
    SlowChore sc2 = new SlowChore("sc2", period);
    SlowChore sc3 = new SlowChore("sc3", period);
    SlowChore sc4 = new SlowChore("sc4", period);
    SlowChore sc5 = new SlowChore("sc5", period);

    service.scheduleChore(sc1);
    service.scheduleChore(sc2);
    service.scheduleChore(sc3);
    service.scheduleChore(sc4);
    service.scheduleChore(sc5);

    Thread.sleep(sleepTime);
    assertEquals(5, service.getNumberOfChoresMissingStartTime());

    sc1.cancel();
    Thread.sleep(sleepTime);
    assertEquals(4, service.getNumberOfChoresMissingStartTime());

    sc2.cancel();
    sc3.cancel();
    sc4.cancel();
    Thread.sleep(sleepTime);
    assertEquals(1, service.getNumberOfChoresMissingStartTime());

    sc5.cancel();
    Thread.sleep(sleepTime);
    assertEquals(0, service.getNumberOfChoresMissingStartTime());
  }

  /**
   * ChoreServices should never have a core pool size that exceeds the number of chores that have
   * been scheduled with the service. For example, if 4 ScheduledChores are scheduled with a
   * ChoreService, the number of threads in the ChoreService's core pool should never exceed 4
   */
  @Test
  public void testMaximumChoreServiceThreads() throws InterruptedException {

    final int period = 100;
    final int sleepTime = 5 * period;
    // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
    // ALWAYS miss their start time since their execution takes longer than their period.
    // Chores that miss their start time will trigger the onChoreMissedStartTime callback
    // in the ChoreService. This callback will try to increase the number of core pool
    // threads.
    SlowChore sc1 = new SlowChore("sc1", period);
    SlowChore sc2 = new SlowChore("sc2", period);
    SlowChore sc3 = new SlowChore("sc3", period);
    SlowChore sc4 = new SlowChore("sc4", period);
    SlowChore sc5 = new SlowChore("sc5", period);

    service.scheduleChore(sc1);
    service.scheduleChore(sc2);
    service.scheduleChore(sc3);
    service.scheduleChore(sc4);
    service.scheduleChore(sc5);

    Thread.sleep(sleepTime);
    assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());

    SlowChore sc6 = new SlowChore("sc6", period);
    SlowChore sc7 = new SlowChore("sc7", period);
    SlowChore sc8 = new SlowChore("sc8", period);
    SlowChore sc9 = new SlowChore("sc9", period);
    SlowChore sc10 = new SlowChore("sc10", period);

    service.scheduleChore(sc6);
    service.scheduleChore(sc7);
    service.scheduleChore(sc8);
    service.scheduleChore(sc9);
    service.scheduleChore(sc10);

    Thread.sleep(sleepTime);
    assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
  }

  /**
   * Schedules the same chore first with one service and then with a second one, and checks that
   * afterwards only the second service reports the chore as scheduled. Cancelling the chore must
   * detach it from both.
   */
  @Test
  public void testChangingChoreServices(TestInfo testInfo) throws InterruptedException {
    final int period = 100;
    final int sleepTime = 10;
    ChoreService anotherService = new ChoreService(testInfo.getTestMethod().get().getName() + "_2");
    ScheduledChore chore = new DoNothingChore("sample", period);

    try {
      // Not scheduled anywhere yet.
      assertFalse(chore.isScheduled());
      assertFalse(service.isChoreScheduled(chore));
      assertFalse(anotherService.isChoreScheduled(chore));
      assertTrue(chore.getChoreService() == null);

      // Scheduled with the first service only.
      service.scheduleChore(chore);
      Thread.sleep(sleepTime);
      assertTrue(chore.isScheduled());
      assertTrue(service.isChoreScheduled(chore));
      assertFalse(anotherService.isChoreScheduled(chore));
      assertFalse(chore.getChoreService() == null);

      // Scheduling with the second service must take the chore away from the first one.
      anotherService.scheduleChore(chore);
      Thread.sleep(sleepTime);
      assertTrue(chore.isScheduled());
      assertFalse(service.isChoreScheduled(chore));
      assertTrue(anotherService.isChoreScheduled(chore));
      assertFalse(chore.getChoreService() == null);

      // Cancelling leaves the chore unscheduled everywhere.
      chore.cancel();
      assertFalse(chore.isScheduled());
      assertFalse(service.isChoreScheduled(chore));
      assertFalse(anotherService.isChoreScheduled(chore));
      assertTrue(chore.getChoreService() == null);
    } finally {
      shutdownService(anotherService);
    }
  }

  /**
   * Schedules two groups of chores, each group sharing one stopper, and checks that stopping a
   * stopper unschedules exactly the chores of its group.
   */
  @Test
  public void testStopperForScheduledChores() throws InterruptedException {
    Stoppable stopperForGroup1 = new SampleStopper();
    Stoppable stopperForGroup2 = new SampleStopper();
    final int period = 100;
    final int delta = period / 10;
    ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period);
    ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period);
    ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period);

    ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period);
    ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period);
    ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period);

    service.scheduleChore(chore1_group1);
    service.scheduleChore(chore2_group1);
    service.scheduleChore(chore3_group1);
    service.scheduleChore(chore1_group2);
    service.scheduleChore(chore2_group2);
    service.scheduleChore(chore3_group2);

    Thread.sleep(delta);
    Thread.sleep(10 * period);
    assertTrue(chore1_group1.isScheduled());
    assertTrue(chore2_group1.isScheduled());
    assertTrue(chore3_group1.isScheduled());
    assertTrue(chore1_group2.isScheduled());
    assertTrue(chore2_group2.isScheduled());
    assertTrue(chore3_group2.isScheduled());

    stopperForGroup1.stop("test stopping group 1");
    Thread.sleep(period);
    assertFalse(chore1_group1.isScheduled());
    assertFalse(chore2_group1.isScheduled());
    assertFalse(chore3_group1.isScheduled());
    assertTrue(chore1_group2.isScheduled());
    assertTrue(chore2_group2.isScheduled());
    assertTrue(chore3_group2.isScheduled());

    stopperForGroup2.stop("test stopping group 2");
    Thread.sleep(period);
    assertFalse(chore1_group1.isScheduled());
    assertFalse(chore2_group1.isScheduled());
    assertFalse(chore3_group1.isScheduled());
    assertFalse(chore1_group2.isScheduled());
    assertFalse(chore2_group2.isScheduled());
    assertFalse(chore3_group2.isScheduled());
  }

  /**
   * Shutting the service down must leave every previously scheduled chore unscheduled.
   */
  @Test
  public void testShutdownCancelsScheduledChores() throws InterruptedException {
    final int period = 100;
    ScheduledChore successChore1 = new DoNothingChore("sc1", period);
    ScheduledChore successChore2 = new DoNothingChore("sc2", period);
    ScheduledChore successChore3 = new DoNothingChore("sc3", period);
    assertTrue(service.scheduleChore(successChore1));
    assertTrue(successChore1.isScheduled());
    assertTrue(service.scheduleChore(successChore2));
    assertTrue(successChore2.isScheduled());
    assertTrue(service.scheduleChore(successChore3));
    assertTrue(successChore3.isScheduled());

    shutdownService(service);

    assertFalse(successChore1.isScheduled());
    assertFalse(successChore2.isScheduled());
    assertFalse(successChore3.isScheduled());
  }

  /**
   * Shuts the service down while sleeping chores are in the middle of an execution and checks
   * that the chores end up unscheduled and the service ends up shut down and terminated.
   */
  @Test
  public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException {
    final int period = 100;
    final int sleep = 5 * period;
    ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep);
    ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep);
    ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep);
    assertTrue(service.scheduleChore(slowChore1));
    assertTrue(service.scheduleChore(slowChore2));
    assertTrue(service.scheduleChore(slowChore3));

    // Wait for half of one execution so that the chores are still sleeping at shutdown.
    Thread.sleep(sleep / 2);
    shutdownService(service);

    assertFalse(slowChore1.isScheduled());
    assertFalse(slowChore2.isScheduled());
    assertFalse(slowChore3.isScheduled());
    assertTrue(service.isShutdown());

    Thread.sleep(5);
    assertTrue(service.isTerminated());
  }

  /**
   * After shutdown, {@link ChoreService#scheduleChore(ScheduledChore)} must return false and
   * leave the offered chore unscheduled.
   */
  @Test
  public void testShutdownRejectsNewSchedules() throws InterruptedException {
    final int period = 100;
    ScheduledChore successChore1 = new DoNothingChore("sc1", period);
    ScheduledChore successChore2 = new DoNothingChore("sc2", period);
    ScheduledChore successChore3 = new DoNothingChore("sc3", period);
    ScheduledChore failChore1 = new DoNothingChore("fc1", period);
    ScheduledChore failChore2 = new DoNothingChore("fc2", period);
    ScheduledChore failChore3 = new DoNothingChore("fc3", period);

    assertTrue(service.scheduleChore(successChore1));
    assertTrue(successChore1.isScheduled());
    assertTrue(service.scheduleChore(successChore2));
    assertTrue(successChore2.isScheduled());
    assertTrue(service.scheduleChore(successChore3));
    assertTrue(successChore3.isScheduled());

    shutdownService(service);

    assertFalse(service.scheduleChore(failChore1));
    assertFalse(failChore1.isScheduled());
    assertFalse(service.scheduleChore(failChore2));
    assertFalse(failChore2.isScheduled());
    assertFalse(service.scheduleChore(failChore3));
    assertFalse(failChore3.isScheduled());
  }

  /**
   * for HBASE-25014
   * <p>
   * Schedules a chore whose initial delay (2000) is larger than its period (1000) and waits, with
   * a timeout of 5000 passed to {@link Waiter}, for the chore to run and flip the stopper.
   */
  @Test
  public void testInitialDelay() {
    SampleStopper stopper = new SampleStopper();
    service.scheduleChore(new ScheduledChore("chore", stopper, 1000, 2000) {
      @Override
      protected void chore() {
        stopper.stop("test");
      }
    });
    Waiter.waitFor(CONF, 5000, () -> stopper.isStopped());
  }

  /**
   * Checks that {@code cleanup()} is not invoked while the chore is scheduled and is invoked at
   * least once after the chore's stopper has been stopped and the chore became unscheduled.
   */
  @Test
  public void testCleanupWithStopper() {
    SampleStopper stopper = new SampleStopper();
    DoNothingChore chore = spy(new DoNothingChore("chore", stopper, 10));
    service.scheduleChore(chore);
    assertTrue(chore.isScheduled());
    verify(chore, never()).cleanup();
    stopper.stop("test");
    Waiter.waitFor(CONF, 200, () -> !chore.isScheduled());
    verify(chore, atLeastOnce()).cleanup();
  }

  /**
   * Checks that {@code cleanup()} is not invoked while the chore is scheduled and is invoked at
   * least once after {@code shutdown(true)} has been called on the chore and it became
   * unscheduled.
   */
  @Test
  public void testCleanupWithShutdown() {
    DoNothingChore chore = spy(new DoNothingChore("chore", 10));
    service.scheduleChore(chore);
    assertTrue(chore.isScheduled());
    verify(chore, never()).cleanup();
    chore.shutdown(true);
    Waiter.waitFor(CONF, 200, () -> !chore.isScheduled());
    verify(chore, atLeastOnce()).cleanup();
  }
}