/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.StringWriter;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.WriterAppender;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests how a region server reports for duty to the master in several situations: a master that
 * never finishes initializing, a master change while the region server is reporting, a region
 * server that starts late, and a manually injected environment edge.
 */
@Category(LargeTests.class)
public class TestRegionServerReportForDuty {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);

  /** Polling period, in milliseconds, used by the wait loops in this class. */
  private static final long SLEEP_INTERVAL = 500;

  private HBaseTestingUtility testUtil;
  private LocalHBaseCluster cluster;
  private RegionServerThread rs;
  private RegionServerThread rs2;
  private MasterThread master;
  private MasterThread backupMaster;

  /**
   * Starts a mini DFS cluster and a mini ZK cluster, creates the root dir, and builds an empty
   * {@link LocalHBaseCluster} (no masters, no region servers) for the test to populate.
   */
  @Before
  public void setUp() throws Exception {
    testUtil = new HBaseTestingUtility();
    testUtil.startMiniDFSCluster(1);
    testUtil.startMiniZKCluster(1);
    testUtil.createRootDir();
    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
  }

  /**
   * Shuts down the HBase cluster, restores the default environment edge, and then shuts down the
   * mini ZK and DFS clusters. Each step runs even if a previous one throws.
   */
  @After
  public void tearDown() throws Exception {
    try {
      // cluster is null when setUp() failed before creating it.
      if (cluster != null) {
        cluster.shutdown();
        cluster.join();
      }
    } finally {
      // testReportForDutyWithEnvironmentEdge injects a manual edge; make sure it never leaks
      // into tests that run later in the same JVM.
      EnvironmentEdgeManager.reset();
      try {
        testUtil.shutdownMiniZKCluster();
      } finally {
        testUtil.shutdownMiniDFSCluster();
      }
    }
  }

  /**
   * LogCapturer is similar to {@link org.apache.hadoop.test.GenericTestUtils.LogCapturer}
   * except that this implementation has a default appender to the root logger.
   * Hadoop 2.8+ supports the default appender in the LogCapturer it ships and this can be
   * replaced.
   * TODO: This class can be removed after we upgrade Hadoop dependency.
   */
  static class LogCapturer {
    private final StringWriter sw = new StringWriter();
    private final WriterAppender appender;
    private final org.apache.log4j.Logger logger;

    /**
     * Attaches a new in-memory appender to {@code logger}. The layout is taken from the root
     * logger's "stdout" appender, else its "console" appender, else a default
     * {@link PatternLayout}.
     */
    LogCapturer(org.apache.log4j.Logger logger) {
      this.logger = logger;
      Appender defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("stdout");
      if (defaultAppender == null) {
        defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("console");
      }
      final Layout layout =
          (defaultAppender == null) ? new PatternLayout() : defaultAppender.getLayout();
      this.appender = new WriterAppender(layout, sw);
      this.logger.addAppender(this.appender);
    }

    /** Returns everything written to the capturing appender so far. */
    String getOutput() {
      return sw.toString();
    }

    /** Detaches the capturing appender from the logger. */
    public void stopCapturing() {
      this.logger.removeAppender(this.appender);
    }
  }

  /**
   * This test HMaster class will always throw ServerNotRunningYetException if checked.
   */
  public static class NeverInitializedMaster extends HMaster {
    public NeverInitializedMaster(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    protected void checkServiceStarted() throws ServerNotRunningYetException {
      throw new ServerNotRunningYetException("Server is not running yet");
    }
  }

  /**
   * Tests region server should backoff to report for duty if master is not ready.
   */
  @Test
  public void testReportForDutyBackoff() throws IOException, InterruptedException {
    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
    master = cluster.addMaster();
    master.start();

    // Set sleep interval relatively low so that exponential backoff is more demanding.
    int msginterval = 100;
    // How long, in milliseconds, the region server is left retrying before counting failures.
    int interval = 10_000;

    LogCapturer capturer = new LogCapturer(org.apache.log4j.Logger.getLogger(HRegionServer.class));
    try {
      cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
      rs = cluster.addRegionServer();
      rs.start();

      Thread.sleep(interval);
    } finally {
      // Always detach the appender, even if the test is interrupted or fails above.
      capturer.stopCapturing();
    }
    String output = capturer.getOutput();
    LOG.info("{}", output);
    String failMsg = "reportForDuty failed;";
    int count = StringUtils.countMatches(output, failMsg);

    // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2).
    // Ideally we can assert the exact retry count. We relax here to tolerate contention error.
    int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval));
    assertTrue(String.format("reportForDuty retries %d times, less than expected min %d",
        count, expectedRetry / 2), count > expectedRetry / 2);
    assertTrue(String.format("reportForDuty retries %d times, more than expected max %d",
        count, expectedRetry * 2), count < expectedRetry * 2);
  }

  /**
   * Tests region server reportForDuty with backup master becomes primary master after
   * the first master goes away.
   */
  @Test
  public void testReportForDutyWithMasterChange() throws Exception {

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // master has a rs. defaultMinToStart = 2
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    int serversForFirstMaster = tablesOnMaster ? 2 : 1;
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
        serversForFirstMaster);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
        serversForFirstMaster);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: {}", master.getMaster().getServerName());
    master.start();
    rs.start();

    waitForClusterOnline(master);

    // Add a 2nd region server
    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
    rs2 = cluster.addRegionServer();
    // Start the region server. This region server will refresh RPC connection
    // from the current active master to the next active master before completing
    // reportForDuty
    LOG.debug("Starting 2nd region server: {}", rs2.getRegionServer().getServerName());
    rs2.start();

    waitForSecondRsStarted();

    // Stop the current master.
    master.getMaster().stop("Stopping master");

    // Start a new master and use another random unique port
    // Also let it wait for exactly 2 region servers to report in.
    // TODO: Add handling bindexception. Random port is not enough!!! Flaky test!
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    int serversForBackupMaster = tablesOnMaster ? 3 : 2;
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
        serversForBackupMaster);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
        serversForBackupMaster);
    backupMaster = cluster.addMaster();
    LOG.debug("Starting new master: {}", backupMaster.getMaster().getServerName());
    backupMaster.start();

    waitForClusterOnline(backupMaster);

    // Do some checking/asserts here.
    assertTrue(backupMaster.getMaster().isActiveMaster());
    assertTrue(backupMaster.getMaster().isInitialized());
    // JUnit expects (expected, actual); keep that order so failure messages read correctly.
    assertEquals(serversForBackupMaster,
        backupMaster.getMaster().getServerManager().getOnlineServersList().size());
  }

  /**
   * Tests region server reportForDuty with RS RPC retry
   */
  @Test
  public void testReportForDutyWithRSRpcRetry() throws Exception {
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
        new ScheduledThreadPoolExecutor(1, Threads.newDaemonThreadFactory("RSDelayedStart"));
    try {
      // Start a master and wait for it to become the active/primary master.
      // Use a random unique port
      cluster.getConfiguration().setInt(HConstants.MASTER_PORT,
          HBaseTestingUtility.randomFreePort());
      // Override the default RS RPC retry interval of 100ms to 300ms
      cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
      // master has a rs. defaultMinToStart = 2
      boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
      int serversToStart = tablesOnMaster ? 2 : 1;
      cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
          serversToStart);
      cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
          serversToStart);
      master = cluster.addMaster();
      rs = cluster.addRegionServer();
      LOG.debug("Starting master: {}", master.getMaster().getServerName());
      master.start();
      // Delay the RS start so that the meta assignment fails in first attempt and goes to retry
      // block
      scheduledThreadPoolExecutor.schedule(new Runnable() {
        @Override
        public void run() {
          rs.start();
        }
      }, 1000, TimeUnit.MILLISECONDS);

      waitForClusterOnline(master);
    } finally {
      // Release the scheduler thread; this also cancels the delayed start if it has not run yet
      // (e.g. when the test failed early), so the RS is not started during tear down.
      scheduledThreadPoolExecutor.shutdownNow();
    }
  }

  /**
   * Tests region server reportForDuty with manual environment edge
   */
  @Test
  public void testReportForDutyWithEnvironmentEdge() throws Exception {
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
    cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);

    // master has a rs. defaultMinToStart = 2
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    int serversToStart = tablesOnMaster ? 2 : 1;
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
        serversToStart);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
        serversToStart);

    // Inject manual environment edge for clock skew computation between RS and master.
    // The default edge is restored in tearDown().
    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: {}", master.getMaster().getServerName());
    master.start();
    rs.start();

    waitForClusterOnline(master);
  }

  /**
   * Blocks until the given master reports that it is initialized, polling every
   * {@link #SLEEP_INTERVAL} milliseconds, and then waits for the first region server
   * ({@code rs}) to come online.
   *
   * @param master the master thread to wait on
   * @throws InterruptedException if the calling thread is interrupted while sleeping
   */
  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
    while (!master.getMaster().isInitialized()) {
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for master to come online ...");
    }
    rs.waitForServerOnline();
  }

  /**
   * Blocks until the second region server ({@code rs2}, a {@link MyRegionServer}) signals that it
   * has created its RPC stub, polling every {@link #SLEEP_INTERVAL} milliseconds.
   *
   * @throws InterruptedException if the calling thread is interrupted while sleeping
   */
  private void waitForSecondRsStarted() throws InterruptedException {
    while (!((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag()) {
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting 2nd RS to be started ...");
    }
  }

  // Create a Region Server that provide a hook so that we can wait for the master switch over
  // before continuing reportForDuty to the master.
  // The idea is that we get a RPC connection to the first active master, then we wait.
  // The first master goes down, the second master becomes the active master. The region
  // server continues reportForDuty. It should succeed with the new master.
  public static class MyRegionServer extends MiniHBaseClusterRegionServer {

    private ServerName sn;
    // This flag is to make sure this rs has obtained the rpcStub to the first master.
    // The first master will go down after this.
    // It is volatile because the test thread polls it through an unsynchronized getter while
    // the region server thread sets it inside createRegionServerStatusStub(). The getter cannot
    // be synchronized: the writer keeps holding the monitor while it waits for the master change.
    private volatile boolean rpcStubCreatedFlag = false;
    private boolean masterChanged = false;

    public MyRegionServer(Configuration conf) throws IOException, KeeperException,
        InterruptedException {
      super(conf);
    }

    /**
     * Creates the status stub via the superclass, raises the "stub created" flag, and then, the
     * first time through, blocks until the master address tracker reports a master different
     * from the one the stub was created against.
     *
     * @return the server name returned by the superclass, or {@code null} if the wait was
     *         interrupted
     */
    @Override
    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
      sn = super.createRegionServerStatusStub(refresh);
      rpcStubCreatedFlag = true;

      // Wait for master switch over. Only do this for the second region server.
      while (!masterChanged) {
        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
        if (newSn != null && !newSn.equals(sn)) {
          masterChanged = true;
          break;
        }
        try {
          Thread.sleep(SLEEP_INTERVAL);
        } catch (InterruptedException e) {
          return null;
        }
        LOG.debug("Waiting for master switch over ... ");
      }
      return sn;
    }

    /** Returns whether this region server has created its RPC stub at least once. */
    public boolean getRpcStubCreatedFlag() {
      return rpcStubCreatedFlag;
    }
  }
}