001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import org.apache.hadoop.hbase.HBaseTestingUtil;
022import org.apache.hadoop.hbase.ProcedureTestUtil;
023import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
024import org.apache.hadoop.hbase.procedure2.Procedure;
025import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
026import org.apache.hadoop.hbase.replication.ReplicationException;
027import org.apache.hadoop.hbase.replication.SyncReplicationState;
028import org.apache.hadoop.hbase.testclassification.LargeTests;
029import org.apache.hadoop.hbase.testclassification.MasterTests;
030import org.junit.jupiter.api.AfterAll;
031import org.junit.jupiter.api.BeforeAll;
032import org.junit.jupiter.api.Tag;
033import org.junit.jupiter.api.Test;
034
035@Tag(MasterTests.TAG)
036@Tag(LargeTests.TAG)
037public class TestTransitPeerSyncReplicationStateProcedureBackoff {
038
039  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
040
041  private static boolean FAIL = true;
042
043  public static class TestTransitPeerSyncReplicationStateProcedure
044    extends TransitPeerSyncReplicationStateProcedure {
045
046    public TestTransitPeerSyncReplicationStateProcedure() {
047    }
048
049    public TestTransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
050      super(peerId, state);
051    }
052
053    private void tryFail() throws ReplicationException {
054      synchronized (TestTransitPeerSyncReplicationStateProcedureBackoff.class) {
055        if (FAIL) {
056          throw new ReplicationException("Inject error");
057        }
058        FAIL = true;
059      }
060    }
061
062    @Override
063    protected <T extends Procedure<MasterProcedureEnv>> void
064      addChildProcedure(@SuppressWarnings("unchecked") T... subProcedure) {
065      // Make it a no-op
066    }
067
068    @Override
069    protected void preTransit(MasterProcedureEnv env) throws IOException {
070      fromState = SyncReplicationState.DOWNGRADE_ACTIVE;
071    }
072
073    @Override
074    protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
075      throws ReplicationException {
076      tryFail();
077    }
078
079    @Override
080    protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException {
081      tryFail();
082    }
083
084    @Override
085    protected void reopenRegions(MasterProcedureEnv env) {
086      // do nothing;
087    }
088
089    @Override
090    protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
091      throws ReplicationException {
092      tryFail();
093    }
094
095    @Override
096    protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException {
097      try {
098        tryFail();
099      } catch (ReplicationException e) {
100        throw new IOException(e);
101      }
102    }
103  }
104
105  @BeforeAll
106  public static void setUp() throws Exception {
107    UTIL.startMiniCluster(1);
108  }
109
110  @AfterAll
111  public static void tearDown() throws Exception {
112    UTIL.shutdownMiniCluster();
113  }
114
115  private void assertBackoffIncrease() throws IOException, InterruptedException {
116    ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL,
117      TestTransitPeerSyncReplicationStateProcedure.class, 30000);
118    ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL,
119      TestTransitPeerSyncReplicationStateProcedure.class, 2);
120    synchronized (TestTransitPeerSyncReplicationStateProcedure.class) {
121      FAIL = false;
122    }
123    UTIL.waitFor(30000, () -> FAIL);
124  }
125
126  @Test
127  public void testDowngradeActiveToActive() throws IOException, InterruptedException {
128    ProcedureExecutor<MasterProcedureEnv> procExec =
129      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
130    // Test procedure: DOWNGRADE_ACTIVE ==> ACTIVE
131    long procId = procExec.submitProcedure(
132      new TestTransitPeerSyncReplicationStateProcedure("1", SyncReplicationState.ACTIVE));
133    // No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION
134    // SET_PEER_NEW_SYNC_REPLICATION_STATE
135    assertBackoffIncrease();
136    // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN
137    // No retry for REOPEN_ALL_REGIONS_IN_PEER
138    // TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
139    assertBackoffIncrease();
140    // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END
141    // No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION
142    UTIL.waitFor(30000, () -> procExec.isFinished(procId));
143  }
144
145  @Test
146  public void testDowngradeActiveToStandby() throws IOException, InterruptedException {
147    ProcedureExecutor<MasterProcedureEnv> procExec =
148      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
149    // Test procedure: DOWNGRADE_ACTIVE ==> ACTIVE
150    long procId = procExec.submitProcedure(
151      new TestTransitPeerSyncReplicationStateProcedure("2", SyncReplicationState.STANDBY));
152    // No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION
153    // SET_PEER_NEW_SYNC_REPLICATION_STATE
154    assertBackoffIncrease();
155    // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN
156    // REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
157    assertBackoffIncrease();
158    // TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
159    assertBackoffIncrease();
160    // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END
161    // CREATE_DIR_FOR_REMOTE_WAL
162    assertBackoffIncrease();
163    // No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION
164    UTIL.waitFor(30000, () -> procExec.isFinished(procId));
165  }
166}