001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT; 021import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT_WAIT_INTERVAL; 022import static org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT; 023import static org.junit.jupiter.api.Assertions.assertEquals; 024import static org.junit.jupiter.api.Assertions.assertNotEquals; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026 027import java.io.IOException; 028import java.io.UncheckedIOException; 029import java.util.List; 030import java.util.Map; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 035import org.apache.hadoop.hbase.StartTestingClusterOption; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.RegionInfo; 038import org.apache.hadoop.hbase.testclassification.MasterTests; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.util.JVMClusterUtil; 041import org.junit.jupiter.api.Tag; 042import org.junit.jupiter.api.Test; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046@Tag(MasterTests.TAG) 047@Tag(MediumTests.TAG) 048public class TestRetainAssignmentOnRestart extends AbstractTestRestartCluster { 049 050 private static final Logger LOG = LoggerFactory.getLogger(TestRetainAssignmentOnRestart.class); 051 052 private static int NUM_OF_RS = 3; 053 054 public static final class HMasterForTest extends HMaster { 055 056 public HMasterForTest(Configuration conf) throws IOException { 057 super(conf); 058 } 059 060 @Override 061 protected void startProcedureExecutor() throws IOException { 062 // only start procedure executor when we have all the regionservers ready to take regions 063 new Thread(() -> { 064 for (;;) { 065 if (getServerManager().createDestinationServersList().size() == NUM_OF_RS) { 066 try { 067 HMasterForTest.super.startProcedureExecutor(); 068 } catch (IOException e) { 069 throw new UncheckedIOException(e); 070 } 071 break; 072 } 073 try { 074 Thread.sleep(1000); 075 } catch (InterruptedException e) { 076 } 077 } 078 }).start(); 079 } 080 } 081 082 @Override 083 protected boolean splitWALCoordinatedByZk() { 084 return true; 085 } 086 087 /** 088 * This tests retaining assignments on a cluster restart 089 */ 090 @Test 091 public void testRetainAssignmentOnClusterRestart() throws Exception { 092 setupCluster(); 093 HMaster master = UTIL.getMiniHBaseCluster().getMaster(); 094 SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster(); 095 List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads(); 096 assertEquals(NUM_OF_RS, threads.size()); 097 int[] rsPorts = new int[NUM_OF_RS]; 098 for (int i = 0; i < NUM_OF_RS; i++) { 099 rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort(); 100 } 101 102 // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to 103 // use it to load all user region placements 104 SnapshotOfRegionAssignmentFromMeta snapshot = 105 new SnapshotOfRegionAssignmentFromMeta(master.getConnection()); 106 snapshot.initialize(); 107 Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap(); 108 for (ServerName serverName : regionToRegionServerMap.values()) { 109 boolean found = false; // Test only, no need to optimize 110 for (int k = 0; k < NUM_OF_RS && !found; k++) { 111 found = serverName.getPort() == rsPorts[k]; 112 } 113 assertTrue(found); 114 } 115 116 LOG.info("\n\nShutting down HBase cluster"); 117 cluster.stopMaster(0); 118 cluster.shutdown(); 119 cluster.waitUntilShutDown(); 120 121 LOG.info("\n\nSleeping a bit"); 122 Thread.sleep(2000); 123 124 LOG.info("\n\nStarting cluster the second time with the same ports"); 125 cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3); 126 master = cluster.startMaster().getMaster(); 127 for (int i = 0; i < NUM_OF_RS; i++) { 128 cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, rsPorts[i]); 129 cluster.startRegionServer(); 130 } 131 132 ensureServersWithSamePort(master, rsPorts); 133 134 // Wait till master is initialized and all regions are assigned 135 for (TableName TABLE : TABLES) { 136 UTIL.waitTableAvailable(TABLE); 137 } 138 UTIL.waitUntilNoRegionsInTransition(60000); 139 140 snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection()); 141 snapshot.initialize(); 142 Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap(); 143 assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size()); 144 for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) { 145 ServerName oldServer = regionToRegionServerMap.get(entry.getKey()); 146 ServerName currentServer = entry.getValue(); 147 LOG.info( 148 "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer); 149 assertEquals(oldServer.getAddress(), currentServer.getAddress(), entry.getKey().toString()); 150 assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode()); 151 } 152 } 153 154 /** 155 * This tests retaining assignments on a single node restart 156 */ 157 @Test 158 public void testRetainAssignmentOnSingleRSRestart() throws Exception { 159 setupCluster(); 160 HMaster master = UTIL.getMiniHBaseCluster().getMaster(); 161 SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster(); 162 List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads(); 163 assertEquals(NUM_OF_RS, threads.size()); 164 int[] rsPorts = new int[NUM_OF_RS]; 165 for (int i = 0; i < NUM_OF_RS; i++) { 166 rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort(); 167 } 168 169 // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to 170 // use it to load all user region placements 171 SnapshotOfRegionAssignmentFromMeta snapshot = 172 new SnapshotOfRegionAssignmentFromMeta(master.getConnection()); 173 snapshot.initialize(); 174 Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap(); 175 for (ServerName serverName : regionToRegionServerMap.values()) { 176 boolean found = false; // Test only, no need to optimize 177 for (int k = 0; k < NUM_OF_RS && !found; k++) { 178 found = serverName.getPort() == rsPorts[k]; 179 } 180 assertTrue(found); 181 } 182 183 // Server to be restarted 184 ServerName deadRS = threads.get(0).getRegionServer().getServerName(); 185 LOG.info("\n\nStopping HMaster and {} server", deadRS); 186 // Stopping master first so that region server SCP will not be initiated 187 cluster.stopMaster(0); 188 cluster.waitForMasterToStop(master.getServerName(), 5000); 189 cluster.stopRegionServer(deadRS); 190 cluster.waitForRegionServerToStop(deadRS, 5000); 191 192 LOG.info("\n\nSleeping a bit"); 193 Thread.sleep(2000); 194 195 LOG.info("\n\nStarting HMaster and region server {} second time with the same port", deadRS); 196 cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3); 197 master = cluster.startMaster().getMaster(); 198 cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort()); 199 cluster.startRegionServer(); 200 201 ensureServersWithSamePort(master, rsPorts); 202 203 // Wait till master is initialized and all regions are assigned 204 for (TableName TABLE : TABLES) { 205 UTIL.waitTableAvailable(TABLE); 206 } 207 UTIL.waitUntilNoRegionsInTransition(60000); 208 209 snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection()); 210 snapshot.initialize(); 211 Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap(); 212 assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size()); 213 for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) { 214 ServerName oldServer = regionToRegionServerMap.get(entry.getKey()); 215 ServerName currentServer = entry.getValue(); 216 LOG.info( 217 "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer); 218 assertEquals(oldServer.getAddress(), currentServer.getAddress(), entry.getKey().toString()); 219 220 if (deadRS.getPort() == oldServer.getPort()) { 221 // Restarted RS start code wont be same 222 assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode()); 223 } else { 224 assertEquals(oldServer.getStartcode(), currentServer.getStartcode()); 225 } 226 } 227 } 228 229 /** 230 * This tests the force retaining assignments upon an RS restart, even when master triggers an SCP 231 */ 232 @Test 233 public void testForceRetainAssignment() throws Exception { 234 UTIL.getConfiguration().setBoolean(FORCE_REGION_RETAINMENT, true); 235 UTIL.getConfiguration().setLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 50); 236 setupCluster(); 237 HMaster master = UTIL.getMiniHBaseCluster().getMaster(); 238 SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster(); 239 List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads(); 240 assertEquals(NUM_OF_RS, threads.size()); 241 int[] rsPorts = new int[NUM_OF_RS]; 242 for (int i = 0; i < NUM_OF_RS; i++) { 243 rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort(); 244 } 245 246 // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to 247 // use it to load all user region placements 248 SnapshotOfRegionAssignmentFromMeta snapshot = 249 new SnapshotOfRegionAssignmentFromMeta(master.getConnection()); 250 snapshot.initialize(); 251 Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap(); 252 for (ServerName serverName : regionToRegionServerMap.values()) { 253 boolean found = false; // Test only, no need to optimize 254 for (int k = 0; k < NUM_OF_RS && !found; k++) { 255 found = serverName.getPort() == rsPorts[k]; 256 } 257 LOG.info("Server {} has regions? {}", serverName, found); 258 assertTrue(found); 259 } 260 261 // Server to be restarted 262 ServerName deadRS = threads.get(0).getRegionServer().getServerName(); 263 LOG.info("\n\nStopping {} server", deadRS); 264 cluster.stopRegionServer(deadRS); 265 266 LOG.info("\n\nSleeping a bit"); 267 Thread.sleep(2000); 268 269 LOG.info("\n\nStarting region server {} second time with the same port", deadRS); 270 cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3); 271 cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort()); 272 cluster.startRegionServer(); 273 274 ensureServersWithSamePort(master, rsPorts); 275 276 // Wait till master is initialized and all regions are assigned 277 for (TableName TABLE : TABLES) { 278 UTIL.waitTableAvailable(TABLE); 279 } 280 UTIL.waitUntilNoRegionsInTransition(60000); 281 snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection()); 282 snapshot.initialize(); 283 Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap(); 284 assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size()); 285 for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) { 286 ServerName oldServer = regionToRegionServerMap.get(entry.getKey()); 287 ServerName currentServer = entry.getValue(); 288 LOG.info( 289 "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer); 290 assertEquals(oldServer.getAddress(), currentServer.getAddress(), entry.getKey().toString()); 291 292 if (deadRS.getPort() == oldServer.getPort()) { 293 // Restarted RS start code wont be same 294 assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode()); 295 } else { 296 assertEquals(oldServer.getStartcode(), currentServer.getStartcode()); 297 } 298 } 299 } 300 301 private void setupCluster() throws Exception, IOException, InterruptedException { 302 // Set Zookeeper based connection registry since we will stop master and start a new master 303 // without populating the underlying config for the connection. 304 UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 305 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 306 // Enable retain assignment during ServerCrashProcedure 307 UTIL.getConfiguration().setBoolean(MASTER_SCP_RETAIN_ASSIGNMENT, true); 308 UTIL.startMiniCluster(StartTestingClusterOption.builder().masterClass(HMasterForTest.class) 309 .numRegionServers(NUM_OF_RS).build()); 310 311 // Turn off balancer 312 UTIL.getMiniHBaseCluster().getMaster().getMasterRpcServices().synchronousBalanceSwitch(false); 313 314 LOG.info("\n\nCreating tables"); 315 for (TableName TABLE : TABLES) { 316 UTIL.createTable(TABLE, FAMILY); 317 } 318 for (TableName TABLE : TABLES) { 319 UTIL.waitTableEnabled(TABLE); 320 } 321 322 UTIL.getMiniHBaseCluster().getMaster(); 323 UTIL.waitUntilNoRegionsInTransition(60000); 324 } 325 326 private void ensureServersWithSamePort(HMaster master, int[] rsPorts) { 327 // Make sure live regionservers are on the same host/port 328 List<ServerName> localServers = master.getServerManager().getOnlineServersList(); 329 assertEquals(NUM_OF_RS, localServers.size()); 330 for (int i = 0; i < NUM_OF_RS; i++) { 331 boolean found = false; 332 for (ServerName serverName : localServers) { 333 if (serverName.getPort() == rsPorts[i]) { 334 found = true; 335 break; 336 } 337 } 338 assertTrue(found); 339 } 340 } 341}