001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.io.StringWriter; 025 026import org.apache.commons.lang3.StringUtils; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.LocalHBaseCluster; 032import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 035import org.apache.hadoop.hbase.master.HMaster; 036import org.apache.hadoop.hbase.master.LoadBalancer; 037import org.apache.hadoop.hbase.master.ServerManager; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 041import org.apache.log4j.Appender; 042import org.apache.log4j.Layout; 043import org.apache.log4j.PatternLayout; 044import org.apache.log4j.WriterAppender; 045import org.apache.zookeeper.KeeperException; 046import org.junit.After; 047import org.junit.Before; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054@Category(MediumTests.class) 055public class TestRegionServerReportForDuty { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class); 060 061 private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class); 062 063 private static final long SLEEP_INTERVAL = 500; 064 065 private HBaseTestingUtility testUtil; 066 private LocalHBaseCluster cluster; 067 private RegionServerThread rs; 068 private RegionServerThread rs2; 069 private MasterThread master; 070 private MasterThread backupMaster; 071 072 @Before 073 public void setUp() throws Exception { 074 testUtil = new HBaseTestingUtility(); 075 testUtil.startMiniDFSCluster(1); 076 testUtil.startMiniZKCluster(1); 077 testUtil.createRootDir(); 078 cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0); 079 } 080 081 @After 082 public void tearDown() throws Exception { 083 cluster.shutdown(); 084 cluster.join(); 085 testUtil.shutdownMiniZKCluster(); 086 testUtil.shutdownMiniDFSCluster(); 087 } 088 089 /** 090 * LogCapturer is similar to {@link org.apache.hadoop.test.GenericTestUtils.LogCapturer} 091 * except that this implementation has a default appender to the root logger. 092 * Hadoop 2.8+ supports the default appender in the LogCapture it ships and this can be replaced. 093 * TODO: This class can be removed after we upgrade Hadoop dependency. 094 */ 095 static class LogCapturer { 096 private StringWriter sw = new StringWriter(); 097 private WriterAppender appender; 098 private org.apache.log4j.Logger logger; 099 100 LogCapturer(org.apache.log4j.Logger logger) { 101 this.logger = logger; 102 Appender defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("stdout"); 103 if (defaultAppender == null) { 104 defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("console"); 105 } 106 final Layout layout = (defaultAppender == null) ? new PatternLayout() : 107 defaultAppender.getLayout(); 108 this.appender = new WriterAppender(layout, sw); 109 this.logger.addAppender(this.appender); 110 } 111 112 String getOutput() { 113 return sw.toString(); 114 } 115 116 public void stopCapturing() { 117 this.logger.removeAppender(this.appender); 118 } 119 } 120 121 /** 122 * This test HMaster class will always throw ServerNotRunningYetException if checked. 123 */ 124 public static class NeverInitializedMaster extends HMaster { 125 public NeverInitializedMaster(Configuration conf) throws IOException, KeeperException { 126 super(conf); 127 } 128 129 @Override 130 protected void checkServiceStarted() throws ServerNotRunningYetException { 131 throw new ServerNotRunningYetException("Server is not running yet"); 132 } 133 } 134 135 /** 136 * Tests region server should backoff to report for duty if master is not ready. 137 */ 138 @Test 139 public void testReportForDutyBackoff() throws IOException, InterruptedException { 140 cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName()); 141 master = cluster.addMaster(); 142 master.start(); 143 144 LogCapturer capturer = new LogCapturer(org.apache.log4j.Logger.getLogger(HRegionServer.class)); 145 // Set sleep interval relatively low so that exponential backoff is more demanding. 146 int msginterval = 100; 147 cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval); 148 rs = cluster.addRegionServer(); 149 rs.start(); 150 151 int interval = 10_000; 152 Thread.sleep(interval); 153 capturer.stopCapturing(); 154 String output = capturer.getOutput(); 155 LOG.info("{}", output); 156 String failMsg = "reportForDuty failed;"; 157 int count = StringUtils.countMatches(output, failMsg); 158 159 // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2). 160 // Ideally we can assert the exact retry count. We relax here to tolerate contention error. 161 int expectedRetry = (int)Math.ceil(Math.log(interval - msginterval)); 162 assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", 163 count, expectedRetry / 2), count > expectedRetry / 2); 164 assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", 165 count, expectedRetry * 2), count < expectedRetry * 2); 166 } 167 168 /** 169 * Tests region sever reportForDuty with backup master becomes primary master after 170 * the first master goes away. 171 */ 172 @Test 173 public void testReportForDutyWithMasterChange() throws Exception { 174 175 // Start a master and wait for it to become the active/primary master. 176 // Use a random unique port 177 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort()); 178 // master has a rs. defaultMinToStart = 2 179 boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration()); 180 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, tablesOnMaster? 2: 1); 181 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, tablesOnMaster? 2: 1); 182 master = cluster.addMaster(); 183 rs = cluster.addRegionServer(); 184 LOG.debug("Starting master: " + master.getMaster().getServerName()); 185 master.start(); 186 rs.start(); 187 188 waitForClusterOnline(master); 189 190 // Add a 2nd region server 191 cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); 192 rs2 = cluster.addRegionServer(); 193 // Start the region server. This region server will refresh RPC connection 194 // from the current active master to the next active master before completing 195 // reportForDuty 196 LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName()); 197 rs2.start(); 198 199 waitForSecondRsStarted(); 200 201 // Stop the current master. 202 master.getMaster().stop("Stopping master"); 203 204 // Start a new master and use another random unique port 205 // Also let it wait for exactly 2 region severs to report in. 206 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort()); 207 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 208 tablesOnMaster? 3: 2); 209 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 210 tablesOnMaster? 3: 2); 211 backupMaster = cluster.addMaster(); 212 LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName()); 213 backupMaster.start(); 214 215 waitForClusterOnline(backupMaster); 216 217 // Do some checking/asserts here. 218 assertTrue(backupMaster.getMaster().isActiveMaster()); 219 assertTrue(backupMaster.getMaster().isInitialized()); 220 assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 221 tablesOnMaster? 3: 2); 222 223 } 224 225 private void waitForClusterOnline(MasterThread master) throws InterruptedException { 226 while (true) { 227 if (master.getMaster().isInitialized()) { 228 break; 229 } 230 Thread.sleep(SLEEP_INTERVAL); 231 LOG.debug("Waiting for master to come online ..."); 232 } 233 rs.waitForServerOnline(); 234 } 235 236 private void waitForSecondRsStarted() throws InterruptedException { 237 while (true) { 238 if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) { 239 break; 240 } 241 Thread.sleep(SLEEP_INTERVAL); 242 LOG.debug("Waiting 2nd RS to be started ..."); 243 } 244 } 245 246 // Create a Region Server that provide a hook so that we can wait for the master switch over 247 // before continuing reportForDuty to the mater. 248 // The idea is that we get a RPC connection to the first active master, then we wait. 249 // The first master goes down, the second master becomes the active master. The region 250 // server continues reportForDuty. It should succeed with the new master. 251 public static class MyRegionServer extends MiniHBaseClusterRegionServer { 252 253 private ServerName sn; 254 // This flag is to make sure this rs has obtained the rpcStub to the first master. 255 // The first master will go down after this. 256 private boolean rpcStubCreatedFlag = false; 257 private boolean masterChanged = false; 258 259 public MyRegionServer(Configuration conf) throws IOException, KeeperException, 260 InterruptedException { 261 super(conf); 262 } 263 264 @Override 265 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 266 sn = super.createRegionServerStatusStub(refresh); 267 rpcStubCreatedFlag = true; 268 269 // Wait for master switch over. Only do this for the second region server. 270 while (!masterChanged) { 271 ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true); 272 if (newSn != null && !newSn.equals(sn)) { 273 masterChanged = true; 274 break; 275 } 276 try { 277 Thread.sleep(SLEEP_INTERVAL); 278 } catch (InterruptedException e) { 279 return null; 280 } 281 LOG.debug("Waiting for master switch over ... "); 282 } 283 return sn; 284 } 285 286 public boolean getRpcStubCreatedFlag() { 287 return rpcStubCreatedFlag; 288 } 289 } 290}