001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.List;
025import java.util.Optional;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
029import org.apache.hadoop.hbase.client.Put;
030import org.apache.hadoop.hbase.client.ResultScanner;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.client.TableDescriptor;
034import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
035import org.apache.hadoop.hbase.regionserver.HRegionServer;
036import org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
041import org.junit.jupiter.api.AfterEach;
042import org.junit.jupiter.api.BeforeAll;
043import org.junit.jupiter.api.BeforeEach;
044import org.junit.jupiter.api.Tag;
045import org.junit.jupiter.api.Test;
046import org.junit.jupiter.api.TestInfo;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
051
052/**
053 * Testcase for HBASE-24871.
054 */
055@Tag(ReplicationTests.TAG)
056@Tag(MediumTests.TAG)
057public class TestRefreshRecoveredReplication extends TestReplicationBaseNoBeforeAll {
058
059  private static final Logger LOG = LoggerFactory.getLogger(TestRefreshRecoveredReplication.class);
060
061  private static final int BATCH = 50;
062
063  @BeforeAll
064  public static void setUpBeforeClass() throws Exception {
065    // NUM_SLAVES1 is presumed 2 in below.
066    NUM_SLAVES1 = 2;
067    configureClusters(UTIL1, UTIL2);
068    // replicate slowly
069    CONF1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 100);
070    startClusters();
071  }
072
073  private String testName;
074
075  private TableName tablename;
076  private Table table1;
077  private Table table2;
078
079  @BeforeEach
080  public void setup(TestInfo testInfo) throws Exception {
081    testName = testInfo.getTestMethod().get().getName();
082    setUpBase();
083
084    tablename = TableName.valueOf(testName);
085    TableDescriptor table =
086      TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
087        .newBuilder(famName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
088
089    UTIL1.getAdmin().createTable(table);
090    UTIL2.getAdmin().createTable(table);
091    UTIL1.waitTableAvailable(tablename);
092    UTIL2.waitTableAvailable(tablename);
093    table1 = UTIL1.getConnection().getTable(tablename);
094    table2 = UTIL2.getConnection().getTable(tablename);
095  }
096
097  @AfterEach
098  public void teardown() throws Exception {
099    tearDownBase();
100
101    UTIL1.deleteTableIfAny(tablename);
102    UTIL2.deleteTableIfAny(tablename);
103  }
104
105  @Test
106  public void testReplicationRefreshSource() throws Exception {
107    // put some data
108    for (int i = 0; i < BATCH; i++) {
109      byte[] r = Bytes.toBytes(i);
110      table1.put(new Put(r).addColumn(famName, famName, r));
111    }
112
113    // Kill rs holding table region. There are only TWO servers. We depend on it.
114    List<RegionServerThread> rss = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads();
115    assertEquals(2, rss.size());
116    Optional<RegionServerThread> server = rss.stream()
117      .filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
118      .findAny();
119    assertTrue(server.isPresent());
120    HRegionServer otherServer = rss.get(0).getRegionServer() == server.get().getRegionServer()
121      ? rss.get(1).getRegionServer()
122      : rss.get(0).getRegionServer();
123    server.get().getRegionServer().abort("stopping for test");
124    // waiting for recovered peer to appear.
125    Replication replication = (Replication) otherServer.getReplicationSourceService();
126    UTIL1.waitFor(60000, () -> !replication.getReplicationManager().getOldSources().isEmpty());
127    // Wait on only one server being up.
128    // Have to go back to source here because getLiveRegionServerThreads makes new array each time
129    UTIL1.waitFor(60000,
130      () -> UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
131    UTIL1.waitTableAvailable(tablename);
132    LOG.info("Available {}", tablename);
133
134    // disable peer to trigger refreshSources
135    hbaseAdmin.disableReplicationPeer(PEER_ID2);
136    LOG.info("has replicated {} rows before disable peer", checkReplicationData());
137    hbaseAdmin.enableReplicationPeer(PEER_ID2);
138    // waiting to replicate all data to slave
139    UTIL2.waitFor(60000, () -> {
140      int count = checkReplicationData();
141      LOG.info("Waiting all logs pushed to slave. Expected {} , actual {}", BATCH, count);
142      return count == BATCH;
143    });
144  }
145
146  private int checkReplicationData() throws IOException {
147    int count = 0;
148    try (ResultScanner results = table2.getScanner(new Scan().setCaching(BATCH))) {
149      while (results.next() != null) {
150        count++;
151      }
152    }
153    return count;
154  }
155}