001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER; 021import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertFalse; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.HashMap; 030import java.util.Map; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033import java.util.concurrent.CountDownLatch; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.ServerMetrics; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.StartTestingClusterOption; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.master.HMaster; 041import org.apache.hadoop.hbase.master.MasterServices; 042import org.apache.hadoop.hbase.master.RegionServerList; 043import org.apache.hadoop.hbase.master.ServerManager; 044import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 045import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; 046import org.apache.hadoop.hbase.procedure2.Procedure; 047import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 048import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 049import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 050import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 051import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 052import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 053import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; 054import org.apache.hadoop.hbase.testclassification.MasterTests; 055import org.apache.hadoop.hbase.testclassification.MediumTests; 056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 057import org.junit.jupiter.api.AfterAll; 058import org.junit.jupiter.api.AfterEach; 059import org.junit.jupiter.api.BeforeAll; 060import org.junit.jupiter.api.Tag; 061import org.junit.jupiter.api.Test; 062 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 064 065@Tag(MasterTests.TAG) 066@Tag(MediumTests.TAG) 067public class TestMigrateReplicationQueueFromZkToTableProcedure { 068 069 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 070 071 public static final class HMasterForTest extends HMaster { 072 073 public HMasterForTest(Configuration conf) throws IOException { 074 super(conf); 075 } 076 077 @Override 078 protected ServerManager createServerManager(MasterServices master, RegionServerList storage) 079 throws IOException { 080 setupClusterConnection(); 081 return new ServerManagerForTest(master, storage); 082 } 083 } 084 085 private static final ConcurrentMap<ServerName, ServerMetrics> EXTRA_REGION_SERVERS = 086 new ConcurrentHashMap<>(); 087 088 public static final class ServerManagerForTest extends ServerManager { 089 090 public ServerManagerForTest(MasterServices master, RegionServerList storage) { 091 super(master, storage); 092 } 093 094 @Override 095 public Map<ServerName, ServerMetrics> getOnlineServers() { 096 Map<ServerName, ServerMetrics> map = new HashMap<>(super.getOnlineServers()); 097 map.putAll(EXTRA_REGION_SERVERS); 098 return map; 099 } 100 } 101 102 @BeforeAll 103 public static void setupCluster() throws Exception { 104 // one hour, to make sure it will not run during the test 105 UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 60 * 1000); 106 UTIL.startMiniCluster( 107 StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build()); 108 } 109 110 @AfterAll 111 public static void cleanupTest() throws Exception { 112 UTIL.shutdownMiniCluster(); 113 } 114 115 private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 116 return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); 117 } 118 119 @AfterEach 120 public void tearDown() throws Exception { 121 Admin admin = UTIL.getAdmin(); 122 for (ReplicationPeerDescription pd : admin.listReplicationPeers()) { 123 admin.removeReplicationPeer(pd.getPeerId()); 124 } 125 } 126 127 private static CountDownLatch PEER_PROC_ARRIVE; 128 129 private static CountDownLatch PEER_PROC_RESUME; 130 131 public static final class FakePeerProcedure extends Procedure<MasterProcedureEnv> 132 implements PeerProcedureInterface { 133 134 private String peerId; 135 136 public FakePeerProcedure() { 137 } 138 139 public FakePeerProcedure(String peerId) { 140 this.peerId = peerId; 141 } 142 143 @Override 144 public String getPeerId() { 145 return peerId; 146 } 147 148 @Override 149 public PeerOperationType getPeerOperationType() { 150 return PeerOperationType.UPDATE_CONFIG; 151 } 152 153 @Override 154 protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) 155 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 156 PEER_PROC_ARRIVE.countDown(); 157 PEER_PROC_RESUME.await(); 158 return null; 159 } 160 161 @Override 162 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 163 throw new UnsupportedOperationException(); 164 } 165 166 @Override 167 protected boolean abort(MasterProcedureEnv env) { 168 return false; 169 } 170 171 @Override 172 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 173 } 174 175 @Override 176 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 177 } 178 } 179 180 @Test 181 public void testWaitUntilNoPeerProcedure() throws Exception { 182 PEER_PROC_ARRIVE = new CountDownLatch(1); 183 PEER_PROC_RESUME = new CountDownLatch(1); 184 ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); 185 procExec.submitProcedure(new FakePeerProcedure("1")); 186 PEER_PROC_ARRIVE.await(); 187 MigrateReplicationQueueFromZkToTableProcedure proc = 188 new MigrateReplicationQueueFromZkToTableProcedure(); 189 procExec.submitProcedure(proc); 190 // make sure we will wait until there is no peer related procedures before proceeding 191 UTIL.waitFor(30000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT); 192 // continue and make sure we can finish successfully 193 PEER_PROC_RESUME.countDown(); 194 UTIL.waitFor(30000, () -> proc.isSuccess()); 195 } 196 197 // make sure we will disable replication peers while migrating 198 // and also tests disable/enable replication log cleaner and wait for region server upgrading 199 @Test 200 public void testDisablePeerAndWaitStates() throws Exception { 201 String peerId = "2"; 202 ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() 203 .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase") 204 .setReplicateAllUserTables(true).build(); 205 UTIL.getAdmin().addReplicationPeer(peerId, rpc); 206 // put a fake region server to simulate that there are still region servers with older version 207 ServerMetrics metrics = mock(ServerMetrics.class); 208 when(metrics.getVersion()).thenReturn("2.5.0"); 209 EXTRA_REGION_SERVERS 210 .put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics); 211 212 ReplicationLogCleanerBarrier barrier = 213 UTIL.getHBaseCluster().getMaster().getReplicationLogCleanerBarrier(); 214 assertTrue(barrier.start()); 215 216 ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); 217 218 MigrateReplicationQueueFromZkToTableProcedure proc = 219 new MigrateReplicationQueueFromZkToTableProcedure(); 220 procExec.submitProcedure(proc); 221 222 Thread.sleep(5000); 223 // make sure we are still waiting for replication log cleaner quit 224 assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(), 225 proc.getCurrentStateId()); 226 barrier.stop(); 227 228 // wait until we reach the wait upgrading state 229 UTIL.waitFor(30000, 230 () -> proc.getCurrentStateId() 231 == MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber() 232 && proc.getState() == ProcedureState.WAITING_TIMEOUT); 233 // make sure the peer is disabled for migrating 234 assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId)); 235 // make sure the replication log cleaner is disabled 236 assertFalse(barrier.start()); 237 238 // the procedure should finish successfully 239 EXTRA_REGION_SERVERS.clear(); 240 UTIL.waitFor(30000, () -> proc.isSuccess()); 241 242 // make sure the peer is enabled again 243 assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId)); 244 // make sure the replication log cleaner is enabled again 245 assertTrue(barrier.start()); 246 barrier.stop(); 247 } 248}