001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNull;
022import static org.junit.Assert.assertTrue;
023
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.Waiter;
031import org.apache.hadoop.hbase.testclassification.ClientTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.junit.After;
035import org.junit.Before;
036import org.junit.ClassRule;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039
040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
041
042@Category({ ClientTests.class, SmallTests.class })
043public class TestRegistryEndpointsRefresher {
044
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047    HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);
048
049  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
050    "hbase.test.registry.initial.delay.secs";
051  private static final String INTERVAL_SECS_CONFIG_NAME =
052    "hbase.test.registry.refresh.interval.secs";
053  private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
054    "hbase.test.registry.refresh.min.interval.secs";
055
056  private Configuration conf;
057  private RegistryEndpointsRefresher refresher;
058  private AtomicInteger refreshCallCounter;
059  private CopyOnWriteArrayList<Long> callTimestamps;
060
061  @Before
062  public void setUp() {
063    conf = HBaseConfiguration.create();
064    refreshCallCounter = new AtomicInteger(0);
065    callTimestamps = new CopyOnWriteArrayList<>();
066  }
067
068  @After
069  public void tearDown() {
070    if (refresher != null) {
071      refresher.stop();
072    }
073  }
074
075  private void refresh() {
076    refreshCallCounter.incrementAndGet();
077    callTimestamps.add(EnvironmentEdgeManager.currentTime());
078  }
079
080  private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
081    conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
082    conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
083    conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
084    refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
085      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
086  }
087
088  @Test
089  public void testDisableRefresh() {
090    conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
091    assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
092      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
093  }
094
095  @Test
096  public void testInitialDelay() throws InterruptedException {
097    createRefresher(1, 10, 0);
098    // Wait for 2 seconds to see that at least 1 refresh have been made since the initial delay is 1
099    // seconds
100    Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
101    // Sleep more 5 seconds to make sure we have not made new calls since the interval is 10 seconds
102    Thread.sleep(5000);
103    assertEquals(1, refreshCallCounter.get());
104  }
105
106  @Test
107  public void testPeriodicMasterEndPointRefresh() {
108    // Refresh every 1 second.
109    createRefresher(1, 1, 0);
110    // Wait for > 3 seconds to see that at least 3 refresh have been made.
111    Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
112  }
113
114  @Test
115  public void testDurationBetweenRefreshes() {
116    // Disable periodic refresh
117    // A minimum duration of 1s between refreshes
118    createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
119    // Issue a ton of manual refreshes.
120    for (int i = 0; i < 10000; i++) {
121      refresher.refreshNow();
122      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
123    }
124    // Overall wait time is 10000 ms, so the number of requests should be <=10
125    // Actual calls to refresh should be much lower than the refresh count.
126    assertTrue(String.valueOf(refreshCallCounter.get()), refreshCallCounter.get() <= 20);
127    assertTrue(callTimestamps.size() > 0);
128    // Verify that the delta between subsequent refresh is at least 1sec as configured.
129    for (int i = 1; i < callTimestamps.size() - 1; i++) {
130      long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
131      // Few ms cushion to account for any env jitter.
132      assertTrue(callTimestamps.toString(), delta > 990);
133    }
134  }
135}