001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore; 026import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore; 027import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore; 028import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper; 029import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore; 030import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore; 031import org.apache.hadoop.hbase.testclassification.MediumTests; 032import org.apache.hadoop.hbase.util.Threads; 033import org.junit.ClassRule; 034import org.junit.Rule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037import org.junit.rules.TestName; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041@Category(MediumTests.class) 042public class TestChoreService { 043 044 @ClassRule 045 public static final HBaseClassTestRule CLASS_RULE = 046 HBaseClassTestRule.forClass(TestChoreService.class); 047 048 public static final Logger log = LoggerFactory.getLogger(TestChoreService.class); 049 050 @Rule 051 public TestName name = new TestName(); 052 053 /** 054 * A few ScheduledChore samples that are useful for testing with ChoreService 055 */ 056 public static class ScheduledChoreSamples { 057 /** 058 * Straight forward stopper implementation that is used by default when one is not provided 059 */ 060 public static class SampleStopper implements Stoppable { 061 private boolean stopped = false; 062 063 @Override 064 public void stop(String why) { 065 stopped = true; 066 } 067 068 @Override 069 public boolean isStopped() { 070 return stopped; 071 } 072 } 073 074 /** 075 * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic 076 * executions 077 */ 078 public static class SlowChore extends ScheduledChore { 079 public SlowChore(String name, int period) { 080 this(name, new SampleStopper(), period); 081 } 082 083 public SlowChore(String name, Stoppable stopper, int period) { 084 super(name, stopper, period); 085 } 086 087 @Override 088 protected boolean initialChore() { 089 try { 090 Thread.sleep(getPeriod() * 2); 091 } catch (InterruptedException e) { 092 log.warn("", e); 093 } 094 return true; 095 } 096 097 @Override 098 protected void chore() { 099 try { 100 Thread.sleep(getPeriod() * 2); 101 } catch (InterruptedException e) { 102 log.warn("", e); 103 } 104 } 105 } 106 107 /** 108 * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests 109 */ 110 public static class DoNothingChore extends ScheduledChore { 111 public DoNothingChore(String name, int period) { 112 super(name, new SampleStopper(), period); 113 } 114 115 public DoNothingChore(String name, Stoppable stopper, int period) { 116 super(name, stopper, period); 117 } 118 119 @Override 120 protected void chore() { 121 // DO NOTHING 122 } 123 124 } 125 126 public static class SleepingChore extends ScheduledChore { 127 private int sleepTime; 128 129 public SleepingChore(String name, int chorePeriod, int sleepTime) { 130 this(name, new SampleStopper(), chorePeriod, sleepTime); 131 } 132 133 public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) { 134 super(name, stopper, period); 135 this.sleepTime = sleepTime; 136 } 137 138 @Override 139 protected boolean initialChore() { 140 try { 141 Thread.sleep(sleepTime); 142 } catch (InterruptedException e) { 143 log.warn("", e); 144 } 145 return true; 146 } 147 148 @Override 149 protected void chore() { 150 try { 151 Thread.sleep(sleepTime); 152 } catch (Exception e) { 153 log.warn("", e); 154 } 155 } 156 } 157 158 public static class CountingChore extends ScheduledChore { 159 private int countOfChoreCalls; 160 private boolean outputOnTicks = false; 161 162 public CountingChore(String name, int period) { 163 this(name, new SampleStopper(), period); 164 } 165 166 public CountingChore(String name, Stoppable stopper, int period) { 167 this(name, stopper, period, false); 168 } 169 170 public CountingChore(String name, Stoppable stopper, int period, 171 final boolean outputOnTicks) { 172 super(name, stopper, period); 173 this.countOfChoreCalls = 0; 174 this.outputOnTicks = outputOnTicks; 175 } 176 177 @Override 178 protected boolean initialChore() { 179 countOfChoreCalls++; 180 if (outputOnTicks) { 181 outputTickCount(); 182 } 183 return true; 184 } 185 186 @Override 187 protected void chore() { 188 countOfChoreCalls++; 189 if (outputOnTicks) { 190 outputTickCount(); 191 } 192 } 193 194 private void outputTickCount() { 195 log.info("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls); 196 } 197 198 public int getCountOfChoreCalls() { 199 return countOfChoreCalls; 200 } 201 202 public boolean isOutputtingOnTicks() { 203 return outputOnTicks; 204 } 205 206 public void setOutputOnTicks(boolean o) { 207 outputOnTicks = o; 208 } 209 } 210 211 /** 212 * A Chore that will try to execute the initial chore a few times before succeeding. Once the 213 * initial chore is complete the chore cancels itself 214 */ 215 public static class FailInitialChore extends ScheduledChore { 216 private int numberOfFailures; 217 private int failureThreshold; 218 219 /** 220 * @param failThreshold Number of times the Chore fails when trying to execute initialChore 221 * before succeeding. 222 */ 223 public FailInitialChore(String name, int period, int failThreshold) { 224 this(name, new SampleStopper(), period, failThreshold); 225 } 226 227 public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) { 228 super(name, stopper, period); 229 numberOfFailures = 0; 230 failureThreshold = failThreshold; 231 } 232 233 @Override 234 protected boolean initialChore() { 235 if (numberOfFailures < failureThreshold) { 236 numberOfFailures++; 237 return false; 238 } else { 239 return true; 240 } 241 } 242 243 @Override 244 protected void chore() { 245 assertTrue(numberOfFailures == failureThreshold); 246 cancel(false); 247 } 248 249 } 250 } 251 252 @Test 253 public void testInitialChorePrecedence() throws InterruptedException { 254 ChoreService service = new ChoreService("testInitialChorePrecedence"); 255 256 final int period = 100; 257 final int failureThreshold = 5; 258 259 try { 260 ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold); 261 service.scheduleChore(chore); 262 263 int loopCount = 0; 264 boolean brokeOutOfLoop = false; 265 266 while (!chore.isInitialChoreComplete() && chore.isScheduled()) { 267 Thread.sleep(failureThreshold * period); 268 loopCount++; 269 if (loopCount > 3) { 270 brokeOutOfLoop = true; 271 break; 272 } 273 } 274 275 assertFalse(brokeOutOfLoop); 276 } finally { 277 shutdownService(service); 278 } 279 } 280 281 @Test 282 public void testCancelChore() throws InterruptedException { 283 final int period = 100; 284 ScheduledChore chore1 = new DoNothingChore("chore1", period); 285 ChoreService service = new ChoreService("testCancelChore"); 286 try { 287 service.scheduleChore(chore1); 288 assertTrue(chore1.isScheduled()); 289 290 chore1.cancel(true); 291 assertFalse(chore1.isScheduled()); 292 assertTrue(service.getNumberOfScheduledChores() == 0); 293 } finally { 294 shutdownService(service); 295 } 296 } 297 298 @Test 299 public void testScheduledChoreConstruction() { 300 final String NAME = "chore"; 301 final int PERIOD = 100; 302 final long VALID_DELAY = 0; 303 final long INVALID_DELAY = -100; 304 final TimeUnit UNIT = TimeUnit.NANOSECONDS; 305 306 ScheduledChore chore1 = 307 new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) { 308 @Override 309 protected void chore() { 310 // DO NOTHING 311 } 312 }; 313 314 assertEquals("Name construction failed", NAME, chore1.getName()); 315 assertEquals("Period construction failed", PERIOD, chore1.getPeriod()); 316 assertEquals("Initial Delay construction failed", VALID_DELAY, chore1.getInitialDelay()); 317 assertEquals("TimeUnit construction failed", UNIT, chore1.getTimeUnit()); 318 319 ScheduledChore invalidDelayChore = 320 new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) { 321 @Override 322 protected void chore() { 323 // DO NOTHING 324 } 325 }; 326 327 assertEquals("Initial Delay should be set to 0 when invalid", 0, 328 invalidDelayChore.getInitialDelay()); 329 } 330 331 @Test 332 public void testChoreServiceConstruction() throws InterruptedException { 333 final int corePoolSize = 10; 334 final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE; 335 336 ChoreService customInit = 337 new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false); 338 try { 339 assertEquals(corePoolSize, customInit.getCorePoolSize()); 340 } finally { 341 shutdownService(customInit); 342 } 343 344 ChoreService defaultInit = new ChoreService("testChoreServiceConstruction_default"); 345 try { 346 assertEquals(defaultCorePoolSize, defaultInit.getCorePoolSize()); 347 } finally { 348 shutdownService(defaultInit); 349 } 350 351 ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10, false); 352 try { 353 assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize()); 354 } finally { 355 shutdownService(invalidInit); 356 } 357 } 358 359 @Test 360 public void testFrequencyOfChores() throws InterruptedException { 361 final int period = 100; 362 // Small delta that acts as time buffer (allowing chores to complete if running slowly) 363 final int delta = period/5; 364 ChoreService service = new ChoreService("testFrequencyOfChores"); 365 CountingChore chore = new CountingChore("countingChore", period); 366 try { 367 service.scheduleChore(chore); 368 369 Thread.sleep(10 * period + delta); 370 assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls()); 371 372 Thread.sleep(10 * period + delta); 373 assertEquals("20 periods have elapsed.", 21, chore.getCountOfChoreCalls()); 374 } finally { 375 shutdownService(service); 376 } 377 } 378 379 public void shutdownService(ChoreService service) throws InterruptedException { 380 service.shutdown(); 381 while (!service.isTerminated()) { 382 Thread.sleep(100); 383 } 384 } 385 386 @Test 387 public void testForceTrigger() throws InterruptedException { 388 final int period = 100; 389 final int delta = period/10; 390 ChoreService service = new ChoreService("testForceTrigger"); 391 final CountingChore chore = new CountingChore("countingChore", period); 392 try { 393 service.scheduleChore(chore); 394 Thread.sleep(10 * period + delta); 395 396 assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls()); 397 398 // Force five runs of the chore to occur, sleeping between triggers to ensure the 399 // chore has time to run 400 chore.triggerNow(); 401 Thread.sleep(delta); 402 chore.triggerNow(); 403 Thread.sleep(delta); 404 chore.triggerNow(); 405 Thread.sleep(delta); 406 chore.triggerNow(); 407 Thread.sleep(delta); 408 chore.triggerNow(); 409 Thread.sleep(delta); 410 411 assertEquals("Trigger was called 5 times after 10 periods.", 16, 412 chore.getCountOfChoreCalls()); 413 414 Thread.sleep(10 * period + delta); 415 416 // Be loosey-goosey. It used to be '26' but it was a big flakey relying on timing. 417 assertTrue("Expected at least 16 invocations, instead got " + chore.getCountOfChoreCalls(), 418 chore.getCountOfChoreCalls() > 16); 419 } finally { 420 shutdownService(service); 421 } 422 } 423 424 @Test 425 public void testCorePoolIncrease() throws InterruptedException { 426 final int initialCorePoolSize = 3; 427 ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false); 428 429 try { 430 assertEquals("Setting core pool size gave unexpected results.", initialCorePoolSize, 431 service.getCorePoolSize()); 432 433 final int slowChorePeriod = 100; 434 SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod); 435 SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod); 436 SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod); 437 438 service.scheduleChore(slowChore1); 439 service.scheduleChore(slowChore2); 440 service.scheduleChore(slowChore3); 441 442 Thread.sleep(slowChorePeriod * 10); 443 assertEquals("Should not create more pools than scheduled chores", 3, 444 service.getCorePoolSize()); 445 446 SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod); 447 service.scheduleChore(slowChore4); 448 449 Thread.sleep(slowChorePeriod * 10); 450 assertEquals("Chores are missing their start time. Should expand core pool size", 4, 451 service.getCorePoolSize()); 452 453 SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod); 454 service.scheduleChore(slowChore5); 455 456 Thread.sleep(slowChorePeriod * 10); 457 assertEquals("Chores are missing their start time. Should expand core pool size", 5, 458 service.getCorePoolSize()); 459 } finally { 460 shutdownService(service); 461 } 462 } 463 464 @Test 465 public void testCorePoolDecrease() throws InterruptedException { 466 final int initialCorePoolSize = 3; 467 ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false); 468 final int chorePeriod = 100; 469 try { 470 // Slow chores always miss their start time and thus the core pool size should be at least as 471 // large as the number of running slow chores 472 SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod); 473 SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod); 474 SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod); 475 476 service.scheduleChore(slowChore1); 477 service.scheduleChore(slowChore2); 478 service.scheduleChore(slowChore3); 479 480 Thread.sleep(chorePeriod * 10); 481 assertEquals("Should not create more pools than scheduled chores", 482 service.getNumberOfScheduledChores(), service.getCorePoolSize()); 483 484 SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod); 485 service.scheduleChore(slowChore4); 486 Thread.sleep(chorePeriod * 10); 487 assertEquals("Chores are missing their start time. Should expand core pool size", 488 service.getNumberOfScheduledChores(), service.getCorePoolSize()); 489 490 SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod); 491 service.scheduleChore(slowChore5); 492 Thread.sleep(chorePeriod * 10); 493 assertEquals("Chores are missing their start time. Should expand core pool size", 494 service.getNumberOfScheduledChores(), service.getCorePoolSize()); 495 assertEquals(5, service.getNumberOfChoresMissingStartTime()); 496 497 // Now we begin to cancel the chores that caused an increase in the core thread pool of the 498 // ChoreService. These cancellations should cause a decrease in the core thread pool. 499 slowChore5.cancel(); 500 Thread.sleep(chorePeriod * 10); 501 assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), 502 service.getCorePoolSize()); 503 assertEquals(4, service.getNumberOfChoresMissingStartTime()); 504 505 slowChore4.cancel(); 506 Thread.sleep(chorePeriod * 10); 507 assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), 508 service.getCorePoolSize()); 509 assertEquals(3, service.getNumberOfChoresMissingStartTime()); 510 511 slowChore3.cancel(); 512 Thread.sleep(chorePeriod * 10); 513 assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), 514 service.getCorePoolSize()); 515 assertEquals(2, service.getNumberOfChoresMissingStartTime()); 516 517 slowChore2.cancel(); 518 Thread.sleep(chorePeriod * 10); 519 assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), 520 service.getCorePoolSize()); 521 assertEquals(1, service.getNumberOfChoresMissingStartTime()); 522 523 slowChore1.cancel(); 524 Thread.sleep(chorePeriod * 10); 525 assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), 526 service.getCorePoolSize()); 527 assertEquals(0, service.getNumberOfChoresMissingStartTime()); 528 } finally { 529 shutdownService(service); 530 } 531 } 532 533 @Test 534 public void testNumberOfRunningChores() throws InterruptedException { 535 ChoreService service = new ChoreService("testNumberOfRunningChores"); 536 537 final int period = 100; 538 final int sleepTime = 5; 539 540 try { 541 DoNothingChore dn1 = new DoNothingChore("dn1", period); 542 DoNothingChore dn2 = new DoNothingChore("dn2", period); 543 DoNothingChore dn3 = new DoNothingChore("dn3", period); 544 DoNothingChore dn4 = new DoNothingChore("dn4", period); 545 DoNothingChore dn5 = new DoNothingChore("dn5", period); 546 547 service.scheduleChore(dn1); 548 service.scheduleChore(dn2); 549 service.scheduleChore(dn3); 550 service.scheduleChore(dn4); 551 service.scheduleChore(dn5); 552 553 Thread.sleep(sleepTime); 554 assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores()); 555 556 dn1.cancel(); 557 Thread.sleep(sleepTime); 558 assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores()); 559 560 dn2.cancel(); 561 dn3.cancel(); 562 dn4.cancel(); 563 Thread.sleep(sleepTime); 564 assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores()); 565 566 dn5.cancel(); 567 Thread.sleep(sleepTime); 568 assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores()); 569 } finally { 570 shutdownService(service); 571 } 572 } 573 574 @Test 575 public void testNumberOfChoresMissingStartTime() throws InterruptedException { 576 ChoreService service = new ChoreService("testNumberOfChoresMissingStartTime"); 577 578 final int period = 100; 579 final int sleepTime = 20 * period; 580 581 try { 582 // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores 583 // ALWAYS miss their start time since their execution takes longer than their period 584 SlowChore sc1 = new SlowChore("sc1", period); 585 SlowChore sc2 = new SlowChore("sc2", period); 586 SlowChore sc3 = new SlowChore("sc3", period); 587 SlowChore sc4 = new SlowChore("sc4", period); 588 SlowChore sc5 = new SlowChore("sc5", period); 589 590 service.scheduleChore(sc1); 591 service.scheduleChore(sc2); 592 service.scheduleChore(sc3); 593 service.scheduleChore(sc4); 594 service.scheduleChore(sc5); 595 596 Thread.sleep(sleepTime); 597 assertEquals(5, service.getNumberOfChoresMissingStartTime()); 598 599 sc1.cancel(); 600 Thread.sleep(sleepTime); 601 assertEquals(4, service.getNumberOfChoresMissingStartTime()); 602 603 sc2.cancel(); 604 sc3.cancel(); 605 sc4.cancel(); 606 Thread.sleep(sleepTime); 607 assertEquals(1, service.getNumberOfChoresMissingStartTime()); 608 609 sc5.cancel(); 610 Thread.sleep(sleepTime); 611 assertEquals(0, service.getNumberOfChoresMissingStartTime()); 612 } finally { 613 shutdownService(service); 614 } 615 } 616 617 /** 618 * ChoreServices should never have a core pool size that exceeds the number of chores that have 619 * been scheduled with the service. For example, if 4 ScheduledChores are scheduled with a 620 * ChoreService, the number of threads in the ChoreService's core pool should never exceed 4 621 */ 622 @Test 623 public void testMaximumChoreServiceThreads() throws InterruptedException { 624 ChoreService service = new ChoreService("testMaximumChoreServiceThreads"); 625 626 final int period = 100; 627 final int sleepTime = 5 * period; 628 629 try { 630 // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores 631 // ALWAYS miss their start time since their execution takes longer than their period. 632 // Chores that miss their start time will trigger the onChoreMissedStartTime callback 633 // in the ChoreService. This callback will try to increase the number of core pool 634 // threads. 635 SlowChore sc1 = new SlowChore("sc1", period); 636 SlowChore sc2 = new SlowChore("sc2", period); 637 SlowChore sc3 = new SlowChore("sc3", period); 638 SlowChore sc4 = new SlowChore("sc4", period); 639 SlowChore sc5 = new SlowChore("sc5", period); 640 641 service.scheduleChore(sc1); 642 service.scheduleChore(sc2); 643 service.scheduleChore(sc3); 644 service.scheduleChore(sc4); 645 service.scheduleChore(sc5); 646 647 Thread.sleep(sleepTime); 648 assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); 649 650 SlowChore sc6 = new SlowChore("sc6", period); 651 SlowChore sc7 = new SlowChore("sc7", period); 652 SlowChore sc8 = new SlowChore("sc8", period); 653 SlowChore sc9 = new SlowChore("sc9", period); 654 SlowChore sc10 = new SlowChore("sc10", period); 655 656 service.scheduleChore(sc6); 657 service.scheduleChore(sc7); 658 service.scheduleChore(sc8); 659 service.scheduleChore(sc9); 660 service.scheduleChore(sc10); 661 662 Thread.sleep(sleepTime); 663 assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); 664 } finally { 665 shutdownService(service); 666 } 667 } 668 669 @Test 670 public void testChangingChoreServices() throws InterruptedException { 671 final int period = 100; 672 final int sleepTime = 10; 673 ChoreService service1 = new ChoreService("testChangingChoreServices_1"); 674 ChoreService service2 = new ChoreService("testChangingChoreServices_2"); 675 ScheduledChore chore = new DoNothingChore("sample", period); 676 677 try { 678 assertFalse(chore.isScheduled()); 679 assertFalse(service1.isChoreScheduled(chore)); 680 assertFalse(service2.isChoreScheduled(chore)); 681 assertTrue(chore.getChoreServicer() == null); 682 683 service1.scheduleChore(chore); 684 Thread.sleep(sleepTime); 685 assertTrue(chore.isScheduled()); 686 assertTrue(service1.isChoreScheduled(chore)); 687 assertFalse(service2.isChoreScheduled(chore)); 688 assertFalse(chore.getChoreServicer() == null); 689 690 service2.scheduleChore(chore); 691 Thread.sleep(sleepTime); 692 assertTrue(chore.isScheduled()); 693 assertFalse(service1.isChoreScheduled(chore)); 694 assertTrue(service2.isChoreScheduled(chore)); 695 assertFalse(chore.getChoreServicer() == null); 696 697 chore.cancel(); 698 assertFalse(chore.isScheduled()); 699 assertFalse(service1.isChoreScheduled(chore)); 700 assertFalse(service2.isChoreScheduled(chore)); 701 assertTrue(chore.getChoreServicer() == null); 702 } finally { 703 shutdownService(service1); 704 shutdownService(service2); 705 } 706 } 707 708 @Test 709 public void testStopperForScheduledChores() throws InterruptedException { 710 ChoreService service = new ChoreService("testStopperForScheduledChores"); 711 Stoppable stopperForGroup1 = new SampleStopper(); 712 Stoppable stopperForGroup2 = new SampleStopper(); 713 final int period = 100; 714 final int delta = period/10; 715 716 try { 717 ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period); 718 ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period); 719 ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period); 720 721 ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period); 722 ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period); 723 ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period); 724 725 service.scheduleChore(chore1_group1); 726 service.scheduleChore(chore2_group1); 727 service.scheduleChore(chore3_group1); 728 service.scheduleChore(chore1_group2); 729 service.scheduleChore(chore2_group2); 730 service.scheduleChore(chore3_group2); 731 732 Thread.sleep(delta); 733 Thread.sleep(10 * period); 734 assertTrue(chore1_group1.isScheduled()); 735 assertTrue(chore2_group1.isScheduled()); 736 assertTrue(chore3_group1.isScheduled()); 737 assertTrue(chore1_group2.isScheduled()); 738 assertTrue(chore2_group2.isScheduled()); 739 assertTrue(chore3_group2.isScheduled()); 740 741 stopperForGroup1.stop("test stopping group 1"); 742 Thread.sleep(period); 743 assertFalse(chore1_group1.isScheduled()); 744 assertFalse(chore2_group1.isScheduled()); 745 assertFalse(chore3_group1.isScheduled()); 746 assertTrue(chore1_group2.isScheduled()); 747 assertTrue(chore2_group2.isScheduled()); 748 assertTrue(chore3_group2.isScheduled()); 749 750 stopperForGroup2.stop("test stopping group 2"); 751 Thread.sleep(period); 752 assertFalse(chore1_group1.isScheduled()); 753 assertFalse(chore2_group1.isScheduled()); 754 assertFalse(chore3_group1.isScheduled()); 755 assertFalse(chore1_group2.isScheduled()); 756 assertFalse(chore2_group2.isScheduled()); 757 assertFalse(chore3_group2.isScheduled()); 758 } finally { 759 shutdownService(service); 760 } 761 } 762 763 @Test 764 public void testShutdownCancelsScheduledChores() throws InterruptedException { 765 final int period = 100; 766 ChoreService service = new ChoreService("testShutdownCancelsScheduledChores"); 767 ScheduledChore successChore1 = new DoNothingChore("sc1", period); 768 ScheduledChore successChore2 = new DoNothingChore("sc2", period); 769 ScheduledChore successChore3 = new DoNothingChore("sc3", period); 770 771 try { 772 assertTrue(service.scheduleChore(successChore1)); 773 assertTrue(successChore1.isScheduled()); 774 assertTrue(service.scheduleChore(successChore2)); 775 assertTrue(successChore2.isScheduled()); 776 assertTrue(service.scheduleChore(successChore3)); 777 assertTrue(successChore3.isScheduled()); 778 } finally { 779 shutdownService(service); 780 } 781 782 assertFalse(successChore1.isScheduled()); 783 assertFalse(successChore2.isScheduled()); 784 assertFalse(successChore3.isScheduled()); 785 } 786 787 @Test 788 public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException { 789 final int period = 100; 790 final int sleep = 5 * period; 791 ChoreService service = new ChoreService("testShutdownWorksWhileChoresAreExecuting"); 792 ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep); 793 ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep); 794 ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep); 795 try { 796 assertTrue(service.scheduleChore(slowChore1)); 797 assertTrue(service.scheduleChore(slowChore2)); 798 assertTrue(service.scheduleChore(slowChore3)); 799 800 Thread.sleep(sleep / 2); 801 shutdownService(service); 802 803 assertFalse(slowChore1.isScheduled()); 804 assertFalse(slowChore2.isScheduled()); 805 assertFalse(slowChore3.isScheduled()); 806 assertTrue(service.isShutdown()); 807 808 Thread.sleep(5); 809 assertTrue(service.isTerminated()); 810 } finally { 811 shutdownService(service); 812 } 813 } 814 815 @Test 816 public void testShutdownRejectsNewSchedules() throws InterruptedException { 817 final int period = 100; 818 ChoreService service = new ChoreService("testShutdownRejectsNewSchedules"); 819 ScheduledChore successChore1 = new DoNothingChore("sc1", period); 820 ScheduledChore successChore2 = new DoNothingChore("sc2", period); 821 ScheduledChore successChore3 = new DoNothingChore("sc3", period); 822 ScheduledChore failChore1 = new DoNothingChore("fc1", period); 823 ScheduledChore failChore2 = new DoNothingChore("fc2", period); 824 ScheduledChore failChore3 = new DoNothingChore("fc3", period); 825 826 try { 827 assertTrue(service.scheduleChore(successChore1)); 828 assertTrue(successChore1.isScheduled()); 829 assertTrue(service.scheduleChore(successChore2)); 830 assertTrue(successChore2.isScheduled()); 831 assertTrue(service.scheduleChore(successChore3)); 832 assertTrue(successChore3.isScheduled()); 833 } finally { 834 shutdownService(service); 835 } 836 837 assertFalse(service.scheduleChore(failChore1)); 838 assertFalse(failChore1.isScheduled()); 839 assertFalse(service.scheduleChore(failChore2)); 840 assertFalse(failChore2.isScheduled()); 841 assertFalse(service.scheduleChore(failChore3)); 842 assertFalse(failChore3.isScheduled()); 843 } 844 845 /** 846 * for HBASE-25014 847 */ 848 @Test(timeout = 10000) 849 public void testInitialDelay() { 850 ChoreService service = new ChoreService(name.getMethodName()); 851 SampleStopper stopper = new SampleStopper(); 852 service.scheduleChore(new ScheduledChore("chore", stopper, 1000, 2000) { 853 @Override protected void chore() { 854 stopper.stop("test"); 855 } 856 }); 857 while (!stopper.isStopped()) { 858 Threads.sleep(1000); 859 } 860 } 861}