001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.empty;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023
024import java.io.IOException;
025import java.util.Collections;
026import java.util.List;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.master.HMaster;
033import org.apache.hadoop.hbase.master.MasterServices;
034import org.apache.hadoop.hbase.master.RegionServerList;
035import org.apache.hadoop.hbase.master.ServerManager;
036import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
037import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure;
038import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
039import org.apache.hadoop.hbase.procedure2.Procedure;
040import org.apache.hadoop.hbase.testclassification.LargeTests;
041import org.apache.hadoop.hbase.testclassification.ReplicationTests;
042import org.junit.jupiter.api.AfterAll;
043import org.junit.jupiter.api.AfterEach;
044import org.junit.jupiter.api.BeforeAll;
045import org.junit.jupiter.api.BeforeEach;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048
049import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
050
051import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
053
054/**
055 * Make sure we will wait until all the SCPs finished in RemovePeerProcedure.
056 * <p/>
057 * See HBASE-27109 for more details.
058 */
059@Tag(ReplicationTests.TAG)
060@Tag(LargeTests.TAG)
061public class TestRemovePeerProcedureWaitForSCP extends TestReplicationBaseNoBeforeAll {
062
063  private static final TableName tableName3 = TableName.valueOf("test3");
064
065  private static final String PEER_ID3 = "3";
066
067  private static Table table3;
068
069  private static volatile boolean EMPTY = false;
070
071  public static final class ServerManagerForTest extends ServerManager {
072
073    public ServerManagerForTest(MasterServices master, RegionServerList storage) {
074      super(master, storage);
075    }
076
077    @Override
078    public List<ServerName> getOnlineServersList() {
079      // return no region server to make the procedure hang
080      if (EMPTY) {
081        for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
082          if (e.getClassName().equals(AssignReplicationQueuesProcedure.class.getName())) {
083            return Collections.emptyList();
084          }
085        }
086      }
087      return super.getOnlineServersList();
088    }
089  }
090
091  public static final class HMasterForTest extends HMaster {
092
093    public HMasterForTest(Configuration conf) throws IOException {
094      super(conf);
095    }
096
097    @Override
098    protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
099      throws IOException {
100      setupClusterConnection();
101      return new ServerManagerForTest(master, storage);
102    }
103  }
104
105  @BeforeAll
106  public static void setUpBeforeClass() throws Exception {
107    CONF1.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
108    configureClusters(UTIL1, UTIL2);
109    startClusters();
110    createTable(tableName3);
111    table3 = connection1.getTable(tableName3);
112  }
113
114  @BeforeEach
115  public void addExtraPeer() throws Exception {
116    // set up two replication peers and only 1 rs to test claim replication queue with multiple
117    // round
118    addPeer(PEER_ID3, tableName3);
119  }
120
121  @AfterEach
122  public void removeExtraPeer() throws Exception {
123    removePeer(PEER_ID3);
124  }
125
126  @AfterAll
127  public static void tearDownAfterClass() throws Exception {
128    Closeables.close(table3, true);
129  }
130
131  @Test
132  public void testWait() throws Exception {
133    // disable the peers
134    hbaseAdmin.disableReplicationPeer(PEER_ID2);
135    hbaseAdmin.disableReplicationPeer(PEER_ID3);
136
137    // put some data
138    UTIL1.loadTable(htable1, famName);
139    UTIL1.loadTable(table3, famName);
140
141    EMPTY = true;
142    UTIL1.getMiniHBaseCluster().stopRegionServer(0).join();
143    UTIL1.getMiniHBaseCluster().startRegionServer();
144
145    // since there is no active region server to get the replication queue, the procedure should be
146    // in WAITING_TIMEOUT state for most time to retry
147    HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
148    UTIL1.waitFor(30000,
149      () -> master.getProcedures().stream()
150        .filter(p -> p instanceof AssignReplicationQueuesProcedure)
151        .anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT));
152
153    // call remove replication peer, and make sure it will be stuck in the POST_PEER_MODIFICATION
154    // state.
155    hbaseAdmin.removeReplicationPeerAsync(PEER_ID3);
156    UTIL1.waitFor(30000,
157      () -> master.getProcedures().stream().filter(p -> p instanceof RemovePeerProcedure)
158        .anyMatch(p -> ((RemovePeerProcedure) p).getCurrentStateId()
159            == PeerModificationState.POST_PEER_MODIFICATION_VALUE));
160    Thread.sleep(5000);
161    assertEquals(PeerModificationState.POST_PEER_MODIFICATION_VALUE,
162      ((RemovePeerProcedure) master.getProcedures().stream()
163        .filter(p -> p instanceof RemovePeerProcedure).findFirst().get()).getCurrentStateId());
164    EMPTY = false;
165    // wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP
166    UTIL1.waitFor(30000, () -> master.getProcedures().stream()
167      .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
168    // the RemovePeerProcedure should have also finished
169    UTIL1.waitFor(30000, () -> master.getProcedures().stream()
170      .filter(p -> p instanceof RemovePeerProcedure).allMatch(Procedure::isSuccess));
171    // make sure there is no remaining replication queues for PEER_ID3
172    assertThat(master.getReplicationPeerManager().getQueueStorage().listAllQueueIds(PEER_ID3),
173      empty());
174  }
175}