/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.FSReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;

/**
 * Verifies that switching replication peer modification off with {@code drainProcs} set does not
 * complete while an add peer call is still in flight, and that a newly submitted
 * {@link AddPeerProcedure} fails in the meantime. The switch request is issued through the
 * {@link AsyncAdmin} when the {@code async} parameter is {@code true}, and through the blocking
 * admin otherwise.
 */
@Tag(MasterTests.TAG)
@Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: async={0}")
public class TestDisablePeerModification {

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /** Counted down by {@link MockPeerStorage#addPeer} as soon as it is entered. */
  private static volatile CountDownLatch ARRIVE;

  /** Awaited by {@link MockPeerStorage#addPeer} before it delegates to the real storage. */
  private static volatile CountDownLatch RESUME;

  /**
   * Peer storage whose {@code addPeer} signals {@link #ARRIVE} and then parks on {@link #RESUME},
   * which lets the test hold an add peer call open for as long as it needs.
   */
  public static final class MockPeerStorage extends FSReplicationPeerStorage {

    public MockPeerStorage(FileSystem fs, Configuration conf) throws IOException {
      super(fs, conf);
    }

    /**
     * Signals arrival, waits until the test releases {@link #RESUME}, then delegates to
     * {@link FSReplicationPeerStorage}.
     * @throws ReplicationException if the wait is interrupted, or if the delegate call fails
     */
    @Override
    public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
      SyncReplicationState syncReplicationState) throws ReplicationException {
      ARRIVE.countDown();
      try {
        RESUME.await();
      } catch (InterruptedException e) {
        // Restore the interrupt status so that the caller can still observe the interruption
        // after it has been translated into a ReplicationException.
        Thread.currentThread().interrupt();
        throw new ReplicationException(e);
      }
      super.addPeer(peerId, peerConfig, enabled, syncReplicationState);
    }
  }

  private final boolean async;

  public TestDisablePeerModification(boolean async) {
    this.async = async;
  }

  /** Supplies the values for the {@code async} constructor argument: {@code true}, {@code false}. */
  public static Stream<Arguments> parameters() {
    return Arrays.asList(true, false).stream().map(Arguments::of);
  }

  /** Installs {@link MockPeerStorage} as the peer storage implementation and starts the cluster. */
  @BeforeAll
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
      MockPeerStorage.class, ReplicationPeerStorage.class);
    UTIL.startMiniCluster(1);
  }

  @AfterAll
  public static void tearDown() throws IOException {
    UTIL.shutdownMiniCluster();
  }

  /** Turns peer modification back on, since each test run ends with it switched off. */
  @BeforeEach
  public void setUpBeforeTest() throws IOException {
    UTIL.getAdmin().replicationPeerModificationSwitch(true, true);
  }

  @TestTemplate
  public void testDrainProcs() throws Exception {
    CountDownLatch arrive = new CountDownLatch(1);
    CountDownLatch resume = new CountDownLatch(1);
    ARRIVE = arrive;
    RESUME = resume;
    AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin();
    ReplicationPeerConfig rpc =
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getRpcConnnectionURI() + "-test")
        .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
    CompletableFuture<Void> addFuture = admin.addReplicationPeer("test_peer_" + async, rpc);
    try {
      arrive.await();

      // we have a pending add peer procedure which has already passed the first state, let's issue
      // a peer modification switch request to disable peer modification and set drainProcs to true
      CompletableFuture<Boolean> switchFuture = async
        ? admin.replicationPeerModificationSwitch(false, true)
        : switchOffWithBlockingAdmin();

      // sleep a while, the switchFuture should not finish yet
      // the sleep is necessary as we can not join on the switchFuture, so there is no stable way to
      // make sure we have already changed the flag at master side, sleep a while is the most
      // suitable way here
      Thread.sleep(5000);
      assertFalse(switchFuture.isDone());

      // also verify that we can not schedule a new peer modification procedure
      AddPeerProcedure proc = new AddPeerProcedure("failure", rpc, true);
      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc);
      UTIL.waitFor(15000, () -> proc.isFinished());
      // make sure the procedure is failed because of peer modification disabled
      assertTrue(proc.isFailed());
      assertThat(proc.getException().getCause().getMessage(),
        containsString("Replication peer modification disabled"));

      // sleep a while and check again, make sure the switchFuture is still not done
      Thread.sleep(5000);
      assertFalse(switchFuture.isDone());

      // resume the add peer procedure and wait it done
      resume.countDown();
      addFuture.get();

      // this time the switchFuture should be able to finish
      assertTrue(switchFuture.get());
    } finally {
      // Always release the add peer call parked in MockPeerStorage, even when an assertion above
      // failed; otherwise it would stay blocked on RESUME indefinitely. Counting down a latch
      // that has already reached zero is a no-op.
      resume.countDown();
    }
  }

  /**
   * Issues the "disable peer modification, drain procedures" request through the blocking admin on
   * the common pool, so that the caller is not blocked while the request is pending.
   * @return a future completed with the value returned by the admin call, or completed
   *         exceptionally with whatever the call threw
   */
  private static CompletableFuture<Boolean> switchOffWithBlockingAdmin() {
    CompletableFuture<Boolean> future = new CompletableFuture<>();
    ForkJoinPool.commonPool().submit(() -> {
      try {
        future.complete(UTIL.getAdmin().replicationPeerModificationSwitch(false, true));
      } catch (Throwable t) {
        // Forward every failure, not only IOException; a future that is never completed would
        // make the test wait forever on get() instead of failing.
        future.completeExceptionally(t);
      }
    });
    return future;
  }
}