001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.Waiter;
030import org.apache.hadoop.hbase.testclassification.ClientTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
033import org.junit.jupiter.api.AfterEach;
034import org.junit.jupiter.api.BeforeEach;
035import org.junit.jupiter.api.Tag;
036import org.junit.jupiter.api.Test;
037
038import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
039
040@Tag(ClientTests.TAG)
041@Tag(SmallTests.TAG)
042public class TestRegistryEndpointsRefresher {
043
044  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
045    "hbase.test.registry.initial.delay.secs";
046  private static final String INTERVAL_SECS_CONFIG_NAME =
047    "hbase.test.registry.refresh.interval.secs";
048  private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
049    "hbase.test.registry.refresh.min.interval.secs";
050
051  private Configuration conf;
052  private RegistryEndpointsRefresher refresher;
053  private AtomicInteger refreshCallCounter;
054  private CopyOnWriteArrayList<Long> callTimestamps;
055
056  @BeforeEach
057  public void setUp() {
058    conf = HBaseConfiguration.create();
059    refreshCallCounter = new AtomicInteger(0);
060    callTimestamps = new CopyOnWriteArrayList<>();
061  }
062
063  @AfterEach
064  public void tearDown() {
065    if (refresher != null) {
066      refresher.stop();
067    }
068  }
069
070  private void refresh() {
071    refreshCallCounter.incrementAndGet();
072    callTimestamps.add(EnvironmentEdgeManager.currentTime());
073  }
074
075  private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
076    conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
077    conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
078    conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
079    refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
080      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
081  }
082
083  @Test
084  public void testDisableRefresh() {
085    conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
086    assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
087      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
088  }
089
090  @Test
091  public void testInitialDelay() throws InterruptedException {
092    createRefresher(1, 10, 0);
093    // Wait for 2 seconds to see that at least 1 refresh have been made since the initial delay is 1
094    // seconds
095    Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
096    // Sleep more 5 seconds to make sure we have not made new calls since the interval is 10 seconds
097    Thread.sleep(5000);
098    assertEquals(1, refreshCallCounter.get());
099  }
100
101  @Test
102  public void testPeriodicMasterEndPointRefresh() {
103    // Refresh every 1 second.
104    createRefresher(1, 1, 0);
105    // Wait for > 3 seconds to see that at least 3 refresh have been made.
106    Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
107  }
108
109  @Test
110  public void testDurationBetweenRefreshes() {
111    // Disable periodic refresh
112    // A minimum duration of 1s between refreshes
113    createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
114    // Issue a ton of manual refreshes.
115    for (int i = 0; i < 10000; i++) {
116      refresher.refreshNow();
117      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
118    }
119    // Overall wait time is 10000 ms, so the number of requests should be <=10
120    // Actual calls to refresh should be much lower than the refresh count.
121    assertTrue(refreshCallCounter.get() <= 20, String.valueOf(refreshCallCounter.get()));
122    assertTrue(callTimestamps.size() > 0);
123    // Verify that the delta between subsequent refresh is at least 1sec as configured.
124    for (int i = 1; i < callTimestamps.size() - 1; i++) {
125      long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
126      // Few ms cushion to account for any env jitter.
127      assertTrue(delta > 990, callTimestamps.toString());
128    }
129  }
130}