001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.io.StringWriter; 025import java.util.concurrent.ScheduledThreadPoolExecutor; 026import java.util.concurrent.TimeUnit; 027import org.apache.commons.lang3.StringUtils; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.LocalHBaseCluster; 033import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 036import org.apache.hadoop.hbase.master.HMaster; 037import org.apache.hadoop.hbase.master.LoadBalancer; 038import org.apache.hadoop.hbase.master.ServerManager; 039import org.apache.hadoop.hbase.testclassification.LargeTests; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 043import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 044import org.apache.hadoop.hbase.util.Threads; 045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 046import org.apache.log4j.Appender; 047import org.apache.log4j.Layout; 048import org.apache.log4j.PatternLayout; 049import org.apache.log4j.WriterAppender; 050import org.apache.zookeeper.KeeperException; 051import org.junit.After; 052import org.junit.Before; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059@Category(LargeTests.class) 060public class TestRegionServerReportForDuty { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class); 065 066 private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class); 067 068 private static final long SLEEP_INTERVAL = 500; 069 070 private HBaseTestingUtility testUtil; 071 private LocalHBaseCluster cluster; 072 private RegionServerThread rs; 073 private RegionServerThread rs2; 074 private MasterThread master; 075 private MasterThread backupMaster; 076 077 @Before 078 public void setUp() throws Exception { 079 testUtil = new HBaseTestingUtility(); 080 testUtil.startMiniDFSCluster(1); 081 testUtil.startMiniZKCluster(1); 082 testUtil.createRootDir(); 083 cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0); 084 } 085 086 @After 087 public void tearDown() throws Exception { 088 cluster.shutdown(); 089 cluster.join(); 090 testUtil.shutdownMiniZKCluster(); 091 testUtil.shutdownMiniDFSCluster(); 092 } 093 094 /** 095 * LogCapturer is similar to {@link org.apache.hadoop.test.GenericTestUtils.LogCapturer} 096 * except that this implementation has a default appender to the root logger. 097 * Hadoop 2.8+ supports the default appender in the LogCapture it ships and this can be replaced. 098 * TODO: This class can be removed after we upgrade Hadoop dependency. 099 */ 100 static class LogCapturer { 101 private StringWriter sw = new StringWriter(); 102 private WriterAppender appender; 103 private org.apache.log4j.Logger logger; 104 105 LogCapturer(org.apache.log4j.Logger logger) { 106 this.logger = logger; 107 Appender defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("stdout"); 108 if (defaultAppender == null) { 109 defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("console"); 110 } 111 final Layout layout = (defaultAppender == null) ? new PatternLayout() : 112 defaultAppender.getLayout(); 113 this.appender = new WriterAppender(layout, sw); 114 this.logger.addAppender(this.appender); 115 } 116 117 String getOutput() { 118 return sw.toString(); 119 } 120 121 public void stopCapturing() { 122 this.logger.removeAppender(this.appender); 123 } 124 } 125 126 /** 127 * This test HMaster class will always throw ServerNotRunningYetException if checked. 128 */ 129 public static class NeverInitializedMaster extends HMaster { 130 public NeverInitializedMaster(Configuration conf) throws IOException { 131 super(conf); 132 } 133 134 @Override 135 protected void checkServiceStarted() throws ServerNotRunningYetException { 136 throw new ServerNotRunningYetException("Server is not running yet"); 137 } 138 } 139 140 /** 141 * Tests region server should backoff to report for duty if master is not ready. 142 */ 143 @Test 144 public void testReportForDutyBackoff() throws IOException, InterruptedException { 145 cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName()); 146 master = cluster.addMaster(); 147 master.start(); 148 149 LogCapturer capturer = new LogCapturer(org.apache.log4j.Logger.getLogger(HRegionServer.class)); 150 // Set sleep interval relatively low so that exponential backoff is more demanding. 151 int msginterval = 100; 152 cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval); 153 rs = cluster.addRegionServer(); 154 rs.start(); 155 156 int interval = 10_000; 157 Thread.sleep(interval); 158 capturer.stopCapturing(); 159 String output = capturer.getOutput(); 160 LOG.info("{}", output); 161 String failMsg = "reportForDuty failed;"; 162 int count = StringUtils.countMatches(output, failMsg); 163 164 // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2). 165 // Ideally we can assert the exact retry count. We relax here to tolerate contention error. 166 int expectedRetry = (int)Math.ceil(Math.log(interval - msginterval)); 167 assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", 168 count, expectedRetry / 2), count > expectedRetry / 2); 169 assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", 170 count, expectedRetry * 2), count < expectedRetry * 2); 171 } 172 173 /** 174 * Tests region sever reportForDuty with backup master becomes primary master after 175 * the first master goes away. 176 */ 177 @Test 178 public void testReportForDutyWithMasterChange() throws Exception { 179 180 // Start a master and wait for it to become the active/primary master. 181 // Use a random unique port 182 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort()); 183 // master has a rs. defaultMinToStart = 2 184 boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration()); 185 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, tablesOnMaster? 2: 1); 186 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, tablesOnMaster? 2: 1); 187 master = cluster.addMaster(); 188 rs = cluster.addRegionServer(); 189 LOG.debug("Starting master: " + master.getMaster().getServerName()); 190 master.start(); 191 rs.start(); 192 193 waitForClusterOnline(master); 194 195 // Add a 2nd region server 196 cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); 197 rs2 = cluster.addRegionServer(); 198 // Start the region server. This region server will refresh RPC connection 199 // from the current active master to the next active master before completing 200 // reportForDuty 201 LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName()); 202 rs2.start(); 203 204 waitForSecondRsStarted(); 205 206 // Stop the current master. 207 master.getMaster().stop("Stopping master"); 208 209 // Start a new master and use another random unique port 210 // Also let it wait for exactly 2 region severs to report in. 211 // TODO: Add handling bindexception. Random port is not enough!!! Flakie test! 212 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort()); 213 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 214 tablesOnMaster? 3: 2); 215 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 216 tablesOnMaster? 3: 2); 217 backupMaster = cluster.addMaster(); 218 LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName()); 219 backupMaster.start(); 220 221 waitForClusterOnline(backupMaster); 222 223 // Do some checking/asserts here. 224 assertTrue(backupMaster.getMaster().isActiveMaster()); 225 assertTrue(backupMaster.getMaster().isInitialized()); 226 assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 227 tablesOnMaster? 3: 2); 228 229 } 230 231 /** 232 * Tests region sever reportForDuty with RS RPC retry 233 */ 234 @Test 235 public void testReportForDutyWithRSRpcRetry() throws Exception { 236 ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, 237 new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true) 238 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 239 240 // Start a master and wait for it to become the active/primary master. 241 // Use a random unique port 242 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort()); 243 // Override the default RS RPC retry interval of 100ms to 300ms 244 cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300); 245 // master has a rs. defaultMinToStart = 2 246 boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration()); 247 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 248 tablesOnMaster ? 2 : 1); 249 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 250 tablesOnMaster ? 2 : 1); 251 master = cluster.addMaster(); 252 rs = cluster.addRegionServer(); 253 LOG.debug("Starting master: " + master.getMaster().getServerName()); 254 master.start(); 255 // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block 256 scheduledThreadPoolExecutor.schedule(new Runnable() { 257 @Override 258 public void run() { 259 rs.start(); 260 } 261 }, 1000, TimeUnit.MILLISECONDS); 262 263 waitForClusterOnline(master); 264 } 265 266 /** 267 * Tests region sever reportForDuty with manual environment edge 268 */ 269 @Test 270 public void testReportForDutyWithEnvironmentEdge() throws Exception { 271 // Start a master and wait for it to become the active/primary master. 272 // Use a random unique port 273 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort()); 274 // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately 275 cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0); 276 cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0); 277 278 // master has a rs. defaultMinToStart = 2 279 boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration()); 280 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 281 tablesOnMaster ? 2 : 1); 282 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 283 tablesOnMaster ? 2 : 1); 284 285 // Inject manual environment edge for clock skew computation between RS and master 286 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 287 EnvironmentEdgeManager.injectEdge(edge); 288 master = cluster.addMaster(); 289 rs = cluster.addRegionServer(); 290 LOG.debug("Starting master: " + master.getMaster().getServerName()); 291 master.start(); 292 rs.start(); 293 294 waitForClusterOnline(master); 295 } 296 297 private void waitForClusterOnline(MasterThread master) throws InterruptedException { 298 while (true) { 299 if (master.getMaster().isInitialized()) { 300 break; 301 } 302 Thread.sleep(SLEEP_INTERVAL); 303 LOG.debug("Waiting for master to come online ..."); 304 } 305 rs.waitForServerOnline(); 306 } 307 308 private void waitForSecondRsStarted() throws InterruptedException { 309 while (true) { 310 if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) { 311 break; 312 } 313 Thread.sleep(SLEEP_INTERVAL); 314 LOG.debug("Waiting 2nd RS to be started ..."); 315 } 316 } 317 318 // Create a Region Server that provide a hook so that we can wait for the master switch over 319 // before continuing reportForDuty to the mater. 320 // The idea is that we get a RPC connection to the first active master, then we wait. 321 // The first master goes down, the second master becomes the active master. The region 322 // server continues reportForDuty. It should succeed with the new master. 323 public static class MyRegionServer extends MiniHBaseClusterRegionServer { 324 325 private ServerName sn; 326 // This flag is to make sure this rs has obtained the rpcStub to the first master. 327 // The first master will go down after this. 328 private boolean rpcStubCreatedFlag = false; 329 private boolean masterChanged = false; 330 331 public MyRegionServer(Configuration conf) throws IOException, KeeperException, 332 InterruptedException { 333 super(conf); 334 } 335 336 @Override 337 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 338 sn = super.createRegionServerStatusStub(refresh); 339 rpcStubCreatedFlag = true; 340 341 // Wait for master switch over. Only do this for the second region server. 342 while (!masterChanged) { 343 ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true); 344 if (newSn != null && !newSn.equals(sn)) { 345 masterChanged = true; 346 break; 347 } 348 try { 349 Thread.sleep(SLEEP_INTERVAL); 350 } catch (InterruptedException e) { 351 return null; 352 } 353 LOG.debug("Waiting for master switch over ... "); 354 } 355 return sn; 356 } 357 358 public boolean getRpcStubCreatedFlag() { 359 return rpcStubCreatedFlag; 360 } 361 } 362}