001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
030import org.apache.hadoop.hbase.client.Put;
031import org.apache.hadoop.hbase.client.Result;
032import org.apache.hadoop.hbase.client.ResultScanner;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
037import org.apache.hadoop.hbase.regionserver.HRegionServer;
038import org.apache.hadoop.hbase.replication.TestReplicationBase;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.testclassification.ReplicationTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
043import org.junit.After;
044import org.junit.AfterClass;
045import org.junit.Assert;
046import org.junit.Before;
047import org.junit.BeforeClass;
048import org.junit.ClassRule;
049import org.junit.Rule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.rules.TestName;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
057
058/**
059 * Testcase for HBASE-24871.
060 */
061@Category({ ReplicationTests.class, MediumTests.class })
062public class TestRefreshRecoveredReplication extends TestReplicationBase {
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066    HBaseClassTestRule.forClass(TestRefreshRecoveredReplication.class);
067
068  private static final Logger LOG = LoggerFactory.getLogger(TestRefreshRecoveredReplication.class);
069
070  private static final int BATCH = 50;
071
072  @Rule
073  public TestName name = new TestName();
074
075  private TableName tablename;
076  private Table table1;
077  private Table table2;
078
079  @BeforeClass
080  public static void setUpBeforeClass() throws Exception {
081    // NUM_SLAVES1 is presumed 2 in below.
082    NUM_SLAVES1 = 2;
083    // replicate slowly
084    Configuration conf1 = UTIL1.getConfiguration();
085    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 100);
086    TestReplicationBase.setUpBeforeClass();
087  }
088
089  @AfterClass
090  public static void tearDownAfterClass() throws Exception {
091    TestReplicationBase.tearDownAfterClass();
092  }
093
094  @Before
095  public void setup() throws Exception {
096    setUpBase();
097
098    tablename = TableName.valueOf(name.getMethodName());
099    TableDescriptor table =
100      TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
101        .newBuilder(famName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
102
103    UTIL1.getAdmin().createTable(table);
104    UTIL2.getAdmin().createTable(table);
105    UTIL1.waitTableAvailable(tablename);
106    UTIL2.waitTableAvailable(tablename);
107    table1 = UTIL1.getConnection().getTable(tablename);
108    table2 = UTIL2.getConnection().getTable(tablename);
109  }
110
111  @After
112  public void teardown() throws Exception {
113    tearDownBase();
114
115    UTIL1.deleteTableIfAny(tablename);
116    UTIL2.deleteTableIfAny(tablename);
117  }
118
119  @Test
120  public void testReplicationRefreshSource() throws Exception {
121    // put some data
122    for (int i = 0; i < BATCH; i++) {
123      byte[] r = Bytes.toBytes(i);
124      table1.put(new Put(r).addColumn(famName, famName, r));
125    }
126
127    // Kill rs holding table region. There are only TWO servers. We depend on it.
128    List<RegionServerThread> rss = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads();
129    assertEquals(2, rss.size());
130    Optional<RegionServerThread> server = rss.stream()
131      .filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
132      .findAny();
133    Assert.assertTrue(server.isPresent());
134    HRegionServer otherServer = rss.get(0).getRegionServer() == server.get().getRegionServer()
135      ? rss.get(1).getRegionServer()
136      : rss.get(0).getRegionServer();
137    server.get().getRegionServer().abort("stopping for test");
138    // waiting for recovered peer to appear.
139    Replication replication = (Replication) otherServer.getReplicationSourceService();
140    UTIL1.waitFor(60000, () -> !replication.getReplicationManager().getOldSources().isEmpty());
141    // Wait on only one server being up.
142    // Have to go back to source here because getLiveRegionServerThreads makes new array each time
143    UTIL1.waitFor(60000,
144      () -> UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
145    UTIL1.waitTableAvailable(tablename);
146    LOG.info("Available {}", tablename);
147
148    // disable peer to trigger refreshSources
149    hbaseAdmin.disableReplicationPeer(PEER_ID2);
150    LOG.info("has replicated {} rows before disable peer", checkReplicationData());
151    hbaseAdmin.enableReplicationPeer(PEER_ID2);
152    // waiting to replicate all data to slave
153    UTIL2.waitFor(60000, () -> {
154      int count = checkReplicationData();
155      LOG.info("Waiting all logs pushed to slave. Expected {} , actual {}", BATCH, count);
156      return count == BATCH;
157    });
158  }
159
160  private int checkReplicationData() throws IOException {
161    int count = 0;
162    ResultScanner results = table2.getScanner(new Scan().setCaching(BATCH));
163    for (Result r : results) {
164      count++;
165    }
166    return count;
167  }
168}