001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState.DISPATCH_WALS_VALUE;
021import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE_VALUE;
022
023import java.io.IOException;
024import java.io.UncheckedIOException;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.master.HMaster;
029import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
030import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
031import org.apache.hadoop.hbase.replication.SyncReplicationState;
032import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.testclassification.MasterTests;
035import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
036import org.junit.BeforeClass;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040
041/**
042 * Testcase for HBASE-21494.
043 */
044@Category({ MasterTests.class, LargeTests.class })
045public class TestRegisterPeerWorkerWhenRestarting extends SyncReplicationTestBase {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049    HBaseClassTestRule.forClass(TestRegisterPeerWorkerWhenRestarting.class);
050
051  private static volatile boolean FAIL = false;
052
053  public static final class HMasterForTest extends HMaster {
054
055    public HMasterForTest(Configuration conf) throws IOException {
056      super(conf);
057    }
058
059    @Override
060    public void remoteProcedureCompleted(long procId) {
061      if (
062        FAIL && getMasterProcedureExecutor()
063          .getProcedure(procId) instanceof SyncReplicationReplayWALRemoteProcedure
064      ) {
065        throw new RuntimeException("Inject error");
066      }
067      super.remoteProcedureCompleted(procId);
068    }
069  }
070
071  @BeforeClass
072  public static void setUp() throws Exception {
073    UTIL2.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
074    SyncReplicationTestBase.setUp();
075  }
076
077  @Test
078  public void testRestart() throws Exception {
079    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
080      SyncReplicationState.STANDBY);
081    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
082      SyncReplicationState.ACTIVE);
083
084    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
085    write(UTIL1, 0, 100);
086    Thread.sleep(2000);
087    // peer is disabled so no data have been replicated
088    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
089
090    // transit the A to DA first to avoid too many error logs.
091    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
092      SyncReplicationState.DOWNGRADE_ACTIVE);
093    HMaster master = UTIL2.getHBaseCluster().getMaster();
094    // make sure the transiting can not succeed
095    FAIL = true;
096    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
097    Thread t = new Thread() {
098
099      @Override
100      public void run() {
101        try {
102          UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
103            SyncReplicationState.DOWNGRADE_ACTIVE);
104        } catch (IOException e) {
105          throw new UncheckedIOException(e);
106        }
107      }
108    };
109    t.start();
110    // wait until we are in the states where we need to register peer worker when restarting
111    UTIL2.waitFor(60000,
112      () -> procExec.getProcedures().stream().filter(p -> p instanceof RecoverStandbyProcedure)
113        .map(p -> (RecoverStandbyProcedure) p)
114        .anyMatch(p -> p.getCurrentStateId() == DISPATCH_WALS_VALUE
115          || p.getCurrentStateId() == UNREGISTER_PEER_FROM_WORKER_STORAGE_VALUE));
116    // failover to another master
117    MasterThread mt = UTIL2.getMiniHBaseCluster().getMasterThread();
118    mt.getMaster().abort("for testing");
119    mt.join();
120    FAIL = false;
121    t.join();
122    // make sure the new master can finish the transition
123    UTIL2.waitFor(60000, () -> UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
124        == SyncReplicationState.DOWNGRADE_ACTIVE);
125    verify(UTIL2, 0, 100);
126  }
127}