001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
038
039@Tag(ReplicationTests.TAG)
040@Tag(LargeTests.TAG)
041public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase {
042
043  private static final Logger LOG = LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class);
044
045  private final long SLEEP_TIME = 1000;
046
047  private final int COUNT = 1000;
048
049  @Test
050  public void testStandbyKillRegionServer() throws Exception {
051    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
052    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
053    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
054    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
055      SyncReplicationState.STANDBY);
056    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
057    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
058      SyncReplicationState.ACTIVE);
059
060    // Disable async replication and write data, then shutdown
061    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
062    write(UTIL1, 0, COUNT);
063    UTIL1.shutdownMiniCluster();
064
065    JVMClusterUtil.MasterThread activeMaster = UTIL2.getMiniHBaseCluster().getMasterThread();
066    String threadName = "RegionServer-Restarter";
067    Thread t = new Thread(() -> {
068      try {
069        List<JVMClusterUtil.RegionServerThread> regionServers =
070          new ArrayList<>(UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads());
071        LOG.debug("Going to stop {} RSes: [{}]", regionServers.size(),
072          regionServers.stream().map(rst -> rst.getRegionServer().getServerName().getServerName())
073            .collect(Collectors.joining(", ")));
074        for (JVMClusterUtil.RegionServerThread rst : regionServers) {
075          ServerName serverName = rst.getRegionServer().getServerName();
076          LOG.debug("Going to RS stop [{}]", serverName);
077          rst.getRegionServer().stop("Stop RS for test");
078          waitForRSShutdownToStartAndFinish(activeMaster, serverName);
079          LOG.debug("Going to start a new RS");
080          JVMClusterUtil.RegionServerThread restarted =
081            UTIL2.getMiniHBaseCluster().startRegionServer();
082          LOG.debug("Waiting RS [{}] to online", restarted.getRegionServer().getServerName());
083          restarted.waitForServerOnline();
084          LOG.debug("Waiting the old RS {} thread to quit", rst.getName());
085          rst.join();
086          LOG.debug("Done stop RS [{}] and restart [{}]", serverName,
087            restarted.getRegionServer().getServerName());
088        }
089        LOG.debug("All RSes restarted");
090      } catch (Exception e) {
091        LOG.error("Failed to kill RS", e);
092      }
093    }, threadName);
094    t.start();
095
096    LOG.debug("Going to transit peer {} to {} state", PEER_ID,
097      SyncReplicationState.DOWNGRADE_ACTIVE);
098    // Transit standby to DA to replay logs
099    try {
100      UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
101        SyncReplicationState.DOWNGRADE_ACTIVE);
102    } catch (Exception e) {
103      LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE, e);
104    }
105
106    LOG.debug("Waiting for the restarter thread {} to quit", threadName);
107    t.join();
108
109    while (
110      UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
111          != SyncReplicationState.DOWNGRADE_ACTIVE
112    ) {
113      LOG.debug("Waiting for peer {} to be in {} state", PEER_ID,
114        SyncReplicationState.DOWNGRADE_ACTIVE);
115      Thread.sleep(SLEEP_TIME);
116    }
117    LOG.debug("Going to verify the result, {} records expected", COUNT);
118    verify(UTIL2, 0, COUNT);
119    LOG.debug("Verification successfully done");
120  }
121
122  private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster,
123    ServerName serverName) throws InterruptedException, IOException {
124    ServerManager sm = activeMaster.getMaster().getServerManager();
125    // First wait for it to be in dead list
126    while (!sm.getDeadServers().isDeadServer(serverName)) {
127      LOG.debug("Waiting for {} to be listed as dead in master", serverName);
128      Thread.sleep(SLEEP_TIME);
129    }
130    LOG.debug("Server {} marked as dead, waiting for it to finish dead processing", serverName);
131    while (sm.areDeadServersInProgress()) {
132      LOG.debug("Server {} still being processed, waiting", serverName);
133      Thread.sleep(SLEEP_TIME);
134    }
135    LOG.debug("Server {} done with server shutdown processing", serverName);
136  }
137}