001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.master.HMaster;
026import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
027import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
028import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
029import org.apache.hadoop.hbase.procedure2.Procedure;
030import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
032import org.apache.hadoop.hbase.replication.SyncReplicationState;
033import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
034import org.apache.hadoop.hbase.testclassification.LargeTests;
035import org.apache.hadoop.hbase.testclassification.MasterTests;
036import org.junit.BeforeClass;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040
041@Category({ MasterTests.class, LargeTests.class })
042public class TestTransitPeerSyncReplicationStateProcedureRetry extends SyncReplicationTestBase {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046    HBaseClassTestRule.forClass(TestTransitPeerSyncReplicationStateProcedureRetry.class);
047
048  @BeforeClass
049  public static void setUp() throws Exception {
050    UTIL2.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
051    SyncReplicationTestBase.setUp();
052  }
053
054  @Test
055  public void testRecoveryAndDoubleExecution() throws Exception {
056    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
057      SyncReplicationState.STANDBY);
058    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
059      SyncReplicationState.ACTIVE);
060
061    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
062    write(UTIL1, 0, 100);
063    Thread.sleep(2000);
064    // peer is disabled so no data have been replicated
065    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
066
067    // transit the A to DA first to avoid too many error logs.
068    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
069      SyncReplicationState.DOWNGRADE_ACTIVE);
070    HMaster master = UTIL2.getHBaseCluster().getMaster();
071    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
072    // Enable test flags and then queue the procedure.
073    ProcedureTestingUtility.waitNoProcedureRunning(procExec);
074    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
075    Thread t = new Thread() {
076
077      @Override
078      public void run() {
079        try {
080          UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
081            SyncReplicationState.DOWNGRADE_ACTIVE);
082        } catch (IOException e) {
083          throw new UncheckedIOException(e);
084        }
085      }
086    };
087    t.start();
088    UTIL2.waitFor(30000, () -> procExec.getProcedures().stream()
089      .anyMatch(p -> p instanceof TransitPeerSyncReplicationStateProcedure && !p.isFinished()));
090    long procId = procExec.getProcedures().stream()
091      .filter(p -> p instanceof TransitPeerSyncReplicationStateProcedure && !p.isFinished())
092      .mapToLong(Procedure::getProcId).min().getAsLong();
093    MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
094    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
095    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
096      UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID));
097    verify(UTIL2, 0, 100);
098  }
099}