/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.StringWriter;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Tests the region server "report for duty" handshake with the master: backing off while the
 * master refuses service, surviving a master change, starting late while the master is already
 * waiting, and running with a non-default environment edge.
 */
@Category(LargeTests.class)
public class TestRegionServerReportForDuty {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);

  /** Poll period, in milliseconds, used by every wait loop in this test. */
  private static final long SLEEP_INTERVAL = 500;

  private HBaseTestingUtility testUtil;
  private LocalHBaseCluster cluster;
  private RegionServerThread rs;
  private RegionServerThread rs2;
  private MasterThread master;
  private MasterThread backupMaster;

  /**
   * Starts a one-node mini DFS and a one-node mini ZK cluster and creates an empty
   * {@link LocalHBaseCluster} (no masters, no region servers); each test adds what it needs.
   */
  @Before
  public void setUp() throws Exception {
    testUtil = new HBaseTestingUtility();
    testUtil.startMiniDFSCluster(1);
    testUtil.startMiniZKCluster(1);
    testUtil.createRootDir();
    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
  }

  /**
   * Shuts the local cluster down and releases the mini clusters. The supporting services are
   * stopped even when the HBase cluster shutdown fails, and the global environment edge is
   * restored so that an edge injected by one test cannot leak into the next one.
   */
  @After
  public void tearDown() throws Exception {
    try {
      // Guard against a setUp() that failed before the cluster was created.
      if (cluster != null) {
        cluster.shutdown();
        cluster.join();
      }
    } finally {
      // testReportForDutyWithEnvironmentEdge replaces the process-wide (static) edge.
      EnvironmentEdgeManager.reset();
      if (testUtil != null) {
        testUtil.shutdownMiniZKCluster();
        testUtil.shutdownMiniDFSCluster();
      }
    }
  }

  /**
   * Captures everything a log4j2 core logger emits into an in-memory buffer until
   * {@link #stopCapturing()} is called.
   */
  private static class LogCapturer {
    private final StringWriter sw = new StringWriter();
    private final org.apache.logging.log4j.core.appender.WriterAppender appender;
    private final org.apache.logging.log4j.core.Logger logger;

    LogCapturer(org.apache.logging.log4j.core.Logger logger) {
      this.logger = logger;
      this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder()
        .setName("test").setTarget(sw).build();
      this.logger.addAppender(this.appender);
    }

    /** Returns everything written to the buffer so far. */
    String getOutput() {
      return sw.toString();
    }

    /** Detaches the capturing appender from the logger. */
    public void stopCapturing() {
      this.logger.removeAppender(this.appender);
    }
  }

  /**
   * This test HMaster class will always throw ServerNotRunningYetException if checked.
   */
  public static class NeverInitializedMaster extends HMaster {
    public NeverInitializedMaster(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    protected void checkServiceStarted() throws ServerNotRunningYetException {
      throw new ServerNotRunningYetException("Server is not running yet");
    }
  }

  /**
   * Tests region server should backoff to report for duty if master is not ready.
   */
  @Test
  public void testReportForDutyBackoff() throws IOException, InterruptedException {
    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
    master = cluster.addMaster();
    master.start();

    LogCapturer capturer =
      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
        .getLogger(HRegionServer.class));
    // Set sleep interval relatively low so that exponential backoff is more demanding.
    int msginterval = 100;
    int interval = 10_000;
    try {
      cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
      rs = cluster.addRegionServer();
      rs.start();
      Thread.sleep(interval);
    } finally {
      // Always detach the appender, even if start-up fails or the sleep is interrupted.
      capturer.stopCapturing();
    }
    String output = capturer.getOutput();
    LOG.info("{}", output);
    String failMsg = "reportForDuty failed;";
    int count = StringUtils.countMatches(output, failMsg);

    // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2).
    // Ideally we can assert the exact retry count. We relax here to tolerate contention error.
    // NOTE(review): Math.log is the natural logarithm; confirm this matches the backoff base.
    int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval));
    assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", count,
      expectedRetry / 2), count > expectedRetry / 2);
    assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", count,
      expectedRetry * 2), count < expectedRetry * 2);
  }

  /**
   * Tests region server reportForDuty with backup master becomes primary master after the first
   * master goes away.
   */
  @Test
  public void testReportForDutyWithMasterChange() throws Exception {
    Configuration conf = cluster.getConfiguration();

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    conf.setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // master has a rs. defaultMinToStart = 2
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    setRegionServersToStart(tablesOnMaster ? 2 : 1);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: {}", master.getMaster().getServerName());
    master.start();
    rs.start();

    waitForClusterOnline(master);

    // Add a 2nd region server
    conf.set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
    rs2 = cluster.addRegionServer();
    // Start the region server. This region server will refresh RPC connection
    // from the current active master to the next active master before completing
    // reportForDuty
    LOG.debug("Starting 2nd region server: {}", rs2.getRegionServer().getServerName());
    rs2.start();

    waitForSecondRsStarted();

    // Stop the current master.
    master.getMaster().stop("Stopping master");

    // Start a new master and use another random unique port
    // Also let it wait for exactly 2 region servers to report in.
    // TODO: Add handling bindexception. Random port is not enough!!! Flaky test!
    conf.setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    int expectedServers = tablesOnMaster ? 3 : 2;
    setRegionServersToStart(expectedServers);
    backupMaster = cluster.addMaster();
    LOG.debug("Starting new master: {}", backupMaster.getMaster().getServerName());
    backupMaster.start();

    waitForClusterOnline(backupMaster);

    // Do some checking/asserts here.
    assertTrue(backupMaster.getMaster().isActiveMaster());
    assertTrue(backupMaster.getMaster().isInitialized());
    // JUnit expects (expected, actual); the order matters for the failure message.
    assertEquals(expectedServers,
      backupMaster.getMaster().getServerManager().getOnlineServersList().size());
  }

  /**
   * Tests region server reportForDuty with RS RPC retry
   */
  @Test
  public void testReportForDutyWithRSRpcRetry() throws Exception {
    Configuration conf = cluster.getConfiguration();

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    conf.setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // Override the default RS RPC retry interval of 100ms to 300ms
    conf.setLong("hbase.regionserver.rpc.retry.interval", 300);
    // master has a rs. defaultMinToStart = 2
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    setRegionServersToStart(tablesOnMaster ? 2 : 1);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: {}", master.getMaster().getServerName());
    master.start();

    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    try {
      // Delay the RS start so that the meta assignment fails in first attempt and goes to retry
      // block
      scheduledThreadPoolExecutor.schedule(new Runnable() {
        @Override
        public void run() {
          rs.start();
        }
      }, 1000, TimeUnit.MILLISECONDS);

      waitForClusterOnline(master);
    } finally {
      // Release the pool thread; on the failure path this also cancels a still-pending start.
      scheduledThreadPoolExecutor.shutdownNow();
    }
  }

  /**
   * Tests region server reportForDuty with a non-default environment edge
   */
  @Test
  public void testReportForDutyWithEnvironmentEdge() throws Exception {
    Configuration conf = cluster.getConfiguration();

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    conf.setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
    conf.setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
    conf.setLong("hbase.regionserver.rpc.retry.interval", 0);

    // master has a rs. defaultMinToStart = 2
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    setRegionServersToStart(tablesOnMaster ? 2 : 1);

    // Inject non-default environment edge; tearDown() restores the default one.
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: {}", master.getMaster().getServerName());
    master.start();
    rs.start();
    waitForClusterOnline(master);
  }

  /**
   * Sets both the minimum and the maximum number of region servers the master waits for at
   * start-up to the same value.
   * @param count number of region servers to wait for
   */
  private void setRegionServersToStart(int count) {
    Configuration conf = cluster.getConfiguration();
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, count);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, count);
  }

  /**
   * Blocks until the given master reports that it is initialized and the first region server
   * ({@link #rs}) is online. There is no timeout of its own here.
   * @param master the master to wait for
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
    while (!master.getMaster().isInitialized()) {
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for master to come online ...");
    }
    rs.waitForServerOnline();
  }

  /**
   * Blocks until the second region server ({@link #rs2}) signals that it has created its RPC
   * stub to the master.
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  private void waitForSecondRsStarted() throws InterruptedException {
    MyRegionServer secondRs = (MyRegionServer) rs2.getRegionServer();
    while (!secondRs.getRpcStubCreatedFlag()) {
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting 2nd RS to be started ...");
    }
  }

  // Create a Region Server that provide a hook so that we can wait for the master switch over
  // before continuing reportForDuty to the master.
  // The idea is that we get a RPC connection to the first active master, then we wait.
  // The first master goes down, the second master becomes the active master. The region
  // server continues reportForDuty. It should succeed with the new master.
  public static class MyRegionServer extends MiniHBaseClusterRegionServer {

    private ServerName sn;
    // This flag is to make sure this rs has obtained the rpcStub to the first master.
    // The first master will go down after this.
    // volatile: written on the region server thread, polled by the test thread without a lock.
    private volatile boolean rpcStubCreatedFlag = false;
    // Only touched inside the synchronized createRegionServerStatusStub.
    private boolean masterChanged = false;

    public MyRegionServer(Configuration conf)
      throws IOException, KeeperException, InterruptedException {
      super(conf);
    }

    /**
     * Creates the stub through the superclass, raises the "stub created" flag and then, until a
     * master change has been observed once, blocks while the tracked master address is absent or
     * still equal to the one just returned.
     * @return the server name returned by the superclass, or {@code null} if the wait was
     *         interrupted
     */
    @Override
    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
      sn = super.createRegionServerStatusStub(refresh);
      rpcStubCreatedFlag = true;

      // Wait for master switch over. Only do this for the second region server.
      while (!masterChanged) {
        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
        if (newSn != null && !newSn.equals(sn)) {
          masterChanged = true;
          break;
        }
        try {
          Thread.sleep(SLEEP_INTERVAL);
        } catch (InterruptedException e) {
          // Interrupted while waiting: abandon the wait and hand back no server name.
          return null;
        }
        LOG.debug("Waiting for master switch over ... ");
      }
      return sn;
    }

    /** Returns whether {@code createRegionServerStatusStub} has created a stub at least once. */
    public boolean getRpcStubCreatedFlag() {
      return rpcStubCreatedFlag;
    }
  }
}