/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that a region server whose reportForDuty is in flight while the active master
 * changes ends up registered with the new master.
 */
@Category(MediumTests.class)
public class TestRegionServerReportForDuty {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);

  /** Polling interval, in milliseconds, used by every wait loop in this test. */
  private static final long SLEEP_INTERVAL = 500;

  private HBaseTestingUtility testUtil;
  private LocalHBaseCluster cluster;
  private RegionServerThread rs;
  private RegionServerThread rs2;
  private MasterThread master;
  private MasterThread backupMaster;

  /**
   * Starts a one-node mini DFS cluster and a one-node mini ZK cluster, creates the root
   * directory and builds a {@link LocalHBaseCluster} with no masters and no region servers;
   * the test adds those itself.
   */
  @Before
  public void setUp() throws Exception {
    testUtil = new HBaseTestingUtility();
    testUtil.startMiniDFSCluster(1);
    testUtil.startMiniZKCluster(1);
    testUtil.createRootDir();
    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
  }

  /**
   * Shuts everything down again. Each step is guarded so that a {@code setUp} that failed part
   * way through (leaving {@code cluster} unset) or a failing cluster shutdown does not prevent
   * the ZK and DFS mini clusters from being stopped.
   */
  @After
  public void tearDown() throws Exception {
    try {
      if (cluster != null) {
        cluster.shutdown();
        cluster.join();
      }
    } finally {
      if (testUtil != null) {
        try {
          testUtil.shutdownMiniZKCluster();
        } finally {
          testUtil.shutdownMiniDFSCluster();
        }
      }
    }
  }

  /**
   * Tests region server reportForDuty with backup master becomes primary master after
   * the first master goes away.
   */
  @Test
  public void testReportForDutyWithMasterChange() throws Exception {

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT,
      HBaseTestingUtility.randomFreePort());
    // master has a rs. defaultMinToStart = 2
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    int serversForFirstMaster = tablesOnMaster ? 2 : 1;
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      serversForFirstMaster);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      serversForFirstMaster);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: {}", master.getMaster().getServerName());
    master.start();
    rs.start();

    waitForClusterOnline(master);

    // Add a 2nd region server
    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
    rs2 = cluster.addRegionServer();
    // Start the region server. This region server will refresh RPC connection
    // from the current active master to the next active master before completing
    // reportForDuty
    LOG.debug("Starting 2nd region server: {}", rs2.getRegionServer().getServerName());
    rs2.start();

    waitForSecondRsStarted();

    // Stop the current master.
    master.getMaster().stop("Stopping master");

    // Start a new master and use another random unique port
    // Also let it wait for exactly 2 region servers to report in.
    int serversForSecondMaster = tablesOnMaster ? 3 : 2;
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT,
      HBaseTestingUtility.randomFreePort());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      serversForSecondMaster);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      serversForSecondMaster);
    backupMaster = cluster.addMaster();
    LOG.debug("Starting new master: {}", backupMaster.getMaster().getServerName());
    backupMaster.start();

    waitForClusterOnline(backupMaster);

    // Do some checking/asserts here.
    assertTrue(backupMaster.getMaster().isActiveMaster());
    assertTrue(backupMaster.getMaster().isInitialized());
    // Expected value first, so that a failure message reports the two numbers the right way round.
    assertEquals(serversForSecondMaster,
      backupMaster.getMaster().getServerManager().getOnlineServersList().size());
  }

  /**
   * Blocks until the given master reports itself initialized, polling every
   * {@link #SLEEP_INTERVAL} ms, and then waits for the first region server ({@code rs}) to be
   * online.
   *
   * @param masterThread the master to wait for
   * @throws InterruptedException if the calling thread is interrupted while sleeping
   */
  private void waitForClusterOnline(MasterThread masterThread) throws InterruptedException {
    while (!masterThread.getMaster().isInitialized()) {
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for master to come online ...");
    }
    rs.waitForServerOnline();
  }

  /**
   * Blocks until the second region server has signalled, via
   * {@link MyRegionServer#getRpcStubCreatedFlag()}, that its first call to
   * {@code createRegionServerStatusStub} got past the superclass implementation.
   *
   * @throws InterruptedException if the calling thread is interrupted while sleeping
   */
  private void waitForSecondRsStarted() throws InterruptedException {
    MyRegionServer secondRs = (MyRegionServer) rs2.getRegionServer();
    while (!secondRs.getRpcStubCreatedFlag()) {
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting 2nd RS to be started ...");
    }
  }

  // Create a Region Server that provide a hook so that we can wait for the master switch over
  // before continuing reportForDuty to the master.
  // The idea is that we get a RPC connection to the first active master, then we wait.
  // The first master goes down, the second master becomes the active master. The region
  // server continues reportForDuty. It should succeed with the new master.
  public static class MyRegionServer extends MiniHBaseClusterRegionServer {

    private ServerName sn;
    // This flag is to make sure this rs has obtained the rpcStub to the first master.
    // The first master will go down after this.
    // volatile: it is set by the region server thread and polled by the test thread through
    // the unsynchronized getter below. The getter cannot take the monitor instead, because
    // createRegionServerStatusStub holds it for the whole time it waits for the switch over.
    private volatile boolean rpcStubCreatedFlag = false;
    // Only touched inside the synchronized createRegionServerStatusStub.
    private boolean masterChanged = false;

    public MyRegionServer(Configuration conf) throws IOException, KeeperException,
      InterruptedException {
      super(conf);
    }

    /**
     * Delegates to the superclass, raises the "stub created" flag, and then - until a master
     * change has been seen once - holds the caller here, polling the master address tracker
     * every {@link #SLEEP_INTERVAL} ms until it reports a non-null master different from the
     * one the superclass returned.
     *
     * @return the server name returned by the superclass call, or {@code null} if the thread
     *         was interrupted while waiting for the master to change
     */
    @Override
    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
      sn = super.createRegionServerStatusStub(refresh);
      rpcStubCreatedFlag = true;

      // Wait for master switch over. Only do this for the second region server.
      while (!masterChanged) {
        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
        if (newSn != null && !newSn.equals(sn)) {
          masterChanged = true;
          break;
        }
        try {
          Thread.sleep(SLEEP_INTERVAL);
        } catch (InterruptedException e) {
          // Interrupted while waiting: abandon the wait and hand null back to the caller.
          return null;
        }
        LOG.debug("Waiting for master switch over ... ");
      }
      return sn;
    }

    /**
     * @return {@code true} once {@link #createRegionServerStatusStub(boolean)} has returned
     *         from its superclass call at least once
     */
    public boolean getRpcStubCreatedFlag() {
      return rpcStubCreatedFlag;
    }
  }
}