/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;

/**
 * Tests for {@link RegistryEndpointsRefresher}. Each test builds a refresher whose callback is
 * {@link #refresh()}, which counts invocations and records the time of each one, and then asserts
 * on how many callbacks happened and how far apart they were.
 */
@Tag(ClientTests.TAG)
@Tag(SmallTests.TAG)
public class TestRegistryEndpointsRefresher {

  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
    "hbase.test.registry.initial.delay.secs";
  private static final String INTERVAL_SECS_CONFIG_NAME =
    "hbase.test.registry.refresh.interval.secs";
  private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
    "hbase.test.registry.refresh.min.interval.secs";

  private Configuration conf;
  private RegistryEndpointsRefresher refresher;
  /** Number of times {@link #refresh()} has been invoked in the current test. */
  private AtomicInteger refreshCallCounter;
  /** Time of every {@link #refresh()} invocation, in invocation order. */
  private CopyOnWriteArrayList<Long> callTimestamps;

  /** Creates a fresh configuration and resets the bookkeeping used by {@link #refresh()}. */
  @BeforeEach
  public void setUp() {
    conf = HBaseConfiguration.create();
    refreshCallCounter = new AtomicInteger(0);
    callTimestamps = new CopyOnWriteArrayList<>();
  }

  /** Stops the refresher, if the test created one. */
  @AfterEach
  public void tearDown() {
    if (refresher != null) {
      refresher.stop();
    }
  }

  /** Refresh callback handed to the refresher: counts the call and records when it happened. */
  private void refresh() {
    refreshCallCounter.incrementAndGet();
    callTimestamps.add(EnvironmentEdgeManager.currentTime());
  }

  /**
   * Stores the three timing values in {@link #conf} under the test config keys and creates
   * {@link #refresher} from them, with {@link #refresh()} as the callback.
   * @param initialDelaySecs value stored under {@link #INITIAL_DELAY_SECS_CONFIG_NAME}
   * @param intervalSecs     value stored under {@link #INTERVAL_SECS_CONFIG_NAME}
   * @param minIntervalSecs  value stored under {@link #MIN_INTERVAL_SECS_CONFIG_NAME}
   */
  private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
    conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
    conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
    conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
    refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
  }

  /** A negative refresh interval must make {@code create} return {@code null}. */
  @Test
  public void testDisableRefresh() {
    conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
    // Pass the config names in the same positions as createRefresher does; the initial delay key
    // is deliberately left unset here.
    assertNull(RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
  }

  /**
   * With an initial delay of 1 second and an interval of 10 seconds, exactly one refresh is
   * expected within the first few seconds.
   */
  @Test
  public void testInitialDelay() throws InterruptedException {
    createRefresher(1, 10, 0);
    // Wait up to 2 seconds for the first refresh, since the initial delay is 1 second.
    Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
    // Sleep 5 more seconds to make sure no further refresh has happened, since the interval is
    // 10 seconds.
    Thread.sleep(5000);
    assertEquals(1, refreshCallCounter.get());
  }

  /** With a 1 second interval, several refreshes must be observed within a few seconds. */
  @Test
  public void testPeriodicMasterEndPointRefresh() {
    // Refresh every 1 second.
    createRefresher(1, 1, 0);
    // Wait up to 5 seconds to see that more than 3 refreshes have been made.
    Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
  }

  /**
   * Issues many {@code refreshNow()} requests with a configured minimum interval of 1 second and
   * checks that the actual refreshes are few and spaced at least about 1 second apart.
   */
  @Test
  public void testDurationBetweenRefreshes() {
    // Disable periodic refresh by using a huge initial delay and interval.
    // A minimum duration of 1s between refreshes.
    createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
    // Issue a ton of manual refreshes.
    for (int i = 0; i < 10000; i++) {
      refresher.refreshNow();
      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
    }
    // The loop sleeps for 10000 ms in total, so about 10 refreshes are expected; the bound of 20
    // leaves a cushion (presumably for the loop taking longer than the nominal sleep time).
    // Actual calls to refresh should be much lower than the number of requests.
    int refreshCount = refreshCallCounter.get();
    assertTrue(refreshCount <= 20, String.valueOf(refreshCount));
    // Take a snapshot so that a refresh landing during verification cannot shift the bounds.
    Long[] timestamps = callTimestamps.toArray(new Long[0]);
    assertTrue(timestamps.length > 0);
    // Verify that the delta between every pair of subsequent refreshes, including the last pair,
    // is at least 1 second as configured.
    for (int i = 1; i < timestamps.length; i++) {
      long delta = timestamps[i] - timestamps[i - 1];
      // Few ms cushion to account for any env jitter.
      assertTrue(delta > 990, callTimestamps.toString());
    }
  }
}