001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
021import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import java.util.concurrent.CountDownLatch;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.StartTestingClusterOption;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.master.HMaster;
041import org.apache.hadoop.hbase.master.MasterServices;
042import org.apache.hadoop.hbase.master.RegionServerList;
043import org.apache.hadoop.hbase.master.ServerManager;
044import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
045import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
046import org.apache.hadoop.hbase.procedure2.Procedure;
047import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
048import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
049import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
050import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
051import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
052import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
053import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
054import org.apache.hadoop.hbase.testclassification.MasterTests;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.junit.jupiter.api.AfterAll;
058import org.junit.jupiter.api.AfterEach;
059import org.junit.jupiter.api.BeforeAll;
060import org.junit.jupiter.api.Tag;
061import org.junit.jupiter.api.Test;
062
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
064
065@Tag(MasterTests.TAG)
066@Tag(MediumTests.TAG)
067public class TestMigrateReplicationQueueFromZkToTableProcedure {
068
  /**
   * Testing utility shared by all tests in this class; the mini cluster is started through it in
   * {@link #setupCluster()} and shut down through it in {@link #cleanupTest()}.
   */
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
070
071  public static final class HMasterForTest extends HMaster {
072
073    public HMasterForTest(Configuration conf) throws IOException {
074      super(conf);
075    }
076
077    @Override
078    protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
079      throws IOException {
080      setupClusterConnection();
081      return new ServerManagerForTest(master, storage);
082    }
083  }
084
  /**
   * Additional (fake) region servers that {@link ServerManagerForTest#getOnlineServers()} merges
   * into the result it returns. Tests put entries here to make extra servers show up as online,
   * and clear the map to make them disappear again.
   */
  private static final ConcurrentMap<ServerName, ServerMetrics> EXTRA_REGION_SERVERS =
    new ConcurrentHashMap<>();
087
088  public static final class ServerManagerForTest extends ServerManager {
089
090    public ServerManagerForTest(MasterServices master, RegionServerList storage) {
091      super(master, storage);
092    }
093
094    @Override
095    public Map<ServerName, ServerMetrics> getOnlineServers() {
096      Map<ServerName, ServerMetrics> map = new HashMap<>(super.getOnlineServers());
097      map.putAll(EXTRA_REGION_SERVERS);
098      return map;
099    }
100  }
101
102  @BeforeAll
103  public static void setupCluster() throws Exception {
104    // one hour, to make sure it will not run during the test
105    UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 60 * 1000);
106    UTIL.startMiniCluster(
107      StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
108  }
109
110  @AfterAll
111  public static void cleanupTest() throws Exception {
112    UTIL.shutdownMiniCluster();
113  }
114
115  private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
116    return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
117  }
118
119  @AfterEach
120  public void tearDown() throws Exception {
121    Admin admin = UTIL.getAdmin();
122    for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
123      admin.removeReplicationPeer(pd.getPeerId());
124    }
125  }
126
  /**
   * Counted down by {@link FakePeerProcedure#execute(MasterProcedureEnv)} as its first action, so
   * a test can wait until the fake peer procedure has started executing. Assigned by the test
   * before the procedure is submitted.
   */
  private static CountDownLatch PEER_PROC_ARRIVE;

  /**
   * Awaited by {@link FakePeerProcedure#execute(MasterProcedureEnv)} after it has signalled its
   * arrival; the procedure stays inside execute until a test counts this latch down.
   */
  private static CountDownLatch PEER_PROC_RESUME;
130
131  public static final class FakePeerProcedure extends Procedure<MasterProcedureEnv>
132    implements PeerProcedureInterface {
133
134    private String peerId;
135
136    public FakePeerProcedure() {
137    }
138
139    public FakePeerProcedure(String peerId) {
140      this.peerId = peerId;
141    }
142
143    @Override
144    public String getPeerId() {
145      return peerId;
146    }
147
148    @Override
149    public PeerOperationType getPeerOperationType() {
150      return PeerOperationType.UPDATE_CONFIG;
151    }
152
153    @Override
154    protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
155      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
156      PEER_PROC_ARRIVE.countDown();
157      PEER_PROC_RESUME.await();
158      return null;
159    }
160
161    @Override
162    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
163      throw new UnsupportedOperationException();
164    }
165
166    @Override
167    protected boolean abort(MasterProcedureEnv env) {
168      return false;
169    }
170
171    @Override
172    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
173    }
174
175    @Override
176    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
177    }
178  }
179
180  @Test
181  public void testWaitUntilNoPeerProcedure() throws Exception {
182    PEER_PROC_ARRIVE = new CountDownLatch(1);
183    PEER_PROC_RESUME = new CountDownLatch(1);
184    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
185    procExec.submitProcedure(new FakePeerProcedure("1"));
186    PEER_PROC_ARRIVE.await();
187    MigrateReplicationQueueFromZkToTableProcedure proc =
188      new MigrateReplicationQueueFromZkToTableProcedure();
189    procExec.submitProcedure(proc);
190    // make sure we will wait until there is no peer related procedures before proceeding
191    UTIL.waitFor(30000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
192    // continue and make sure we can finish successfully
193    PEER_PROC_RESUME.countDown();
194    UTIL.waitFor(30000, () -> proc.isSuccess());
195  }
196
197  // make sure we will disable replication peers while migrating
198  // and also tests disable/enable replication log cleaner and wait for region server upgrading
199  @Test
200  public void testDisablePeerAndWaitStates() throws Exception {
201    String peerId = "2";
202    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
203      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
204      .setReplicateAllUserTables(true).build();
205    UTIL.getAdmin().addReplicationPeer(peerId, rpc);
206    // put a fake region server to simulate that there are still region servers with older version
207    ServerMetrics metrics = mock(ServerMetrics.class);
208    when(metrics.getVersion()).thenReturn("2.5.0");
209    EXTRA_REGION_SERVERS
210      .put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);
211
212    ReplicationLogCleanerBarrier barrier =
213      UTIL.getHBaseCluster().getMaster().getReplicationLogCleanerBarrier();
214    assertTrue(barrier.start());
215
216    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
217
218    MigrateReplicationQueueFromZkToTableProcedure proc =
219      new MigrateReplicationQueueFromZkToTableProcedure();
220    procExec.submitProcedure(proc);
221
222    Thread.sleep(5000);
223    // make sure we are still waiting for replication log cleaner quit
224    assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(),
225      proc.getCurrentStateId());
226    barrier.stop();
227
228    // wait until we reach the wait upgrading state
229    UTIL.waitFor(30000,
230      () -> proc.getCurrentStateId()
231          == MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber()
232        && proc.getState() == ProcedureState.WAITING_TIMEOUT);
233    // make sure the peer is disabled for migrating
234    assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
235    // make sure the replication log cleaner is disabled
236    assertFalse(barrier.start());
237
238    // the procedure should finish successfully
239    EXTRA_REGION_SERVERS.clear();
240    UTIL.waitFor(30000, () -> proc.isSuccess());
241
242    // make sure the peer is enabled again
243    assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
244    // make sure the replication log cleaner is enabled again
245    assertTrue(barrier.start());
246    barrier.stop();
247  }
248}