001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore;
026import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore;
027import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore;
028import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper;
029import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore;
030import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.util.Threads;
033import org.junit.ClassRule;
034import org.junit.Rule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037import org.junit.rules.TestName;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@Category(MediumTests.class)
042public class TestChoreService {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046      HBaseClassTestRule.forClass(TestChoreService.class);
047
048  public static final Logger log = LoggerFactory.getLogger(TestChoreService.class);
049
050  @Rule
051  public TestName name = new TestName();
052
053  /**
054   * A few ScheduledChore samples that are useful for testing with ChoreService
055   */
056  public static class ScheduledChoreSamples {
057    /**
058     * Straight forward stopper implementation that is used by default when one is not provided
059     */
060    public static class SampleStopper implements Stoppable {
061      private boolean stopped = false;
062
063      @Override
064      public void stop(String why) {
065        stopped = true;
066      }
067
068      @Override
069      public boolean isStopped() {
070        return stopped;
071      }
072    }
073
074    /**
075     * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic
076     * executions
077     */
078    public static class SlowChore extends ScheduledChore {
079      public SlowChore(String name, int period) {
080        this(name, new SampleStopper(), period);
081      }
082
083      public SlowChore(String name, Stoppable stopper, int period) {
084        super(name, stopper, period);
085      }
086
087      @Override
088      protected boolean initialChore() {
089        try {
090          Thread.sleep(getPeriod() * 2);
091        } catch (InterruptedException e) {
092          log.warn("", e);
093        }
094        return true;
095      }
096
097      @Override
098      protected void chore() {
099        try {
100          Thread.sleep(getPeriod() * 2);
101        } catch (InterruptedException e) {
102          log.warn("", e);
103        }
104      }
105    }
106
107    /**
108     * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests
109     */
110    public static class DoNothingChore extends ScheduledChore {
111      public DoNothingChore(String name, int period) {
112        super(name, new SampleStopper(), period);
113      }
114
115      public DoNothingChore(String name, Stoppable stopper, int period) {
116        super(name, stopper, period);
117      }
118
119      @Override
120      protected void chore() {
121        // DO NOTHING
122      }
123
124    }
125
126    public static class SleepingChore extends ScheduledChore {
127      private int sleepTime;
128
129      public SleepingChore(String name, int chorePeriod, int sleepTime) {
130        this(name, new SampleStopper(), chorePeriod, sleepTime);
131      }
132
133      public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) {
134        super(name, stopper, period);
135        this.sleepTime = sleepTime;
136      }
137
138      @Override
139      protected boolean initialChore() {
140        try {
141          Thread.sleep(sleepTime);
142        } catch (InterruptedException e) {
143          log.warn("", e);
144        }
145        return true;
146      }
147
148      @Override
149      protected void chore() {
150        try {
151          Thread.sleep(sleepTime);
152        } catch (Exception e) {
153          log.warn("", e);
154        }
155      }
156    }
157
158    public static class CountingChore extends ScheduledChore {
159      private int countOfChoreCalls;
160      private boolean outputOnTicks = false;
161
162      public CountingChore(String name, int period) {
163        this(name, new SampleStopper(), period);
164      }
165
166      public CountingChore(String name, Stoppable stopper, int period) {
167        this(name, stopper, period, false);
168      }
169
170      public CountingChore(String name, Stoppable stopper, int period,
171          final boolean outputOnTicks) {
172        super(name, stopper, period);
173        this.countOfChoreCalls = 0;
174        this.outputOnTicks = outputOnTicks;
175      }
176
177      @Override
178      protected boolean initialChore() {
179        countOfChoreCalls++;
180        if (outputOnTicks) {
181          outputTickCount();
182        }
183        return true;
184      }
185
186      @Override
187      protected void chore() {
188        countOfChoreCalls++;
189        if (outputOnTicks) {
190          outputTickCount();
191        }
192      }
193
194      private void outputTickCount() {
195        log.info("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls);
196      }
197
198      public int getCountOfChoreCalls() {
199        return countOfChoreCalls;
200      }
201
202      public boolean isOutputtingOnTicks() {
203        return outputOnTicks;
204      }
205
206      public void setOutputOnTicks(boolean o) {
207        outputOnTicks = o;
208      }
209    }
210
211    /**
212     * A Chore that will try to execute the initial chore a few times before succeeding. Once the
213     * initial chore is complete the chore cancels itself
214     */
215    public static class FailInitialChore extends ScheduledChore {
216      private int numberOfFailures;
217      private int failureThreshold;
218
219      /**
220       * @param failThreshold Number of times the Chore fails when trying to execute initialChore
221       *          before succeeding.
222       */
223      public FailInitialChore(String name, int period, int failThreshold) {
224        this(name, new SampleStopper(), period, failThreshold);
225      }
226
227      public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) {
228        super(name, stopper, period);
229        numberOfFailures = 0;
230        failureThreshold = failThreshold;
231      }
232
233      @Override
234      protected boolean initialChore() {
235        if (numberOfFailures < failureThreshold) {
236          numberOfFailures++;
237          return false;
238        } else {
239          return true;
240        }
241      }
242
243      @Override
244      protected void chore() {
245        assertTrue(numberOfFailures == failureThreshold);
246        cancel(false);
247      }
248
249    }
250  }
251
252  @Test
253  public void testInitialChorePrecedence() throws InterruptedException {
254    ChoreService service = new ChoreService("testInitialChorePrecedence");
255
256    final int period = 100;
257    final int failureThreshold = 5;
258
259    try {
260      ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold);
261      service.scheduleChore(chore);
262
263      int loopCount = 0;
264      boolean brokeOutOfLoop = false;
265
266      while (!chore.isInitialChoreComplete() && chore.isScheduled()) {
267        Thread.sleep(failureThreshold * period);
268        loopCount++;
269        if (loopCount > 3) {
270          brokeOutOfLoop = true;
271          break;
272        }
273      }
274
275      assertFalse(brokeOutOfLoop);
276    } finally {
277      shutdownService(service);
278    }
279  }
280
281  @Test
282  public void testCancelChore() throws InterruptedException {
283    final int period = 100;
284    ScheduledChore chore1 = new DoNothingChore("chore1", period);
285    ChoreService service = new ChoreService("testCancelChore");
286    try {
287      service.scheduleChore(chore1);
288      assertTrue(chore1.isScheduled());
289
290      chore1.cancel(true);
291      assertFalse(chore1.isScheduled());
292      assertTrue(service.getNumberOfScheduledChores() == 0);
293    } finally {
294      shutdownService(service);
295    }
296  }
297
298  @Test
299  public void testScheduledChoreConstruction() {
300    final String NAME = "chore";
301    final int PERIOD = 100;
302    final long VALID_DELAY = 0;
303    final long INVALID_DELAY = -100;
304    final TimeUnit UNIT = TimeUnit.NANOSECONDS;
305
306    ScheduledChore chore1 =
307        new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) {
308      @Override
309      protected void chore() {
310        // DO NOTHING
311      }
312    };
313
314    assertEquals("Name construction failed", NAME, chore1.getName());
315    assertEquals("Period construction failed", PERIOD, chore1.getPeriod());
316    assertEquals("Initial Delay construction failed", VALID_DELAY, chore1.getInitialDelay());
317    assertEquals("TimeUnit construction failed", UNIT, chore1.getTimeUnit());
318
319    ScheduledChore invalidDelayChore =
320        new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) {
321      @Override
322      protected void chore() {
323        // DO NOTHING
324      }
325    };
326
327    assertEquals("Initial Delay should be set to 0 when invalid", 0,
328      invalidDelayChore.getInitialDelay());
329  }
330
331  @Test
332  public void testChoreServiceConstruction() throws InterruptedException {
333    final int corePoolSize = 10;
334    final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE;
335
336    ChoreService customInit =
337        new ChoreService("testChoreServiceConstruction_custom", corePoolSize, false);
338    try {
339      assertEquals(corePoolSize, customInit.getCorePoolSize());
340    } finally {
341      shutdownService(customInit);
342    }
343
344    ChoreService defaultInit = new ChoreService("testChoreServiceConstruction_default");
345    try {
346      assertEquals(defaultCorePoolSize, defaultInit.getCorePoolSize());
347    } finally {
348      shutdownService(defaultInit);
349    }
350
351    ChoreService invalidInit = new ChoreService("testChoreServiceConstruction_invalid", -10, false);
352    try {
353      assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize());
354    } finally {
355      shutdownService(invalidInit);
356    }
357  }
358
359  @Test
360  public void testFrequencyOfChores() throws InterruptedException {
361    final int period = 100;
362    // Small delta that acts as time buffer (allowing chores to complete if running slowly)
363    final int delta = period/5;
364    ChoreService service = new ChoreService("testFrequencyOfChores");
365    CountingChore chore = new CountingChore("countingChore", period);
366    try {
367      service.scheduleChore(chore);
368
369      Thread.sleep(10 * period + delta);
370      assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls());
371
372      Thread.sleep(10 * period + delta);
373      assertEquals("20 periods have elapsed.", 21, chore.getCountOfChoreCalls());
374    } finally {
375      shutdownService(service);
376    }
377  }
378
379  public void shutdownService(ChoreService service) throws InterruptedException {
380    service.shutdown();
381    while (!service.isTerminated()) {
382      Thread.sleep(100);
383    }
384  }
385
386  @Test
387  public void testForceTrigger() throws InterruptedException {
388    final int period = 100;
389    final int delta = period/10;
390    ChoreService service = new ChoreService("testForceTrigger");
391    final CountingChore chore = new CountingChore("countingChore", period);
392    try {
393      service.scheduleChore(chore);
394      Thread.sleep(10 * period + delta);
395
396      assertEquals("10 periods have elapsed.", 11, chore.getCountOfChoreCalls());
397
398      // Force five runs of the chore to occur, sleeping between triggers to ensure the
399      // chore has time to run
400      chore.triggerNow();
401      Thread.sleep(delta);
402      chore.triggerNow();
403      Thread.sleep(delta);
404      chore.triggerNow();
405      Thread.sleep(delta);
406      chore.triggerNow();
407      Thread.sleep(delta);
408      chore.triggerNow();
409      Thread.sleep(delta);
410
411      assertEquals("Trigger was called 5 times after 10 periods.", 16,
412          chore.getCountOfChoreCalls());
413
414      Thread.sleep(10 * period + delta);
415
416      // Be loosey-goosey. It used to be '26' but it was a big flakey relying on timing.
417      assertTrue("Expected at least 16 invocations, instead got " + chore.getCountOfChoreCalls(),
418          chore.getCountOfChoreCalls() > 16);
419    } finally {
420      shutdownService(service);
421    }
422  }
423
424  @Test
425  public void testCorePoolIncrease() throws InterruptedException {
426    final int initialCorePoolSize = 3;
427    ChoreService service = new ChoreService("testCorePoolIncrease", initialCorePoolSize, false);
428
429    try {
430      assertEquals("Setting core pool size gave unexpected results.", initialCorePoolSize,
431        service.getCorePoolSize());
432
433      final int slowChorePeriod = 100;
434      SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod);
435      SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod);
436      SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod);
437
438      service.scheduleChore(slowChore1);
439      service.scheduleChore(slowChore2);
440      service.scheduleChore(slowChore3);
441
442      Thread.sleep(slowChorePeriod * 10);
443      assertEquals("Should not create more pools than scheduled chores", 3,
444        service.getCorePoolSize());
445
446      SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod);
447      service.scheduleChore(slowChore4);
448
449      Thread.sleep(slowChorePeriod * 10);
450      assertEquals("Chores are missing their start time. Should expand core pool size", 4,
451        service.getCorePoolSize());
452
453      SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod);
454      service.scheduleChore(slowChore5);
455
456      Thread.sleep(slowChorePeriod * 10);
457      assertEquals("Chores are missing their start time. Should expand core pool size", 5,
458        service.getCorePoolSize());
459    } finally {
460      shutdownService(service);
461    }
462  }
463
464  @Test
465  public void testCorePoolDecrease() throws InterruptedException {
466    final int initialCorePoolSize = 3;
467    ChoreService service = new ChoreService("testCorePoolDecrease", initialCorePoolSize, false);
468    final int chorePeriod = 100;
469    try {
470      // Slow chores always miss their start time and thus the core pool size should be at least as
471      // large as the number of running slow chores
472      SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod);
473      SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod);
474      SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod);
475
476      service.scheduleChore(slowChore1);
477      service.scheduleChore(slowChore2);
478      service.scheduleChore(slowChore3);
479
480      Thread.sleep(chorePeriod * 10);
481      assertEquals("Should not create more pools than scheduled chores",
482        service.getNumberOfScheduledChores(), service.getCorePoolSize());
483
484      SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod);
485      service.scheduleChore(slowChore4);
486      Thread.sleep(chorePeriod * 10);
487      assertEquals("Chores are missing their start time. Should expand core pool size",
488        service.getNumberOfScheduledChores(), service.getCorePoolSize());
489
490      SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod);
491      service.scheduleChore(slowChore5);
492      Thread.sleep(chorePeriod * 10);
493      assertEquals("Chores are missing their start time. Should expand core pool size",
494        service.getNumberOfScheduledChores(), service.getCorePoolSize());
495      assertEquals(5, service.getNumberOfChoresMissingStartTime());
496
497      // Now we begin to cancel the chores that caused an increase in the core thread pool of the
498      // ChoreService. These cancellations should cause a decrease in the core thread pool.
499      slowChore5.cancel();
500      Thread.sleep(chorePeriod * 10);
501      assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
502        service.getCorePoolSize());
503      assertEquals(4, service.getNumberOfChoresMissingStartTime());
504
505      slowChore4.cancel();
506      Thread.sleep(chorePeriod * 10);
507      assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
508        service.getCorePoolSize());
509      assertEquals(3, service.getNumberOfChoresMissingStartTime());
510
511      slowChore3.cancel();
512      Thread.sleep(chorePeriod * 10);
513      assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
514        service.getCorePoolSize());
515      assertEquals(2, service.getNumberOfChoresMissingStartTime());
516
517      slowChore2.cancel();
518      Thread.sleep(chorePeriod * 10);
519      assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
520        service.getCorePoolSize());
521      assertEquals(1, service.getNumberOfChoresMissingStartTime());
522
523      slowChore1.cancel();
524      Thread.sleep(chorePeriod * 10);
525      assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
526        service.getCorePoolSize());
527      assertEquals(0, service.getNumberOfChoresMissingStartTime());
528    } finally {
529      shutdownService(service);
530    }
531  }
532
533  @Test
534  public void testNumberOfRunningChores() throws InterruptedException {
535    ChoreService service = new ChoreService("testNumberOfRunningChores");
536
537    final int period = 100;
538    final int sleepTime = 5;
539
540    try {
541      DoNothingChore dn1 = new DoNothingChore("dn1", period);
542      DoNothingChore dn2 = new DoNothingChore("dn2", period);
543      DoNothingChore dn3 = new DoNothingChore("dn3", period);
544      DoNothingChore dn4 = new DoNothingChore("dn4", period);
545      DoNothingChore dn5 = new DoNothingChore("dn5", period);
546
547      service.scheduleChore(dn1);
548      service.scheduleChore(dn2);
549      service.scheduleChore(dn3);
550      service.scheduleChore(dn4);
551      service.scheduleChore(dn5);
552
553      Thread.sleep(sleepTime);
554      assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores());
555
556      dn1.cancel();
557      Thread.sleep(sleepTime);
558      assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores());
559
560      dn2.cancel();
561      dn3.cancel();
562      dn4.cancel();
563      Thread.sleep(sleepTime);
564      assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores());
565
566      dn5.cancel();
567      Thread.sleep(sleepTime);
568      assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores());
569    } finally {
570      shutdownService(service);
571    }
572  }
573
574  @Test
575  public void testNumberOfChoresMissingStartTime() throws InterruptedException {
576    ChoreService service = new ChoreService("testNumberOfChoresMissingStartTime");
577
578    final int period = 100;
579    final int sleepTime = 20 * period;
580
581    try {
582      // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
583      // ALWAYS miss their start time since their execution takes longer than their period
584      SlowChore sc1 = new SlowChore("sc1", period);
585      SlowChore sc2 = new SlowChore("sc2", period);
586      SlowChore sc3 = new SlowChore("sc3", period);
587      SlowChore sc4 = new SlowChore("sc4", period);
588      SlowChore sc5 = new SlowChore("sc5", period);
589
590      service.scheduleChore(sc1);
591      service.scheduleChore(sc2);
592      service.scheduleChore(sc3);
593      service.scheduleChore(sc4);
594      service.scheduleChore(sc5);
595
596      Thread.sleep(sleepTime);
597      assertEquals(5, service.getNumberOfChoresMissingStartTime());
598
599      sc1.cancel();
600      Thread.sleep(sleepTime);
601      assertEquals(4, service.getNumberOfChoresMissingStartTime());
602
603      sc2.cancel();
604      sc3.cancel();
605      sc4.cancel();
606      Thread.sleep(sleepTime);
607      assertEquals(1, service.getNumberOfChoresMissingStartTime());
608
609      sc5.cancel();
610      Thread.sleep(sleepTime);
611      assertEquals(0, service.getNumberOfChoresMissingStartTime());
612    } finally {
613      shutdownService(service);
614    }
615  }
616
617  /**
618   * ChoreServices should never have a core pool size that exceeds the number of chores that have
619   * been scheduled with the service. For example, if 4 ScheduledChores are scheduled with a
620   * ChoreService, the number of threads in the ChoreService's core pool should never exceed 4
621   */
622  @Test
623  public void testMaximumChoreServiceThreads() throws InterruptedException {
624    ChoreService service = new ChoreService("testMaximumChoreServiceThreads");
625
626    final int period = 100;
627    final int sleepTime = 5 * period;
628
629    try {
630      // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
631      // ALWAYS miss their start time since their execution takes longer than their period.
632      // Chores that miss their start time will trigger the onChoreMissedStartTime callback
633      // in the ChoreService. This callback will try to increase the number of core pool
634      // threads.
635      SlowChore sc1 = new SlowChore("sc1", period);
636      SlowChore sc2 = new SlowChore("sc2", period);
637      SlowChore sc3 = new SlowChore("sc3", period);
638      SlowChore sc4 = new SlowChore("sc4", period);
639      SlowChore sc5 = new SlowChore("sc5", period);
640
641      service.scheduleChore(sc1);
642      service.scheduleChore(sc2);
643      service.scheduleChore(sc3);
644      service.scheduleChore(sc4);
645      service.scheduleChore(sc5);
646
647      Thread.sleep(sleepTime);
648      assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
649
650      SlowChore sc6 = new SlowChore("sc6", period);
651      SlowChore sc7 = new SlowChore("sc7", period);
652      SlowChore sc8 = new SlowChore("sc8", period);
653      SlowChore sc9 = new SlowChore("sc9", period);
654      SlowChore sc10 = new SlowChore("sc10", period);
655
656      service.scheduleChore(sc6);
657      service.scheduleChore(sc7);
658      service.scheduleChore(sc8);
659      service.scheduleChore(sc9);
660      service.scheduleChore(sc10);
661
662      Thread.sleep(sleepTime);
663      assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
664    } finally {
665      shutdownService(service);
666    }
667  }
668
669  @Test
670  public void testChangingChoreServices() throws InterruptedException {
671    final int period = 100;
672    final int sleepTime = 10;
673    ChoreService service1 = new ChoreService("testChangingChoreServices_1");
674    ChoreService service2 = new ChoreService("testChangingChoreServices_2");
675    ScheduledChore chore = new DoNothingChore("sample", period);
676
677    try {
678      assertFalse(chore.isScheduled());
679      assertFalse(service1.isChoreScheduled(chore));
680      assertFalse(service2.isChoreScheduled(chore));
681      assertTrue(chore.getChoreServicer() == null);
682
683      service1.scheduleChore(chore);
684      Thread.sleep(sleepTime);
685      assertTrue(chore.isScheduled());
686      assertTrue(service1.isChoreScheduled(chore));
687      assertFalse(service2.isChoreScheduled(chore));
688      assertFalse(chore.getChoreServicer() == null);
689
690      service2.scheduleChore(chore);
691      Thread.sleep(sleepTime);
692      assertTrue(chore.isScheduled());
693      assertFalse(service1.isChoreScheduled(chore));
694      assertTrue(service2.isChoreScheduled(chore));
695      assertFalse(chore.getChoreServicer() == null);
696
697      chore.cancel();
698      assertFalse(chore.isScheduled());
699      assertFalse(service1.isChoreScheduled(chore));
700      assertFalse(service2.isChoreScheduled(chore));
701      assertTrue(chore.getChoreServicer() == null);
702    } finally {
703      shutdownService(service1);
704      shutdownService(service2);
705    }
706  }
707
708  @Test
709  public void testStopperForScheduledChores() throws InterruptedException {
710    ChoreService service = new ChoreService("testStopperForScheduledChores");
711    Stoppable stopperForGroup1 = new SampleStopper();
712    Stoppable stopperForGroup2 = new SampleStopper();
713    final int period = 100;
714    final int delta = period/10;
715
716    try {
717      ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period);
718      ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period);
719      ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period);
720
721      ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period);
722      ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period);
723      ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period);
724
725      service.scheduleChore(chore1_group1);
726      service.scheduleChore(chore2_group1);
727      service.scheduleChore(chore3_group1);
728      service.scheduleChore(chore1_group2);
729      service.scheduleChore(chore2_group2);
730      service.scheduleChore(chore3_group2);
731
732      Thread.sleep(delta);
733      Thread.sleep(10 * period);
734      assertTrue(chore1_group1.isScheduled());
735      assertTrue(chore2_group1.isScheduled());
736      assertTrue(chore3_group1.isScheduled());
737      assertTrue(chore1_group2.isScheduled());
738      assertTrue(chore2_group2.isScheduled());
739      assertTrue(chore3_group2.isScheduled());
740
741      stopperForGroup1.stop("test stopping group 1");
742      Thread.sleep(period);
743      assertFalse(chore1_group1.isScheduled());
744      assertFalse(chore2_group1.isScheduled());
745      assertFalse(chore3_group1.isScheduled());
746      assertTrue(chore1_group2.isScheduled());
747      assertTrue(chore2_group2.isScheduled());
748      assertTrue(chore3_group2.isScheduled());
749
750      stopperForGroup2.stop("test stopping group 2");
751      Thread.sleep(period);
752      assertFalse(chore1_group1.isScheduled());
753      assertFalse(chore2_group1.isScheduled());
754      assertFalse(chore3_group1.isScheduled());
755      assertFalse(chore1_group2.isScheduled());
756      assertFalse(chore2_group2.isScheduled());
757      assertFalse(chore3_group2.isScheduled());
758    } finally {
759      shutdownService(service);
760    }
761  }
762
763  @Test
764  public void testShutdownCancelsScheduledChores() throws InterruptedException {
765    final int period = 100;
766    ChoreService service = new ChoreService("testShutdownCancelsScheduledChores");
767    ScheduledChore successChore1 = new DoNothingChore("sc1", period);
768    ScheduledChore successChore2 = new DoNothingChore("sc2", period);
769    ScheduledChore successChore3 = new DoNothingChore("sc3", period);
770
771    try {
772      assertTrue(service.scheduleChore(successChore1));
773      assertTrue(successChore1.isScheduled());
774      assertTrue(service.scheduleChore(successChore2));
775      assertTrue(successChore2.isScheduled());
776      assertTrue(service.scheduleChore(successChore3));
777      assertTrue(successChore3.isScheduled());
778    } finally {
779      shutdownService(service);
780    }
781
782    assertFalse(successChore1.isScheduled());
783    assertFalse(successChore2.isScheduled());
784    assertFalse(successChore3.isScheduled());
785  }
786
787  @Test
788  public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException {
789    final int period = 100;
790    final int sleep = 5 * period;
791    ChoreService service = new ChoreService("testShutdownWorksWhileChoresAreExecuting");
792    ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep);
793    ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep);
794    ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep);
795    try {
796      assertTrue(service.scheduleChore(slowChore1));
797      assertTrue(service.scheduleChore(slowChore2));
798      assertTrue(service.scheduleChore(slowChore3));
799
800      Thread.sleep(sleep / 2);
801      shutdownService(service);
802
803      assertFalse(slowChore1.isScheduled());
804      assertFalse(slowChore2.isScheduled());
805      assertFalse(slowChore3.isScheduled());
806      assertTrue(service.isShutdown());
807
808      Thread.sleep(5);
809      assertTrue(service.isTerminated());
810    } finally {
811      shutdownService(service);
812    }
813  }
814
815  @Test
816  public void testShutdownRejectsNewSchedules() throws InterruptedException {
817    final int period = 100;
818    ChoreService service = new ChoreService("testShutdownRejectsNewSchedules");
819    ScheduledChore successChore1 = new DoNothingChore("sc1", period);
820    ScheduledChore successChore2 = new DoNothingChore("sc2", period);
821    ScheduledChore successChore3 = new DoNothingChore("sc3", period);
822    ScheduledChore failChore1 = new DoNothingChore("fc1", period);
823    ScheduledChore failChore2 = new DoNothingChore("fc2", period);
824    ScheduledChore failChore3 = new DoNothingChore("fc3", period);
825
826    try {
827      assertTrue(service.scheduleChore(successChore1));
828      assertTrue(successChore1.isScheduled());
829      assertTrue(service.scheduleChore(successChore2));
830      assertTrue(successChore2.isScheduled());
831      assertTrue(service.scheduleChore(successChore3));
832      assertTrue(successChore3.isScheduled());
833    } finally {
834      shutdownService(service);
835    }
836
837    assertFalse(service.scheduleChore(failChore1));
838    assertFalse(failChore1.isScheduled());
839    assertFalse(service.scheduleChore(failChore2));
840    assertFalse(failChore2.isScheduled());
841    assertFalse(service.scheduleChore(failChore3));
842    assertFalse(failChore3.isScheduled());
843  }
844
845  /**
846   * for HBASE-25014
847   */
848  @Test(timeout = 10000)
849  public void testInitialDelay() {
850    ChoreService service = new ChoreService(name.getMethodName());
851    SampleStopper stopper = new SampleStopper();
852    service.scheduleChore(new ScheduledChore("chore", stopper, 1000, 2000) {
853      @Override protected void chore() {
854        stopper.stop("test");
855      }
856    });
857    while (!stopper.isStopped()) {
858      Threads.sleep(1000);
859    }
860  }
861}