001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.testclassification.ReplicationTests;
025import org.apache.hadoop.hbase.testclassification.SmallTests;
026import org.junit.ClassRule;
027import org.junit.Test;
028import org.junit.experimental.categories.Category;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032@Category({ ReplicationTests.class, SmallTests.class })
033public class TestReplicationThrottler {
034
035  @ClassRule
036  public static final HBaseClassTestRule CLASS_RULE =
037    HBaseClassTestRule.forClass(TestReplicationThrottler.class);
038
039  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationThrottler.class);
040
041  /**
042   * unit test for throttling
043   */
044  @Test
045  public void testThrottling() {
046    LOG.info("testThrottling");
047
048    // throttle bandwidth is 100 and 10 bytes/cycle respectively
049    ReplicationThrottler throttler1 = new ReplicationThrottler(100);
050    ReplicationThrottler throttler2 = new ReplicationThrottler(10);
051
052    long ticks1 = throttler1.getNextSleepInterval(1000);
053    long ticks2 = throttler2.getNextSleepInterval(1000);
054
055    // 1. the first push size is 1000, though 1000 bytes exceeds 100/10
056    // bandwidthes, but no sleep since it's the first push of current
057    // cycle, amortizing occurs when next push arrives
058    assertEquals(0, ticks1);
059    assertEquals(0, ticks2);
060
061    throttler1.addPushSize(1000);
062    throttler2.addPushSize(1000);
063
064    ticks1 = throttler1.getNextSleepInterval(5);
065    ticks2 = throttler2.getNextSleepInterval(5);
066
067    // 2. when the second push(5) arrives and throttling(5) is called, the
068    // current cyclePushSize is 1000 bytes, this should make throttler1
069    // sleep 1000/100 = 10 cycles = 1s and make throttler2 sleep 1000/10
070    // = 100 cycles = 10s before the second push occurs -- amortize case
071    // after amortizing, both cycleStartTick and cyclePushSize are reset
072    //
073    // Note: in a slow machine, the sleep interval might be less than ideal ticks.
074    // If it is 75% of expected value, its is still acceptable.
075    if (ticks1 != 1000 && ticks1 != 999) {
076      assertTrue(ticks1 >= 750 && ticks1 <= 1000);
077    }
078    if (ticks2 != 10000 && ticks2 != 9999) {
079      assertTrue(ticks2 >= 7500 && ticks2 <= 10000);
080    }
081
082    throttler1.resetStartTick();
083    throttler2.resetStartTick();
084
085    throttler1.addPushSize(5);
086    throttler2.addPushSize(5);
087
088    ticks1 = throttler1.getNextSleepInterval(45);
089    ticks2 = throttler2.getNextSleepInterval(45);
090
091    // 3. when the third push(45) arrives and throttling(45) is called, the
092    // current cyclePushSize is 5 bytes, 50-byte makes throttler1 no
093    // sleep, but can make throttler2 delay to next cycle
094    // note: in real case, sleep time should cover time elapses during push
095    // operation
096    assertTrue(ticks1 == 0);
097    if (ticks2 != 100 && ticks2 != 99) {
098      assertTrue(ticks1 >= 75 && ticks1 <= 100);
099    }
100
101    throttler2.resetStartTick();
102
103    throttler1.addPushSize(45);
104    throttler2.addPushSize(45);
105
106    ticks1 = throttler1.getNextSleepInterval(60);
107    ticks2 = throttler2.getNextSleepInterval(60);
108
109    // 4. when the fourth push(60) arrives and throttling(60) is called, throttler1
110    // delay to next cycle since 45+60 == 105; and throttler2 should firstly sleep
111    // ceiling(45/10)= 5 cycles = 500ms to amortize previous push
112    //
113    // Note: in real case, sleep time should cover time elapses during push operation
114    if (ticks1 != 100 && ticks1 != 99) {
115      assertTrue(ticks1 >= 75 && ticks1 <= 100);
116    }
117    if (ticks2 != 500 && ticks2 != 499) {
118      assertTrue(ticks1 >= 375 && ticks1 <= 500);
119    }
120  }
121}