001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.master.HMaster;
030import org.apache.hadoop.hbase.master.MasterServices;
031import org.apache.hadoop.hbase.master.ServerManager;
032import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
033import org.apache.hadoop.hbase.master.replication.ClaimReplicationQueuesProcedure;
034import org.apache.hadoop.hbase.procedure2.Procedure;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.testclassification.ReplicationTests;
037import org.junit.AfterClass;
038import org.junit.BeforeClass;
039import org.junit.ClassRule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042
043import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
046
047/**
048 * In HBASE-26029, we reimplement the claim queue operation with proc-v2 and make it a step in SCP,
049 * this is a UT to make sure the {@link ClaimReplicationQueuesProcedure} works correctly.
050 */
051@Category({ ReplicationTests.class, LargeTests.class })
052public class TestClaimReplicationQueue extends TestReplicationBase {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestClaimReplicationQueue.class);
057
058  private static final TableName tableName3 = TableName.valueOf("test3");
059
060  private static final String PEER_ID3 = "3";
061
062  private static Table table3;
063
064  private static Table table4;
065
066  private static volatile boolean EMPTY = false;
067
068  public static final class ServerManagerForTest extends ServerManager {
069
070    public ServerManagerForTest(MasterServices master) {
071      super(master);
072    }
073
074    @Override
075    public List<ServerName> getOnlineServersList() {
076      // return no region server to make the procedure hang
077      if (EMPTY) {
078        for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
079          if (e.getClassName().equals(ClaimReplicationQueuesProcedure.class.getName())) {
080            return Collections.emptyList();
081          }
082        }
083      }
084      return super.getOnlineServersList();
085    }
086  }
087
088  public static final class HMasterForTest extends HMaster {
089
090    public HMasterForTest(Configuration conf) throws IOException {
091      super(conf);
092    }
093
094    @Override
095    protected ServerManager createServerManager(MasterServices master) throws IOException {
096      setupClusterConnection();
097      return new ServerManagerForTest(master);
098    }
099  }
100
101  @BeforeClass
102  public static void setUpBeforeClass() throws Exception {
103    CONF1.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
104    TestReplicationBase.setUpBeforeClass();
105    createTable(tableName3);
106    table3 = connection1.getTable(tableName3);
107    table4 = connection2.getTable(tableName3);
108  }
109
110  @AfterClass
111  public static void tearDownAfterClass() throws Exception {
112    Closeables.close(table3, true);
113    Closeables.close(table4, true);
114    TestReplicationBase.tearDownAfterClass();
115  }
116
117  @Override
118  public void setUpBase() throws Exception {
119    super.setUpBase();
120    // set up two replication peers and only 1 rs to test claim replication queue with multiple
121    // round
122    addPeer(PEER_ID3, tableName3);
123  }
124
125  @Override
126  public void tearDownBase() throws Exception {
127    super.tearDownBase();
128    removePeer(PEER_ID3);
129  }
130
131  @Test
132  public void testClaim() throws Exception {
133    // disable the peers
134    hbaseAdmin.disableReplicationPeer(PEER_ID2);
135    hbaseAdmin.disableReplicationPeer(PEER_ID3);
136
137    // put some data
138    int count1 = UTIL1.loadTable(htable1, famName);
139    int count2 = UTIL1.loadTable(table3, famName);
140
141    EMPTY = true;
142    UTIL1.getMiniHBaseCluster().stopRegionServer(0).join();
143    UTIL1.getMiniHBaseCluster().startRegionServer();
144
145    // since there is no active region server to get the replication queue, the procedure should be
146    // in WAITING_TIMEOUT state for most time to retry
147    HMaster master = UTIL1.getMiniHBaseCluster().getMaster();
148    UTIL1.waitFor(30000,
149      () -> master.getProcedures().stream()
150        .filter(p -> p instanceof ClaimReplicationQueuesProcedure)
151        .anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT));
152
153    hbaseAdmin.enableReplicationPeer(PEER_ID2);
154    hbaseAdmin.enableReplicationPeer(PEER_ID3);
155
156    EMPTY = false;
157    // wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP
158    UTIL1.waitFor(30000, () -> master.getProcedures().stream()
159      .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
160
161    // we should get all the data in the target cluster
162    waitForReplication(htable2, count1, NB_RETRIES);
163    waitForReplication(table4, count2, NB_RETRIES);
164  }
165}