001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.io.StringWriter; 025import java.util.concurrent.ScheduledThreadPoolExecutor; 026import java.util.concurrent.TimeUnit; 027import org.apache.commons.lang3.StringUtils; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.LocalHBaseCluster; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer; 035import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 036import org.apache.hadoop.hbase.master.HMaster; 037import org.apache.hadoop.hbase.master.ServerManager; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 040import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 041import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 043import org.apache.hadoop.hbase.util.Threads; 044import org.apache.zookeeper.KeeperException; 045import org.junit.After; 046import org.junit.Before; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 054 055@Category(LargeTests.class) 056public class TestRegionServerReportForDuty { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class); 061 062 private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class); 063 064 private static final long SLEEP_INTERVAL = 500; 065 066 private HBaseTestingUtil testUtil; 067 private LocalHBaseCluster cluster; 068 private RegionServerThread rs; 069 private RegionServerThread rs2; 070 private MasterThread master; 071 private MasterThread backupMaster; 072 073 @Before 074 public void setUp() throws Exception { 075 testUtil = new HBaseTestingUtil(); 076 testUtil.startMiniDFSCluster(1); 077 testUtil.startMiniZKCluster(1); 078 testUtil.createRootDir(); 079 cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0); 080 } 081 082 @After 083 public void tearDown() throws Exception { 084 cluster.shutdown(); 085 cluster.join(); 086 testUtil.shutdownMiniZKCluster(); 087 testUtil.shutdownMiniDFSCluster(); 088 } 089 090 private static class LogCapturer { 091 private StringWriter sw = new StringWriter(); 092 private org.apache.logging.log4j.core.appender.WriterAppender appender; 093 private org.apache.logging.log4j.core.Logger logger; 094 095 LogCapturer(org.apache.logging.log4j.core.Logger logger) { 096 this.logger = logger; 097 this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder() 098 .setName("test").setTarget(sw).build(); 099 this.logger.addAppender(this.appender); 100 } 101 102 String getOutput() { 103 return sw.toString(); 104 } 105 106 public void stopCapturing() { 107 this.logger.removeAppender(this.appender); 108 } 109 } 110 111 /** 112 * This test HMaster class will always throw ServerNotRunningYetException if checked. 113 */ 114 public static class NeverInitializedMaster extends HMaster { 115 public NeverInitializedMaster(Configuration conf) throws IOException { 116 super(conf); 117 } 118 119 @Override 120 protected void checkServiceStarted() throws ServerNotRunningYetException { 121 throw new ServerNotRunningYetException("Server is not running yet"); 122 } 123 } 124 125 /** 126 * Tests region server should backoff to report for duty if master is not ready. 127 */ 128 @Test 129 public void testReportForDutyBackoff() throws IOException, InterruptedException { 130 cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName()); 131 master = cluster.addMaster(); 132 master.start(); 133 134 LogCapturer capturer = 135 new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager 136 .getLogger(HRegionServer.class)); 137 // Set sleep interval relatively low so that exponential backoff is more demanding. 138 int msginterval = 100; 139 cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval); 140 rs = cluster.addRegionServer(); 141 rs.start(); 142 143 int interval = 10_000; 144 Thread.sleep(interval); 145 capturer.stopCapturing(); 146 String output = capturer.getOutput(); 147 LOG.info("{}", output); 148 String failMsg = "reportForDuty failed;"; 149 int count = StringUtils.countMatches(output, failMsg); 150 151 // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2). 152 // Ideally we can assert the exact retry count. We relax here to tolerate contention error. 153 int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval)); 154 assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", count, 155 expectedRetry / 2), count > expectedRetry / 2); 156 assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", count, 157 expectedRetry * 2), count < expectedRetry * 2); 158 } 159 160 /** 161 * Tests region sever reportForDuty with backup master becomes primary master after the first 162 * master goes away. 163 */ 164 @Test 165 public void testReportForDutyWithMasterChange() throws Exception { 166 167 // Start a master and wait for it to become the active/primary master. 168 // Use a random unique port 169 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 170 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 171 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 172 master = cluster.addMaster(); 173 rs = cluster.addRegionServer(); 174 LOG.debug("Starting master: " + master.getMaster().getServerName()); 175 master.start(); 176 rs.start(); 177 178 waitForClusterOnline(master); 179 180 // Add a 2nd region server 181 cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); 182 rs2 = cluster.addRegionServer(); 183 // Start the region server. This region server will refresh RPC connection 184 // from the current active master to the next active master before completing 185 // reportForDuty 186 LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName()); 187 rs2.start(); 188 189 waitForSecondRsStarted(); 190 191 // Stop the current master. 192 master.getMaster().stop("Stopping master"); 193 194 // Start a new master and use another random unique port 195 // Also let it wait for exactly 2 region severs to report in. 196 // TODO: Add handling bindexception. Random port is not enough!!! Flakie test! 197 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 198 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2); 199 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2); 200 backupMaster = cluster.addMaster(); 201 LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName()); 202 backupMaster.start(); 203 204 waitForClusterOnline(backupMaster); 205 206 // Do some checking/asserts here. 207 assertTrue(backupMaster.getMaster().isActiveMaster()); 208 assertTrue(backupMaster.getMaster().isInitialized()); 209 assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2); 210 211 } 212 213 /** 214 * Tests region sever reportForDuty with RS RPC retry 215 */ 216 @Test 217 public void testReportForDutyWithRSRpcRetry() throws Exception { 218 ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, 219 new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true) 220 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 221 222 // Start a master and wait for it to become the active/primary master. 223 // Use a random unique port 224 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 225 // Override the default RS RPC retry interval of 100ms to 300ms 226 cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300); 227 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 228 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 229 master = cluster.addMaster(); 230 rs = cluster.addRegionServer(); 231 LOG.debug("Starting master: " + master.getMaster().getServerName()); 232 master.start(); 233 // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block 234 scheduledThreadPoolExecutor.schedule(new Runnable() { 235 @Override 236 public void run() { 237 rs.start(); 238 } 239 }, 1000, TimeUnit.MILLISECONDS); 240 241 waitForClusterOnline(master); 242 } 243 244 /** 245 * Tests region sever reportForDuty with a non-default environment edge 246 */ 247 @Test 248 public void testReportForDutyWithEnvironmentEdge() throws Exception { 249 // Start a master and wait for it to become the active/primary master. 250 // Use a random unique port 251 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 252 // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately 253 cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0); 254 cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0); 255 256 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 257 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 258 259 // Inject non-default environment edge 260 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 261 EnvironmentEdgeManager.injectEdge(edge); 262 master = cluster.addMaster(); 263 rs = cluster.addRegionServer(); 264 LOG.debug("Starting master: " + master.getMaster().getServerName()); 265 master.start(); 266 rs.start(); 267 waitForClusterOnline(master); 268 } 269 270 private void waitForClusterOnline(MasterThread master) throws InterruptedException { 271 while (true) { 272 if (master.getMaster().isInitialized()) { 273 break; 274 } 275 Thread.sleep(SLEEP_INTERVAL); 276 LOG.debug("Waiting for master to come online ..."); 277 } 278 rs.waitForServerOnline(); 279 } 280 281 private void waitForSecondRsStarted() throws InterruptedException { 282 while (true) { 283 if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) { 284 break; 285 } 286 Thread.sleep(SLEEP_INTERVAL); 287 LOG.debug("Waiting 2nd RS to be started ..."); 288 } 289 } 290 291 // Create a Region Server that provide a hook so that we can wait for the master switch over 292 // before continuing reportForDuty to the mater. 293 // The idea is that we get a RPC connection to the first active master, then we wait. 294 // The first master goes down, the second master becomes the active master. The region 295 // server continues reportForDuty. It should succeed with the new master. 296 public static class MyRegionServer extends MiniHBaseClusterRegionServer { 297 298 private ServerName sn; 299 // This flag is to make sure this rs has obtained the rpcStub to the first master. 300 // The first master will go down after this. 301 private boolean rpcStubCreatedFlag = false; 302 private boolean masterChanged = false; 303 304 public MyRegionServer(Configuration conf) 305 throws IOException, KeeperException, InterruptedException { 306 super(conf); 307 } 308 309 @Override 310 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 311 sn = super.createRegionServerStatusStub(refresh); 312 rpcStubCreatedFlag = true; 313 314 // Wait for master switch over. Only do this for the second region server. 315 while (!masterChanged) { 316 ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true); 317 if (newSn != null && !newSn.equals(sn)) { 318 masterChanged = true; 319 break; 320 } 321 try { 322 Thread.sleep(SLEEP_INTERVAL); 323 } catch (InterruptedException e) { 324 return null; 325 } 326 LOG.debug("Waiting for master switch over ... "); 327 } 328 return sn; 329 } 330 331 public boolean getRpcStubCreatedFlag() { 332 return rpcStubCreatedFlag; 333 } 334 } 335}