001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.Table;
028import org.apache.hadoop.hbase.master.HMaster;
029import org.apache.hadoop.hbase.master.MasterServices;
030import org.apache.hadoop.hbase.master.RegionServerList;
031import org.apache.hadoop.hbase.master.ServerManager;
032import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
033import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure;
034import org.apache.hadoop.hbase.procedure2.Procedure;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.testclassification.ReplicationTests;
037import org.junit.jupiter.api.AfterAll;
038import org.junit.jupiter.api.AfterEach;
039import org.junit.jupiter.api.BeforeAll;
040import org.junit.jupiter.api.BeforeEach;
041import org.junit.jupiter.api.Tag;
042import org.junit.jupiter.api.Test;
043
044import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
047
048/**
049 * In HBASE-26029, we reimplement the claim queue operation with proc-v2 and make it a step in SCP,
050 * this is a UT to make sure the {@link AssignReplicationQueuesProcedure} works correctly.
051 */
052@Tag(ReplicationTests.TAG)
053@Tag(LargeTests.TAG)
054public class TestClaimReplicationQueue extends TestReplicationBaseNoBeforeAll {
055
056  private static final TableName tableName3 = TableName.valueOf("test3");
057
058  private static final String PEER_ID3 = "3";
059
060  private static Table table3;
061
062  private static Table table4;
063
064  private static volatile boolean EMPTY = false;
065
066  public static final class ServerManagerForTest extends ServerManager {
067
068    public ServerManagerForTest(MasterServices master, RegionServerList storage) {
069      super(master, storage);
070    }
071
072    @Override
073    public List<ServerName> getOnlineServersList() {
074      // return no region server to make the procedure hang
075      if (EMPTY) {
076        for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
077          if (e.getClassName().equals(AssignReplicationQueuesProcedure.class.getName())) {
078            return Collections.emptyList();
079          }
080        }
081      }
082      return super.getOnlineServersList();
083    }
084  }
085
086  public static final class HMasterForTest extends HMaster {
087
088    public HMasterForTest(Configuration conf) throws IOException {
089      super(conf);
090    }
091
092    @Override
093    protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
094      throws IOException {
095      setupClusterConnection();
096      return new ServerManagerForTest(master, storage);
097    }
098  }
099
100  @BeforeAll
101  public static void setUpBeforeClass() throws Exception {
102    CONF1.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
103    configureClusters(UTIL1, UTIL2);
104    startClusters();
105    createTable(tableName3);
106    table3 = connection1.getTable(tableName3);
107    table4 = connection2.getTable(tableName3);
108  }
109
110  @AfterAll
111  public static void tearDownAfterClass() throws Exception {
112    Closeables.close(table3, true);
113    Closeables.close(table4, true);
114  }
115
116  @BeforeEach
117  public void addExtraPeer() throws Exception {
118    // set up two replication peers and only 1 rs to test claim replication queue with multiple
119    // round
120    addPeer(PEER_ID3, tableName3);
121  }
122
123  @AfterEach
124  public void removeExtraPeer() throws Exception {
125    removePeer(PEER_ID3);
126  }
127
128  @Test
129  public void testClaim() throws Exception {
130    // disable the peers
131    hbaseAdmin.disableReplicationPeer(PEER_ID2);
132    hbaseAdmin.disableReplicationPeer(PEER_ID3);
133
134    // put some data
135    int count1 = UTIL1.loadTable(htable1, famName);
136    int count2 = UTIL1.loadTable(table3, famName);
137
138    EMPTY = true;
139    UTIL1.getMiniHBaseCluster().stopRegionServer(0).join();
140    UTIL1.getMiniHBaseCluster().startRegionServer();
141
142    // since there is no active region server to get the replication queue, the procedure should be
143    // in WAITING_TIMEOUT state for most time to retry
144    HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
145    UTIL1.waitFor(30000,
146      () -> master.getProcedures().stream()
147        .filter(p -> p instanceof AssignReplicationQueuesProcedure)
148        .anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT));
149
150    hbaseAdmin.enableReplicationPeer(PEER_ID2);
151    hbaseAdmin.enableReplicationPeer(PEER_ID3);
152
153    EMPTY = false;
154    // wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP
155    UTIL1.waitFor(30000, () -> master.getProcedures().stream()
156      .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
157
158    // we should get all the data in the target cluster
159    waitForReplication(htable2, count1, NB_RETRIES);
160    waitForReplication(table4, count2, NB_RETRIES);
161  }
162}