001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import org.apache.hadoop.hbase.HBaseTestingUtil; 022import org.apache.hadoop.hbase.ProcedureTestUtil; 023import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 024import org.apache.hadoop.hbase.procedure2.Procedure; 025import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 026import org.apache.hadoop.hbase.replication.ReplicationException; 027import org.apache.hadoop.hbase.replication.SyncReplicationState; 028import org.apache.hadoop.hbase.testclassification.LargeTests; 029import org.apache.hadoop.hbase.testclassification.MasterTests; 030import org.junit.jupiter.api.AfterAll; 031import org.junit.jupiter.api.BeforeAll; 032import org.junit.jupiter.api.Tag; 033import org.junit.jupiter.api.Test; 034 035@Tag(MasterTests.TAG) 036@Tag(LargeTests.TAG) 037public class TestTransitPeerSyncReplicationStateProcedureBackoff { 038 039 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 040 041 private static boolean FAIL = true; 042 043 public static class TestTransitPeerSyncReplicationStateProcedure 044 extends TransitPeerSyncReplicationStateProcedure { 045 046 public TestTransitPeerSyncReplicationStateProcedure() { 047 } 048 049 public TestTransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) { 050 super(peerId, state); 051 } 052 053 private void tryFail() throws ReplicationException { 054 synchronized (TestTransitPeerSyncReplicationStateProcedureBackoff.class) { 055 if (FAIL) { 056 throw new ReplicationException("Inject error"); 057 } 058 FAIL = true; 059 } 060 } 061 062 @Override 063 protected <T extends Procedure<MasterProcedureEnv>> void 064 addChildProcedure(@SuppressWarnings("unchecked") T... subProcedure) { 065 // Make it a no-op 066 } 067 068 @Override 069 protected void preTransit(MasterProcedureEnv env) throws IOException { 070 fromState = SyncReplicationState.DOWNGRADE_ACTIVE; 071 } 072 073 @Override 074 protected void setPeerNewSyncReplicationState(MasterProcedureEnv env) 075 throws ReplicationException { 076 tryFail(); 077 } 078 079 @Override 080 protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException { 081 tryFail(); 082 } 083 084 @Override 085 protected void reopenRegions(MasterProcedureEnv env) { 086 // do nothing; 087 } 088 089 @Override 090 protected void transitPeerSyncReplicationState(MasterProcedureEnv env) 091 throws ReplicationException { 092 tryFail(); 093 } 094 095 @Override 096 protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException { 097 try { 098 tryFail(); 099 } catch (ReplicationException e) { 100 throw new IOException(e); 101 } 102 } 103 } 104 105 @BeforeAll 106 public static void setUp() throws Exception { 107 UTIL.startMiniCluster(1); 108 } 109 110 @AfterAll 111 public static void tearDown() throws Exception { 112 UTIL.shutdownMiniCluster(); 113 } 114 115 private void assertBackoffIncrease() throws IOException, InterruptedException { 116 ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, 117 TestTransitPeerSyncReplicationStateProcedure.class, 30000); 118 ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, 119 TestTransitPeerSyncReplicationStateProcedure.class, 2); 120 synchronized (TestTransitPeerSyncReplicationStateProcedure.class) { 121 FAIL = false; 122 } 123 UTIL.waitFor(30000, () -> FAIL); 124 } 125 126 @Test 127 public void testDowngradeActiveToActive() throws IOException, InterruptedException { 128 ProcedureExecutor<MasterProcedureEnv> procExec = 129 UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); 130 // Test procedure: DOWNGRADE_ACTIVE ==> ACTIVE 131 long procId = procExec.submitProcedure( 132 new TestTransitPeerSyncReplicationStateProcedure("1", SyncReplicationState.ACTIVE)); 133 // No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION 134 // SET_PEER_NEW_SYNC_REPLICATION_STATE 135 assertBackoffIncrease(); 136 // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN 137 // No retry for REOPEN_ALL_REGIONS_IN_PEER 138 // TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE 139 assertBackoffIncrease(); 140 // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END 141 // No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION 142 UTIL.waitFor(30000, () -> procExec.isFinished(procId)); 143 } 144 145 @Test 146 public void testDowngradeActiveToStandby() throws IOException, InterruptedException { 147 ProcedureExecutor<MasterProcedureEnv> procExec = 148 UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); 149 // Test procedure: DOWNGRADE_ACTIVE ==> ACTIVE 150 long procId = procExec.submitProcedure( 151 new TestTransitPeerSyncReplicationStateProcedure("2", SyncReplicationState.STANDBY)); 152 // No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION 153 // SET_PEER_NEW_SYNC_REPLICATION_STATE 154 assertBackoffIncrease(); 155 // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN 156 // REMOVE_ALL_REPLICATION_QUEUES_IN_PEER 157 assertBackoffIncrease(); 158 // TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE 159 assertBackoffIncrease(); 160 // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END 161 // CREATE_DIR_FOR_REMOTE_WAL 162 assertBackoffIncrease(); 163 // No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION 164 UTIL.waitFor(30000, () -> procExec.isFinished(procId)); 165 } 166}