001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.allOf; 023import static org.hamcrest.Matchers.hasItem; 024import static org.hamcrest.Matchers.is; 025import static org.junit.jupiter.api.Assertions.assertEquals; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027 028import java.io.IOException; 029import java.io.StringWriter; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.concurrent.ScheduledThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import org.apache.commons.lang3.StringUtils; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.LocalHBaseCluster; 039import org.apache.hadoop.hbase.MatcherPredicate; 040import org.apache.hadoop.hbase.ServerName; 041import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer; 042import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException; 043import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 044import org.apache.hadoop.hbase.master.HMaster; 045import org.apache.hadoop.hbase.master.ServerManager; 046import org.apache.hadoop.hbase.testclassification.LargeTests; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 049import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 050import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 051import org.apache.hadoop.hbase.util.Threads; 052import org.apache.zookeeper.KeeperException; 053import org.junit.jupiter.api.AfterEach; 054import org.junit.jupiter.api.BeforeEach; 055import org.junit.jupiter.api.Tag; 056import org.junit.jupiter.api.Test; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 061 062@Tag(LargeTests.TAG) 063public class TestRegionServerReportForDuty { 064 065 private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class); 066 067 private static final long SLEEP_INTERVAL = 500; 068 069 private HBaseTestingUtil testUtil; 070 private LocalHBaseCluster cluster; 071 private RegionServerThread rs; 072 private RegionServerThread rs2; 073 private MasterThread master; 074 private MasterThread backupMaster; 075 076 @BeforeEach 077 public void setUp() throws Exception { 078 testUtil = new HBaseTestingUtil(); 079 testUtil.startMiniDFSCluster(1); 080 testUtil.startMiniZKCluster(1); 081 testUtil.createRootDir(); 082 cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0); 083 } 084 085 @AfterEach 086 public void tearDown() throws Exception { 087 cluster.shutdown(); 088 cluster.join(); 089 testUtil.shutdownMiniZKCluster(); 090 testUtil.shutdownMiniDFSCluster(); 091 } 092 093 private static class LogCapturer { 094 private StringWriter sw = new StringWriter(); 095 private org.apache.logging.log4j.core.appender.WriterAppender appender; 096 private org.apache.logging.log4j.core.Logger logger; 097 098 LogCapturer(org.apache.logging.log4j.core.Logger logger) { 099 this.logger = logger; 100 this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder() 101 .setName("test").setTarget(sw).build(); 102 this.logger.addAppender(this.appender); 103 } 104 105 String getOutput() { 106 return sw.toString(); 107 } 108 109 public void stopCapturing() { 110 this.logger.removeAppender(this.appender); 111 } 112 } 113 114 /** 115 * This test HMaster class will always throw ServerNotRunningYetException if checked. 116 */ 117 public static class NeverInitializedMaster extends HMaster { 118 public NeverInitializedMaster(Configuration conf) throws IOException { 119 super(conf); 120 } 121 122 @Override 123 protected void checkServiceStarted() throws ServerNotRunningYetException { 124 throw new ServerNotRunningYetException("Server is not running yet"); 125 } 126 } 127 128 /** 129 * Tests region server should backoff to report for duty if master is not ready. 130 */ 131 @Test 132 public void testReportForDutyBackoff() throws IOException, InterruptedException { 133 cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName()); 134 master = cluster.addMaster(); 135 master.start(); 136 137 LogCapturer capturer = 138 new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager 139 .getLogger(HRegionServer.class)); 140 // Set sleep interval relatively low so that exponential backoff is more demanding. 141 int msginterval = 100; 142 cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval); 143 rs = cluster.addRegionServer(); 144 rs.start(); 145 146 int interval = 10_000; 147 Thread.sleep(interval); 148 capturer.stopCapturing(); 149 String output = capturer.getOutput(); 150 LOG.info("{}", output); 151 String failMsg = "reportForDuty failed;"; 152 int count = StringUtils.countMatches(output, failMsg); 153 154 // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2). 155 // Ideally we can assert the exact retry count. We relax here to tolerate contention error. 156 int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval)); 157 assertTrue(count > expectedRetry / 2, String.format( 158 "reportForDuty retries %d times, less than expected min %d", count, expectedRetry / 2)); 159 assertTrue(count < expectedRetry * 2, String.format( 160 "reportForDuty retries %d times, more than expected max %d", count, expectedRetry * 2)); 161 } 162 163 /** 164 * Tests region sever reportForDuty with backup master becomes primary master after the first 165 * master goes away. 166 */ 167 @Test 168 public void testReportForDutyWithMasterChange() throws Exception { 169 170 // Start a master and wait for it to become the active/primary master. 171 // Use a random unique port 172 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 173 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 174 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 175 master = cluster.addMaster(); 176 rs = cluster.addRegionServer(); 177 LOG.debug("Starting master: " + master.getMaster().getServerName()); 178 master.start(); 179 rs.start(); 180 181 waitForClusterOnline(master); 182 183 // Add a 2nd region server 184 cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); 185 rs2 = cluster.addRegionServer(); 186 // Start the region server. This region server will refresh RPC connection 187 // from the current active master to the next active master before completing 188 // reportForDuty 189 LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName()); 190 rs2.start(); 191 192 waitForSecondRsStarted(); 193 194 // Stop the current master. 195 master.getMaster().stop("Stopping master"); 196 197 // Start a new master and use another random unique port 198 // Also let it wait for exactly 2 region severs to report in. 199 // TODO: Add handling bindexception. Random port is not enough!!! Flakie test! 200 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 201 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2); 202 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2); 203 backupMaster = cluster.addMaster(); 204 LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName()); 205 backupMaster.start(); 206 207 waitForClusterOnline(backupMaster); 208 209 // Do some checking/asserts here. 210 assertTrue(backupMaster.getMaster().isActiveMaster()); 211 assertTrue(backupMaster.getMaster().isInitialized()); 212 assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2); 213 214 } 215 216 /** 217 * Tests region sever reportForDuty with RS RPC retry 218 */ 219 @Test 220 public void testReportForDutyWithRSRpcRetry() throws Exception { 221 ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, 222 new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true) 223 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 224 225 // Start a master and wait for it to become the active/primary master. 226 // Use a random unique port 227 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 228 // Override the default RS RPC retry interval of 100ms to 300ms 229 cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300); 230 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 231 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 232 master = cluster.addMaster(); 233 rs = cluster.addRegionServer(); 234 LOG.debug("Starting master: " + master.getMaster().getServerName()); 235 master.start(); 236 // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block 237 scheduledThreadPoolExecutor.schedule(new Runnable() { 238 @Override 239 public void run() { 240 rs.start(); 241 } 242 }, 1000, TimeUnit.MILLISECONDS); 243 244 waitForClusterOnline(master); 245 } 246 247 /** 248 * Tests that the RegionServer's reportForDuty gets rejected by the master when the master is 249 * configured to reject decommissioned hosts and when there is a match for the joining 250 * RegionServer in the list of decommissioned servers. Test case for HBASE-28342. 251 */ 252 @Test 253 public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts() 254 throws Exception { 255 // Start a master and wait for it to become the active/primary master. 256 // Use a random unique port 257 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 258 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 259 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 260 261 // Set the cluster to reject decommissioned hosts 262 cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true); 263 264 master = cluster.addMaster(); 265 rs = cluster.addRegionServer(); 266 master.start(); 267 rs.start(); 268 waitForClusterOnline(master); 269 270 // Add a second decommissioned region server to the cluster, wait for it to fail reportForDuty 271 LogCapturer capturer = 272 new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager 273 .getLogger(HRegionServer.class)); 274 275 rs2 = cluster.addRegionServer(); 276 master.getMaster().decommissionRegionServers( 277 Collections.singletonList(rs2.getRegionServer().getServerName()), false); 278 rs2.start(); 279 280 // Assert that the second regionserver has aborted 281 testUtil.waitFor(TimeUnit.SECONDS.toMillis(90), 282 new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true))); 283 284 // Assert that the log messages for DecommissionedHostRejectedException exist in the logs 285 capturer.stopCapturing(); 286 287 assertThat(capturer.getOutput(), 288 containsString("Master rejected startup because the host is considered decommissioned")); 289 290 /** 291 * Assert that the following log message occurred (one line): 292 * "org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException: 293 * org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException: Host localhost exists in the 294 * list of decommissioned servers and Master is configured to reject decommissioned hosts" 295 */ 296 assertThat(Arrays.asList(capturer.getOutput().split("\n")), 297 hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()), 298 containsString(DecommissionedHostRejectedException.class.getSimpleName()), 299 containsString("Host " + rs2.getRegionServer().getServerName().getHostname() 300 + " exists in the list of decommissioned servers and Master is configured to reject" 301 + " decommissioned hosts")))); 302 303 assertThat(Arrays.asList(capturer.getOutput().split("\n")), 304 hasItem( 305 allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()), 306 containsString("Unhandled"), 307 containsString(DecommissionedHostRejectedException.class.getSimpleName()), 308 containsString("Host " + rs2.getRegionServer().getServerName().getHostname() 309 + " exists in the list of decommissioned servers and Master is configured to reject" 310 + " decommissioned hosts")))); 311 } 312 313 /** 314 * Tests region sever reportForDuty with a non-default environment edge 315 */ 316 @Test 317 public void testReportForDutyWithEnvironmentEdge() throws Exception { 318 // Start a master and wait for it to become the active/primary master. 319 // Use a random unique port 320 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 321 // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately 322 cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0); 323 cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0); 324 325 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 326 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 327 328 // Inject non-default environment edge 329 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 330 EnvironmentEdgeManager.injectEdge(edge); 331 master = cluster.addMaster(); 332 rs = cluster.addRegionServer(); 333 LOG.debug("Starting master: " + master.getMaster().getServerName()); 334 master.start(); 335 rs.start(); 336 waitForClusterOnline(master); 337 } 338 339 private void waitForClusterOnline(MasterThread master) throws InterruptedException { 340 while (true) { 341 if (master.getMaster().isInitialized()) { 342 break; 343 } 344 Thread.sleep(SLEEP_INTERVAL); 345 LOG.debug("Waiting for master to come online ..."); 346 } 347 rs.waitForServerOnline(); 348 } 349 350 private void waitForSecondRsStarted() throws InterruptedException { 351 while (true) { 352 if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) { 353 break; 354 } 355 Thread.sleep(SLEEP_INTERVAL); 356 LOG.debug("Waiting 2nd RS to be started ..."); 357 } 358 } 359 360 // Create a Region Server that provide a hook so that we can wait for the master switch over 361 // before continuing reportForDuty to the mater. 362 // The idea is that we get a RPC connection to the first active master, then we wait. 363 // The first master goes down, the second master becomes the active master. The region 364 // server continues reportForDuty. It should succeed with the new master. 365 public static class MyRegionServer extends MiniHBaseClusterRegionServer { 366 367 private ServerName sn; 368 // This flag is to make sure this rs has obtained the rpcStub to the first master. 369 // The first master will go down after this. 370 private boolean rpcStubCreatedFlag = false; 371 private boolean masterChanged = false; 372 373 public MyRegionServer(Configuration conf) 374 throws IOException, KeeperException, InterruptedException { 375 super(conf); 376 } 377 378 @Override 379 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 380 sn = super.createRegionServerStatusStub(refresh); 381 rpcStubCreatedFlag = true; 382 383 // Wait for master switch over. Only do this for the second region server. 384 while (!masterChanged) { 385 ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true); 386 if (newSn != null && !newSn.equals(sn)) { 387 masterChanged = true; 388 break; 389 } 390 try { 391 Thread.sleep(SLEEP_INTERVAL); 392 } catch (InterruptedException e) { 393 return null; 394 } 395 LOG.debug("Waiting for master switch over ... "); 396 } 397 return sn; 398 } 399 400 public boolean getRpcStubCreatedFlag() { 401 return rpcStubCreatedFlag; 402 } 403 } 404}