/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Restarts every region server of the second cluster ({@code UTIL2}) while its replication peer
 * is being transited from STANDBY to DOWNGRADE_ACTIVE, then verifies that the rows written on the
 * first cluster ({@code UTIL1}) can be read from the second one.
 * <p>
 * NOTE(review): {@code UTIL1}, {@code UTIL2}, {@code PEER_ID}, {@code write}, {@code verify} and
 * {@code getRemoteWALDir} are not defined in this file; presumably they are inherited from
 * {@link SyncReplicationTestBase} -- confirm there for their exact contracts.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class);

  /** Pause, in milliseconds, between two polls of a cluster state. */
  private static final long SLEEP_TIME = 1000;

  /** Upper bound passed to {@code write} and {@code verify}; the lower bound used is 0. */
  private static final int COUNT = 1000;

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillRS.class);

  /**
   * Scenario:
   * <ol>
   * <li>Transit the peer to STANDBY on {@code UTIL2} (the remote WAL directory must appear) and
   * to ACTIVE on {@code UTIL1}.</li>
   * <li>Disable the peer on {@code UTIL1}, write the data, and shut {@code UTIL1} down.</li>
   * <li>In a background thread, stop and replace every live region server of {@code UTIL2}, one
   * at a time, while the main thread asks for the transition to DOWNGRADE_ACTIVE.</li>
   * <li>Wait until the peer reports DOWNGRADE_ACTIVE and verify the data on {@code UTIL2}.</li>
   * </ol>
   * @throws Exception if any cluster operation fails
   * @throws AssertionError if the background restarter thread did not complete successfully
   */
  @Test
  public void testStandbyKillRegionServer() throws Exception {
    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);

    // Disable async replication and write data, then shutdown
    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
    write(UTIL1, 0, COUNT);
    UTIL1.shutdownMiniCluster();

    JVMClusterUtil.MasterThread activeMaster = UTIL2.getMiniHBaseCluster().getMasterThread();
    String threadName = "RegionServer-Restarter";
    // Carries a failure of the restarter thread back to the test thread, so that a broken fault
    // injection fails the test instead of only leaving a line in the log.
    AtomicReference<Throwable> restarterFailure = new AtomicReference<>();
    Thread t = new Thread(() -> {
      try {
        restartAllRegionServers(activeMaster);
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          // Keep the interrupt status visible to whoever inspects this thread afterwards.
          Thread.currentThread().interrupt();
        }
        restarterFailure.set(e);
        LOG.error("Failed to kill RS", e);
      }
    }, threadName);
    t.start();

    LOG.debug("Going to transit peer {} to {} state", PEER_ID,
      SyncReplicationState.DOWNGRADE_ACTIVE);
    // Transit standby to DA to replay logs
    try {
      UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
        SyncReplicationState.DOWNGRADE_ACTIVE);
    } catch (Exception e) {
      // Deliberately tolerated: the region servers are being restarted concurrently, and the
      // polling loop below is what actually decides when the transition is complete.
      LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE, e);
    }

    LOG.debug("Waiting for the restarter thread {} to quit", threadName);
    t.join();
    Throwable failure = restarterFailure.get();
    if (failure != null) {
      throw new AssertionError("Restarter thread " + threadName + " failed", failure);
    }

    while (
      UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
          != SyncReplicationState.DOWNGRADE_ACTIVE
    ) {
      LOG.debug("Waiting for peer {} to be in {} state", PEER_ID,
        SyncReplicationState.DOWNGRADE_ACTIVE);
      Thread.sleep(SLEEP_TIME);
    }
    LOG.debug("Going to verify the result, {} records expected", COUNT);
    verify(UTIL2, 0, COUNT);
    LOG.debug("Verification successfully done");
  }

  /**
   * Stops every currently live region server of {@code UTIL2}, one after another. For each one it
   * waits for the master to finish the dead-server processing, starts a replacement region
   * server, waits for the replacement to be online, and finally joins the old server's thread.
   * @param activeMaster master thread whose {@link ServerManager} is polled for dead-server state
   * @throws Exception if starting a region server or querying the master fails, or if the calling
   *           thread is interrupted while waiting
   */
  private void restartAllRegionServers(JVMClusterUtil.MasterThread activeMaster)
    throws Exception {
    List<JVMClusterUtil.RegionServerThread> regionServers =
      UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads();
    LOG.debug("Going to stop {} RSes: [{}]", regionServers.size(),
      regionServers.stream().map(rst -> rst.getRegionServer().getServerName().getServerName())
        .collect(Collectors.joining(", ")));
    for (JVMClusterUtil.RegionServerThread rst : regionServers) {
      ServerName serverName = rst.getRegionServer().getServerName();
      LOG.debug("Going to RS stop [{}]", serverName);
      rst.getRegionServer().stop("Stop RS for test");
      waitForRSShutdownToStartAndFinish(activeMaster, serverName);
      LOG.debug("Going to start a new RS");
      JVMClusterUtil.RegionServerThread restarted =
        UTIL2.getMiniHBaseCluster().startRegionServer();
      LOG.debug("Waiting RS [{}] to online", restarted.getRegionServer().getServerName());
      restarted.waitForServerOnline();
      LOG.debug("Waiting the old RS {} thread to quit", rst.getName());
      rst.join();
      LOG.debug("Done stop RS [{}] and restart [{}]", serverName,
        restarted.getRegionServer().getServerName());
    }
    LOG.debug("All RSes restarted");
  }

  /**
   * Blocks until the master lists {@code serverName} as dead and then until the master reports
   * that no dead servers are in progress any more, polling every {@link #SLEEP_TIME} ms.
   * @param activeMaster master thread whose {@link ServerManager} is queried
   * @param serverName the region server that has just been stopped
   * @throws InterruptedException if interrupted while sleeping between polls
   * @throws IOException if the master cannot report whether dead servers are in progress
   */
  private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster,
    ServerName serverName) throws InterruptedException, IOException {
    ServerManager sm = activeMaster.getMaster().getServerManager();
    // First wait for it to be in dead list
    while (!sm.getDeadServers().isDeadServer(serverName)) {
      LOG.debug("Waiting for {} to be listed as dead in master", serverName);
      Thread.sleep(SLEEP_TIME);
    }
    LOG.debug("Server {} marked as dead, waiting for it to finish dead processing", serverName);
    // Note: this condition is cluster-wide, not specific to serverName.
    while (sm.areDeadServersInProgress()) {
      LOG.debug("Server {} still being processed, waiting", serverName);
      Thread.sleep(SLEEP_TIME);
    }
    LOG.debug("Server {} done with server shutdown processing", serverName);
  }
}