001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY;
021import static org.awaitility.Awaitility.await;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.time.Duration;
028import java.util.Optional;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
031import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
032import org.apache.hadoop.hbase.coprocessor.MasterObserver;
033import org.apache.hadoop.hbase.coprocessor.ObserverContext;
034import org.apache.hadoop.hbase.master.MasterFileSystem;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.testclassification.ReplicationTests;
037import org.junit.jupiter.api.BeforeAll;
038import org.junit.jupiter.api.Tag;
039import org.junit.jupiter.api.Test;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@Tag(ReplicationTests.TAG)
044@Tag(LargeTests.TAG)
045public class TestSyncReplicationStandbyKillMaster extends SyncReplicationTestBaseNoBeforeAll {
046
047  private static final Logger LOG =
048    LoggerFactory.getLogger(TestSyncReplicationStandbyKillMaster.class);
049
050  private static final int COUNT = 1000;
051
052  private static volatile boolean KILL_MASTER = false;
053
054  public static final class TransitCP implements MasterCoprocessor, MasterObserver {
055
056    @Override
057    public void preTransitReplicationPeerSyncReplicationState(
058      ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId, SyncReplicationState state)
059      throws IOException {
060      if (KILL_MASTER) {
061        KILL_MASTER = false;
062        UTIL2.getMiniHBaseCluster().getMaster().abort("Stop master for test");
063      }
064    }
065
066    @Override
067    public Optional<MasterObserver> getMasterObserver() {
068      return Optional.of(this);
069    }
070  }
071
072  @BeforeAll
073  public static void setUp() throws Exception {
074    UTIL2.getConfiguration().set(MASTER_COPROCESSOR_CONF_KEY, TransitCP.class.getName());
075    startClusters();
076  }
077
078  @Test
079  public void testStandbyKillMaster() throws Exception {
080    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
081    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
082    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
083    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
084      SyncReplicationState.STANDBY);
085    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
086    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
087      SyncReplicationState.ACTIVE);
088
089    // Disable async replication and write data, then shutdown
090    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
091    write(UTIL1, 0, COUNT);
092    UTIL1.shutdownMiniCluster();
093
094    KILL_MASTER = true;
095    // Transit standby to DA to replay logs
096    try {
097      UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
098        SyncReplicationState.DOWNGRADE_ACTIVE);
099    } catch (Exception e) {
100      // we will kill master in the transit procedure's preCheck method, not in the rpc thread, so
101      // we can not know whether we could successfully get the returned proc id. If we can get the
102      // proc id, the call should be succeeded and we will not arrive here, and the below await
103      // check will pass immediately, if not, we will get this exception and then we need to rely on
104      // the below await check to make sure the transition is successfully finished.
105      LOG.warn("Failed to transit standby cluster to {}", SyncReplicationState.DOWNGRADE_ACTIVE);
106    }
107
108    await().atMost(Duration.ofMinutes(3))
109      .untilAsserted(() -> assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
110        UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)));
111
112    verify(UTIL2, 0, COUNT);
113  }
114}