/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.List;
023import java.util.Optional;
024import java.util.stream.Collectors;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.client.Result;
033import org.apache.hadoop.hbase.client.ResultScanner;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.client.TableDescriptor;
037import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
038import org.apache.hadoop.hbase.regionserver.HRegionServer;
039import org.apache.hadoop.hbase.replication.TestReplicationBase;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.testclassification.ReplicationTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
044import org.junit.After;
045import org.junit.AfterClass;
046import org.junit.Assert;
047import org.junit.Before;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Rule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.rules.TestName;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
058import static org.junit.Assert.assertEquals;
059
060/**
061 * Testcase for HBASE-24871.
062 */
063@Category({ ReplicationTests.class, MediumTests.class })
064public class TestRefreshRecoveredReplication extends TestReplicationBase {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestRefreshRecoveredReplication.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestRefreshRecoveredReplication.class);
071
072  private static final int BATCH = 50;
073
074  @Rule
075  public TestName name = new TestName();
076
077  private TableName tablename;
078  private Table table1;
079  private Table table2;
080
081  @BeforeClass
082  public static void setUpBeforeClass() throws Exception {
083    // NUM_SLAVES1 is presumed 2 in below.
084    NUM_SLAVES1 = 2;
085    // replicate slowly
086    Configuration conf1 = UTIL1.getConfiguration();
087    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 100);
088    TestReplicationBase.setUpBeforeClass();
089  }
090
091  @AfterClass
092  public static void tearDownAfterClass() throws Exception {
093    TestReplicationBase.tearDownAfterClass();
094  }
095
096  @Before
097  public void setup() throws Exception {
098    setUpBase();
099
100    tablename = TableName.valueOf(name.getMethodName());
101    TableDescriptor table = TableDescriptorBuilder.newBuilder(tablename)
102        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
103            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
104        .build();
105
106    UTIL1.getAdmin().createTable(table);
107    UTIL2.getAdmin().createTable(table);
108    UTIL1.waitTableAvailable(tablename);
109    UTIL2.waitTableAvailable(tablename);
110    table1 = UTIL1.getConnection().getTable(tablename);
111    table2 = UTIL2.getConnection().getTable(tablename);
112  }
113
114  @After
115  public void teardown() throws Exception {
116    tearDownBase();
117
118    UTIL1.deleteTableIfAny(tablename);
119    UTIL2.deleteTableIfAny(tablename);
120  }
121
122  @Test
123  public void testReplicationRefreshSource() throws Exception {
124    // put some data
125    for (int i = 0; i < BATCH; i++) {
126      byte[] r = Bytes.toBytes(i);
127      table1.put(new Put(r).addColumn(famName, famName, r));
128    }
129
130    // Kill rs holding table region. There are only TWO servers. We depend on it.
131    List<RegionServerThread> rss = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads();
132    assertEquals(2, rss.size());
133    Optional<RegionServerThread> server = rss.stream()
134        .filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
135        .findAny();
136    Assert.assertTrue(server.isPresent());
137    HRegionServer otherServer = rss.get(0).getRegionServer() == server.get().getRegionServer()?
138      rss.get(1).getRegionServer(): rss.get(0).getRegionServer();
139    server.get().getRegionServer().abort("stopping for test");
140    // waiting for recovered peer to appear.
141    Replication replication = (Replication)otherServer.getReplicationSourceService();
142    UTIL1.waitFor(60000, () -> !replication.getReplicationManager().getOldSources().isEmpty());
143    // Wait on only one server being up.
144    UTIL1.waitFor(60000, () ->
145      // Have to go back to source here because getLiveRegionServerThreads makes new array each time
146      UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
147    UTIL1.waitTableAvailable(tablename);
148    LOG.info("Available {}", tablename);
149
150    // disable peer to trigger refreshSources
151    hbaseAdmin.disableReplicationPeer(PEER_ID2);
152    LOG.info("has replicated {} rows before disable peer", checkReplicationData());
153    hbaseAdmin.enableReplicationPeer(PEER_ID2);
154    // waiting to replicate all data to slave
155    UTIL2.waitFor(60000, () -> {
156      int count = checkReplicationData();
157      LOG.info("Waiting all logs pushed to slave. Expected {} , actual {}", BATCH, count);
158      return count == BATCH;
159    });
160  }
161
162  private int checkReplicationData() throws IOException {
163    int count = 0;
164    ResultScanner results = table2.getScanner(new Scan().setCaching(BATCH));
165    for (Result r : results) {
166      count++;
167    }
168    return count;
169  }
170}