/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Tests the restarting of everything as done during rolling restarts.
 * <p>
 * The test is run once per value supplied by {@link #parameters()}, which toggles
 * {@link HConstants#HBASE_SPLIT_WAL_COORDINATED_BY_ZK} on the cluster configuration.
 */
@Tag(MasterTests.TAG)
@Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: splitWALCoordinatedByZK={0}")
public class TestRollingRestart {

  private static final Logger LOG = LoggerFactory.getLogger(TestRollingRestart.class);

  /** Testing utility owning the mini cluster; re-created by every test invocation. */
  private static HBaseTestingUtil TEST_UTIL;

  /** Name of the currently running test method; used to derive the table name. */
  private String testMethodName;

  /** Records the name of the test method that is about to run. */
  @BeforeEach
  public void setTestMethod(TestInfo testInfo) {
    testMethodName = testInfo.getTestMethod().get().getName();
  }

  /** Value written to {@link HConstants#HBASE_SPLIT_WAL_COORDINATED_BY_ZK} for this run. */
  private final boolean splitWALCoordinatedByZK;

  /**
   * @param splitWALCoordinatedByZK value to set for
   *                                {@link HConstants#HBASE_SPLIT_WAL_COORDINATED_BY_ZK}
   */
  public TestRollingRestart(boolean splitWALCoordinatedByZK) {
    this.splitWALCoordinatedByZK = splitWALCoordinatedByZK;
  }

  /**
   * Starts a mini cluster, creates a multi-region table, then stops and restarts both masters
   * and, one at a time, every region server, checking after each step that the number of online
   * regions still matches the set captured after the table was enabled.
   * <p>
   * The table and the mini cluster are released in that order even when a step fails, so that a
   * failing invocation does not leave a running cluster behind for the next one.
   */
  @TestTemplate
  public void testBasicRollingRestart() throws Exception {

    // Start a cluster with 2 masters and 3 regionservers; a fourth regionserver is added below
    final int NUM_MASTERS = 2;
    final int NUM_RS = 3;
    final int NUM_REGIONS_TO_CREATE = 20;

    int expectedNumRS = NUM_RS;

    // Start the cluster
    log("Starting cluster");
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK);
    TEST_UTIL = new HBaseTestingUtil(conf);
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(NUM_MASTERS)
      .numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
    TEST_UTIL.startMiniCluster(option);
    try {
      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
      log("Waiting for active/ready master");
      cluster.waitForActiveAndReadyMaster();

      // Create a table with regions
      final TableName tableName = TableName.valueOf(testMethodName.replaceAll("[\\[|\\]]", "-"));
      byte[] family = Bytes.toBytes("family");
      log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
      // The table handle is only held open for the duration of the test and closed at the end.
      try (Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE)) {
        int numRegions = -1;
        try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
          numRegions = r.getStartKeys().length;
        }
        numRegions += 1; // catalogs
        log("Waiting for no more RIT\n");
        TEST_UTIL.waitUntilNoRegionsInTransition(60000);
        log("Disabling table\n");
        TEST_UTIL.getAdmin().disableTable(tableName);
        log("Waiting for no more RIT\n");
        TEST_UTIL.waitUntilNoRegionsInTransition(60000);
        NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
        log("Verifying only catalog region is assigned\n");
        if (regions.size() != 1) {
          for (String oregion : regions) {
            log("Region still online: " + oregion);
          }
        }
        assertEquals(1, regions.size());
        log("Enabling table\n");
        TEST_UTIL.getAdmin().enableTable(tableName);
        log("Waiting for no more RIT\n");
        TEST_UTIL.waitUntilNoRegionsInTransition(60000);
        log("Verifying there are " + numRegions + " assigned on cluster\n");
        // This snapshot is the reference every later assertRegionsAssigned call compares against.
        regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
        assertRegionsAssigned(cluster, regions);
        assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

        // Add a new regionserver
        log("Adding a fourth RS");
        RegionServerThread restarted = cluster.startRegionServer();
        expectedNumRS++;
        restarted.waitForServerOnline();
        log("Additional RS is online");
        log("Waiting for no more RIT");
        TEST_UTIL.waitUntilNoRegionsInTransition(60000);
        log("Verifying there are " + numRegions + " assigned on cluster");
        assertRegionsAssigned(cluster, regions);
        assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

        // Master Restarts
        List<MasterThread> masterThreads = cluster.getMasterThreads();
        MasterThread activeMaster = null;
        MasterThread backupMaster = null;
        assertEquals(NUM_MASTERS, masterThreads.size());
        if (masterThreads.get(0).getMaster().isActiveMaster()) {
          activeMaster = masterThreads.get(0);
          backupMaster = masterThreads.get(1);
        } else {
          activeMaster = masterThreads.get(1);
          backupMaster = masterThreads.get(0);
        }

        // Bring down the backup master
        log("Stopping backup master\n\n");
        backupMaster.getMaster().stop("Stop of backup during rolling restart");
        cluster.hbaseCluster.waitOnMaster(backupMaster);

        // Bring down the primary master
        log("Stopping primary master\n\n");
        activeMaster.getMaster().stop("Stop of active during rolling restart");
        cluster.hbaseCluster.waitOnMaster(activeMaster);

        // Start primary master
        log("Restarting primary master\n\n");
        activeMaster = cluster.startMaster();
        cluster.waitForActiveAndReadyMaster();

        // Start backup master
        log("Restarting backup master\n\n");
        backupMaster = cluster.startMaster();

        assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

        // RegionServer Restarts

        // Bring them down, one at a time, waiting between each to complete
        List<RegionServerThread> regionServers = cluster.getLiveRegionServerThreads();
        int num = 1;
        int total = regionServers.size();
        for (RegionServerThread rst : regionServers) {
          ServerName serverName = rst.getRegionServer().getServerName();
          log("Stopping region server " + num + " of " + total + " [ " + serverName + "]");
          rst.getRegionServer().stop("Stopping RS during rolling restart");
          cluster.hbaseCluster.waitOnRegionServer(rst);
          log("Waiting for RS shutdown to be handled by master");
          waitForRSShutdownToStartAndFinish(activeMaster, serverName);
          log("RS shutdown done, waiting for no more RIT");
          TEST_UTIL.waitUntilNoRegionsInTransition(60000);
          log("Verifying there are " + numRegions + " assigned on cluster");
          assertRegionsAssigned(cluster, regions);
          expectedNumRS--;
          assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
          log("Restarting region server " + num + " of " + total);
          restarted = cluster.startRegionServer();
          restarted.waitForServerOnline();
          expectedNumRS++;
          log("Region server " + num + " is back online");
          log("Waiting for no more RIT");
          TEST_UTIL.waitUntilNoRegionsInTransition(60000);
          log("Verifying there are " + numRegions + " assigned on cluster");
          assertRegionsAssigned(cluster, regions);
          assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
          num++;
        }
        Thread.sleep(1000);
        assertRegionsAssigned(cluster, regions);

        // TODO: Bring random 3 of 4 RS down at the same time
      }
    } finally {
      // Stop the cluster, also when the test body failed, so it does not leak into later runs
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Checks if the SCP of specific dead server has been executed.
   * @param serverName the server whose {@link ServerCrashProcedure} is looked up among the
   *                   master's procedures
   * @return true if the SCP of specific serverName has been executed, false if not
   */
  private boolean isDeadServerSCPExecuted(ServerName serverName) throws IOException {
    return TEST_UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
      .anyMatch(p -> p instanceof ServerCrashProcedure
        && ((ServerCrashProcedure) p).getServerName().equals(serverName));
  }

  /**
   * Blocks until the master has listed {@code serverName} as dead, a {@link ServerCrashProcedure}
   * for it is present among the master's procedures, and the server manager reports no dead
   * servers in progress any more.
   * <p>
   * Only the middle step is bounded (60 seconds, via {@code TEST_UTIL.waitFor}); the first and
   * last steps poll without a timeout.
   * @param activeMaster master whose {@link ServerManager} is polled
   * @param serverName   the region server that was stopped
   */
  private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster, ServerName serverName)
    throws InterruptedException, IOException {
    ServerManager sm = activeMaster.getMaster().getServerManager();
    // First wait for it to be in dead list
    while (!sm.getDeadServers().isDeadServer(serverName)) {
      log("Waiting for [" + serverName + "] to be listed as dead in master");
      Thread.sleep(1);
    }
    log("Server [" + serverName + "] marked as dead, waiting for it to finish dead processing");

    TEST_UTIL.waitFor(60000, () -> isDeadServerSCPExecuted(serverName));

    while (sm.areDeadServersInProgress()) {
      log("Server [" + serverName + "] still being processed, waiting");
      Thread.sleep(100);
    }
    log("Server [" + serverName + "] done with server shutdown processing");
  }

  /** Writes {@code msg} to the debug log, prefixed with "TRR: " and padded with blank lines. */
  private void log(String msg) {
    LOG.debug("\n\nTRR: {}\n", msg);
  }

  /**
   * @return the sum of {@code getNumberOfOnlineRegions()} over all live region server threads
   */
  private int getNumberOfOnlineRegions(SingleProcessHBaseCluster cluster) {
    int numFound = 0;
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      numFound += rst.getRegionServer().getNumberOfOnlineRegions();
    }
    return numFound;
  }

  /**
   * Asserts that the number of regions currently online on the live region servers equals
   * {@code expectedRegions.size()}. Before failing, logs which expected regions are missing (too
   * few found) or which regions are reported by more than one server (too many found).
   * @param cluster         cluster whose live region servers are inspected
   * @param expectedRegions names of the regions expected to be online
   */
  private void assertRegionsAssigned(SingleProcessHBaseCluster cluster, Set<String> expectedRegions)
    throws IOException {
    int numFound = getNumberOfOnlineRegions(cluster);
    if (expectedRegions.size() > numFound) {
      log("Expected to find " + expectedRegions.size() + " but only found " + numFound);
      NavigableSet<String> foundRegions = HBaseTestingUtil.getAllOnlineRegions(cluster);
      for (String region : expectedRegions) {
        if (!foundRegions.contains(region)) {
          log("Missing region: " + region);
        }
      }
    } else if (expectedRegions.size() < numFound) {
      int doubled = numFound - expectedRegions.size();
      log("Expected to find " + expectedRegions.size() + " but found " + numFound + " ("
        + doubled + " double assignments?)");
      NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
      for (String region : doubleRegions) {
        log("Region is double assigned: " + region);
      }
    } else {
      log("Success! Found expected number of " + numFound + " regions");
    }
    // Diagnostics are logged above; the actual pass/fail decision is made once, here.
    assertEquals(expectedRegions.size(), numFound);
  }

  /**
   * @return the names of regions that are reported as online by more than one live region server
   */
  private NavigableSet<String> getDoubleAssignedRegions(SingleProcessHBaseCluster cluster)
    throws IOException {
    NavigableSet<String> online = new TreeSet<>();
    NavigableSet<String> doubled = new TreeSet<>();
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      for (RegionInfo region : ProtobufUtil
        .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
        // Set.add returns false when the name was already seen on another server
        if (!online.add(region.getRegionNameAsString())) {
          doubled.add(region.getRegionNameAsString());
        }
      }
    }
    return doubled;
  }

  /**
   * @return one argument set per value of {@code splitWALCoordinatedByZK}: {@code false}, then
   *         {@code true}
   */
  public static Stream<Arguments> parameters() {
    return Arrays.asList(false, true).stream().map(Arguments::of);
  }
}