/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseRpcServicesBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link BootstrapNodeManager}, driven through a mocked {@link AsyncClusterConnection}
 * and a mocked {@link MasterAddressTracker}.
 * <p>
 * Each test stubs {@code getLiveRegionServers} and {@code getAllBootstrapNodes} on the mocked
 * connection, constructs the manager, and then checks both which connection methods were invoked
 * and what {@link BootstrapNodeManager#getBootstrapNodes()} reports.
 */
@Tag(RegionServerTests.TAG)
@Tag(SmallTests.TAG)
public class TestBootstrapNodeManager {

  private Configuration conf;

  private AsyncClusterConnection conn;

  private MasterAddressTracker tracker;

  private BootstrapNodeManager manager;

  /**
   * Creates a fresh configuration with short request intervals (so the tests finish within a few
   * seconds) and a bootstrap node limit of 2, plus the mocked connection and master tracker.
   */
  @BeforeEach
  public void setUp() {
    conf = HBaseConfiguration.create();
    conf.setLong(BootstrapNodeManager.REQUEST_MASTER_INTERVAL_SECS, 5);
    conf.setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
    conf.setLong(BootstrapNodeManager.REQUEST_REGIONSERVER_INTERVAL_SECS, 1);
    conf.setInt(HBaseRpcServicesBase.CLIENT_BOOTSTRAP_NODE_LIMIT, 2);
    conn = mock(AsyncClusterConnection.class);
    when(conn.getConfiguration()).thenReturn(conf);
    tracker = mock(MasterAddressTracker.class);
  }

  /**
   * Stops the manager created by the test, if the test got far enough to create one.
   */
  @AfterEach
  public void tearDown() {
    if (manager != null) {
      manager.stop();
    }
  }

  /**
   * Asserts that {@code actual} holds the same servers as {@code expected}, ignoring order.
   * <p>
   * {@code hasItems} only proves that every expected server is present, so the size comparison is
   * what rules out unexpected extra entries in {@code actual}.
   * @param expected the servers that must be reported
   * @param actual   the servers reported by the manager
   */
  private void assertListEquals(List<ServerName> expected, List<ServerName> actual) {
    // Compare against the actual list; comparing expected with itself would always pass.
    assertEquals(expected.size(), actual.size());
    assertThat(actual, hasItems(expected.toArray(new ServerName[0])));
  }

  /**
   * With four live region servers reported, the live region server list is expected to be fetched
   * exactly once within the observation window, while bootstrap nodes are requested at least once.
   */
  @Test
  public void testNormal() throws Exception {
    List<ServerName> regionServers =
      Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime()));
    when(conn.getLiveRegionServers(any(), anyInt()))
      .thenReturn(CompletableFuture.completedFuture(regionServers));
    when(conn.getAllBootstrapNodes(any()))
      .thenReturn(CompletableFuture.completedFuture(regionServers));
    manager = new BootstrapNodeManager(conn, tracker);
    // Fixed observation window: the verification below counts calls made during this period.
    Thread.sleep(3000);
    verify(conn, times(1)).getLiveRegionServers(any(), anyInt());
    verify(conn, atLeastOnce()).getAllBootstrapNodes(any());
    assertListEquals(regionServers, manager.getBootstrapNodes());
  }

  // if we do not return enough region servers, we will always get from master
  @Test
  public void testOnlyMaster() throws Exception {
    List<ServerName> regionServers =
      Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()));
    when(conn.getLiveRegionServers(any(), anyInt()))
      .thenReturn(CompletableFuture.completedFuture(regionServers));
    when(conn.getAllBootstrapNodes(any()))
      .thenReturn(CompletableFuture.completedFuture(regionServers));
    manager = new BootstrapNodeManager(conn, tracker);
    // Fixed observation window: the verifications below count calls made during this period.
    Thread.sleep(3000);
    verify(conn, atLeast(2)).getLiveRegionServers(any(), anyInt());
    verify(conn, never()).getAllBootstrapNodes(any());
    assertListEquals(regionServers, manager.getBootstrapNodes());
  }

  /**
   * Injects failures into {@code getAllBootstrapNodes} for selected servers and checks that the
   * failing servers disappear from the reported bootstrap node list, and that the list is
   * eventually replaced by the new set of live region servers.
   */
  @Test
  public void testRegionServerError() throws Exception {
    List<ServerName> regionServers =
      Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime()));
    List<ServerName> newRegionServers =
      Arrays.asList(ServerName.valueOf("server5", 12345, EnvironmentEdgeManager.currentTime()),
        ServerName.valueOf("server6", 12345, EnvironmentEdgeManager.currentTime()));
    when(conn.getLiveRegionServers(any(), anyInt()))
      .thenReturn(CompletableFuture.completedFuture(regionServers));
    // Phase 1: only server4 fails; every other server reports server1..server3.
    when(conn.getAllBootstrapNodes(any())).thenAnswer(invocation -> {
      if (invocation.getArgument(0, ServerName.class).getHostname().equals("server4")) {
        return FutureUtils.failedFuture(new IOException("Inject error"));
      } else {
        return CompletableFuture.completedFuture(regionServers.subList(0, 3));
      }
    });
    manager = new BootstrapNodeManager(conn, tracker);
    // we should remove server4 from the list
    Waiter.waitFor(conf, 30000, () -> manager.getBootstrapNodes().size() == 3);
    assertListEquals(regionServers.subList(0, 3), manager.getBootstrapNodes());
    // Phase 2: the live region server list now contains only server5 and server6, and
    // server2/server3 start failing.
    when(conn.getLiveRegionServers(any(), anyInt()))
      .thenReturn(CompletableFuture.completedFuture(newRegionServers));
    doAnswer(invocation -> {
      String hostname = invocation.getArgument(0, ServerName.class).getHostname();
      switch (hostname) {
        case "server1":
          return CompletableFuture.completedFuture(regionServers.subList(0, 1));
        case "server2":
        case "server3":
          return FutureUtils.failedFuture(new IOException("Inject error"));
        default:
          return CompletableFuture.completedFuture(newRegionServers);
      }
    }).when(conn).getAllBootstrapNodes(any());
    // we should remove server2, server3 from the list and then get the new list from master again
    Waiter.waitFor(conf, 30000, () -> {
      List<ServerName> bootstrapNodes = manager.getBootstrapNodes();
      if (bootstrapNodes.size() != 2) {
        return false;
      }
      String hostname = bootstrapNodes.get(0).getHostname();
      return hostname.equals("server5") || hostname.equals("server6");
    });
    assertListEquals(newRegionServers, manager.getBootstrapNodes());
  }
}