001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER; 021import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertTrue; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.util.HashMap; 030import java.util.Map; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.ConcurrentMap; 033import java.util.concurrent.CountDownLatch; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.ServerMetrics; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.StartTestingClusterOption; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.master.HMaster; 042import org.apache.hadoop.hbase.master.MasterServices; 043import org.apache.hadoop.hbase.master.RegionServerList; 044import org.apache.hadoop.hbase.master.ServerManager; 045import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 046import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; 047import org.apache.hadoop.hbase.procedure2.Procedure; 048import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 049import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 050import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 051import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 052import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 053import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 054import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; 055import org.apache.hadoop.hbase.testclassification.MasterTests; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.junit.After; 059import org.junit.AfterClass; 060import org.junit.BeforeClass; 061import org.junit.ClassRule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 066 067@Category({ MasterTests.class, MediumTests.class }) 068public class TestMigrateReplicationQueueFromZkToTableProcedure { 069 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedure.class); 073 074 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 075 076 public static final class HMasterForTest extends HMaster { 077 078 public HMasterForTest(Configuration conf) throws IOException { 079 super(conf); 080 } 081 082 @Override 083 protected ServerManager createServerManager(MasterServices master, RegionServerList storage) 084 throws IOException { 085 setupClusterConnection(); 086 return new ServerManagerForTest(master, storage); 087 } 088 } 089 090 private static final ConcurrentMap<ServerName, ServerMetrics> EXTRA_REGION_SERVERS = 091 new ConcurrentHashMap<>(); 092 093 public static final class ServerManagerForTest extends ServerManager { 094 095 public ServerManagerForTest(MasterServices master, RegionServerList storage) { 096 super(master, storage); 097 } 098 099 @Override 100 public Map<ServerName, ServerMetrics> getOnlineServers() { 101 Map<ServerName, ServerMetrics> map = new HashMap<>(super.getOnlineServers()); 102 map.putAll(EXTRA_REGION_SERVERS); 103 return map; 104 } 105 } 106 107 @BeforeClass 108 public static void setupCluster() throws Exception { 109 // one hour, to make sure it will not run during the test 110 UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 60 * 1000); 111 UTIL.startMiniCluster( 112 StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build()); 113 } 114 115 @AfterClass 116 public static void cleanupTest() throws Exception { 117 UTIL.shutdownMiniCluster(); 118 } 119 120 private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { 121 return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); 122 } 123 124 @After 125 public void tearDown() throws Exception { 126 Admin admin = UTIL.getAdmin(); 127 for (ReplicationPeerDescription pd : admin.listReplicationPeers()) { 128 admin.removeReplicationPeer(pd.getPeerId()); 129 } 130 } 131 132 private static CountDownLatch PEER_PROC_ARRIVE; 133 134 private static CountDownLatch PEER_PROC_RESUME; 135 136 public static final class FakePeerProcedure extends Procedure<MasterProcedureEnv> 137 implements PeerProcedureInterface { 138 139 private String peerId; 140 141 public FakePeerProcedure() { 142 } 143 144 public FakePeerProcedure(String peerId) { 145 this.peerId = peerId; 146 } 147 148 @Override 149 public String getPeerId() { 150 return peerId; 151 } 152 153 @Override 154 public PeerOperationType getPeerOperationType() { 155 return PeerOperationType.UPDATE_CONFIG; 156 } 157 158 @Override 159 protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) 160 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 161 PEER_PROC_ARRIVE.countDown(); 162 PEER_PROC_RESUME.await(); 163 return null; 164 } 165 166 @Override 167 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 168 throw new UnsupportedOperationException(); 169 } 170 171 @Override 172 protected boolean abort(MasterProcedureEnv env) { 173 return false; 174 } 175 176 @Override 177 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 178 } 179 180 @Override 181 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 182 } 183 } 184 185 @Test 186 public void testWaitUntilNoPeerProcedure() throws Exception { 187 PEER_PROC_ARRIVE = new CountDownLatch(1); 188 PEER_PROC_RESUME = new CountDownLatch(1); 189 ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); 190 procExec.submitProcedure(new FakePeerProcedure("1")); 191 PEER_PROC_ARRIVE.await(); 192 MigrateReplicationQueueFromZkToTableProcedure proc = 193 new MigrateReplicationQueueFromZkToTableProcedure(); 194 procExec.submitProcedure(proc); 195 // make sure we will wait until there is no peer related procedures before proceeding 196 UTIL.waitFor(30000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT); 197 // continue and make sure we can finish successfully 198 PEER_PROC_RESUME.countDown(); 199 UTIL.waitFor(30000, () -> proc.isSuccess()); 200 } 201 202 // make sure we will disable replication peers while migrating 203 // and also tests disable/enable replication log cleaner and wait for region server upgrading 204 @Test 205 public void testDisablePeerAndWaitStates() throws Exception { 206 String peerId = "2"; 207 ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() 208 .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase") 209 .setReplicateAllUserTables(true).build(); 210 UTIL.getAdmin().addReplicationPeer(peerId, rpc); 211 // put a fake region server to simulate that there are still region servers with older version 212 ServerMetrics metrics = mock(ServerMetrics.class); 213 when(metrics.getVersion()).thenReturn("2.5.0"); 214 EXTRA_REGION_SERVERS 215 .put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics); 216 217 ReplicationLogCleanerBarrier barrier = 218 UTIL.getHBaseCluster().getMaster().getReplicationLogCleanerBarrier(); 219 assertTrue(barrier.start()); 220 221 ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); 222 223 MigrateReplicationQueueFromZkToTableProcedure proc = 224 new MigrateReplicationQueueFromZkToTableProcedure(); 225 procExec.submitProcedure(proc); 226 227 Thread.sleep(5000); 228 // make sure we are still waiting for replication log cleaner quit 229 assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(), 230 proc.getCurrentStateId()); 231 barrier.stop(); 232 233 // wait until we reach the wait upgrading state 234 UTIL.waitFor(30000, 235 () -> proc.getCurrentStateId() 236 == MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber() 237 && proc.getState() == ProcedureState.WAITING_TIMEOUT); 238 // make sure the peer is disabled for migrating 239 assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId)); 240 // make sure the replication log cleaner is disabled 241 assertFalse(barrier.start()); 242 243 // the procedure should finish successfully 244 EXTRA_REGION_SERVERS.clear(); 245 UTIL.waitFor(30000, () -> proc.isSuccess()); 246 247 // make sure the peer is enabled again 248 assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId)); 249 // make sure the replication log cleaner is enabled again 250 assertTrue(barrier.start()); 251 barrier.stop(); 252 } 253}