/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Testcase for HBASE-24871.
 * <p>
 * Writes rows into a globally-scoped table on the source cluster, aborts the region server that
 * hosts the table's region, and then disables and re-enables the replication peer once the
 * surviving server reports a non-empty list of old sources. The test passes when every row
 * written on the source cluster becomes visible on the peer cluster.
 */
@Tag(ReplicationTests.TAG)
@Tag(MediumTests.TAG)
public class TestRefreshRecoveredReplication extends TestReplicationBaseNoBeforeAll {

  private static final Logger LOG = LoggerFactory.getLogger(TestRefreshRecoveredReplication.class);

  /** Number of rows written to the source table; also used as the scanner caching size. */
  private static final int BATCH = 50;

  /**
   * Configures and starts both clusters, with two region servers on the source side and a small
   * replication buffer quota.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    // NUM_SLAVES1 is presumed 2 in below.
    NUM_SLAVES1 = 2;
    configureClusters(UTIL1, UTIL2);
    // replicate slowly
    CONF1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 100);
    startClusters();
  }

  /** Name of the currently running test method; doubles as the table name. */
  private String testName;

  private TableName tablename;
  /** Handle on the test table in the source cluster (UTIL1). */
  private Table table1;
  /** Handle on the test table in the peer cluster (UTIL2). */
  private Table table2;

  /**
   * Creates a table named after the test method on both clusters, with a single column family in
   * the global replication scope, and opens a {@link Table} handle on each side.
   * @param testInfo JUnit supplied descriptor of the test about to run
   */
  @BeforeEach
  public void setup(TestInfo testInfo) throws Exception {
    // Fail with a descriptive message rather than a bare NoSuchElementException.
    testName = testInfo.getTestMethod()
      .orElseThrow(() -> new IllegalStateException(
        "No test method available for " + testInfo.getDisplayName()))
      .getName();
    setUpBase();

    tablename = TableName.valueOf(testName);
    TableDescriptor table =
      TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(famName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();

    UTIL1.getAdmin().createTable(table);
    UTIL2.getAdmin().createTable(table);
    UTIL1.waitTableAvailable(tablename);
    UTIL2.waitTableAvailable(tablename);
    table1 = UTIL1.getConnection().getTable(tablename);
    table2 = UTIL2.getConnection().getTable(tablename);
  }

  /**
   * Runs the base-class teardown, drops the per-test table on both clusters, and finally releases
   * the {@link Table} handles opened in {@link #setup(TestInfo)}.
   */
  @AfterEach
  public void teardown() throws Exception {
    try {
      tearDownBase();

      UTIL1.deleteTableIfAny(tablename);
      UTIL2.deleteTableIfAny(tablename);
    } finally {
      // The handles were previously never closed; release them even if the cleanup above failed.
      closeTables();
    }
  }

  /**
   * Closes both table handles, tolerating handles that were never opened because
   * {@link #setup(TestInfo)} failed part-way. The second handle is closed even when closing the
   * first one throws.
   */
  private void closeTables() throws IOException {
    Table source = table1;
    Table sink = table2;
    table1 = null;
    table2 = null;
    try {
      if (source != null) {
        source.close();
      }
    } finally {
      if (sink != null) {
        sink.close();
      }
    }
  }

  /**
   * Aborts the region server hosting the table while data is still being replicated, toggles the
   * peer off and on, and verifies that all {@link #BATCH} rows eventually reach the peer cluster.
   */
  @Test
  public void testReplicationRefreshSource() throws Exception {
    // put some data
    for (int i = 0; i < BATCH; i++) {
      byte[] r = Bytes.toBytes(i);
      table1.put(new Put(r).addColumn(famName, famName, r));
    }

    // Kill rs holding table region. There are only TWO servers. We depend on it.
    List<RegionServerThread> rss = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads();
    assertEquals(2, rss.size());
    Optional<RegionServerThread> server = rss.stream()
      .filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
      .findAny();
    assertTrue(server.isPresent());
    // With exactly two servers, the survivor is whichever one does not host the region.
    HRegionServer otherServer = rss.get(0).getRegionServer() == server.get().getRegionServer()
      ? rss.get(1).getRegionServer()
      : rss.get(0).getRegionServer();
    server.get().getRegionServer().abort("stopping for test");
    // waiting for recovered peer to appear.
    Replication replication = (Replication) otherServer.getReplicationSourceService();
    UTIL1.waitFor(60000, () -> !replication.getReplicationManager().getOldSources().isEmpty());
    // Wait on only one server being up.
    // Have to go back to source here because getLiveRegionServerThreads makes new array each time
    UTIL1.waitFor(60000,
      () -> UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
    UTIL1.waitTableAvailable(tablename);
    LOG.info("Available {}", tablename);

    // disable peer to trigger refreshSources
    hbaseAdmin.disableReplicationPeer(PEER_ID2);
    LOG.info("has replicated {} rows before disable peer", checkReplicationData());
    hbaseAdmin.enableReplicationPeer(PEER_ID2);
    // waiting to replicate all data to slave
    UTIL2.waitFor(60000, () -> {
      int count = checkReplicationData();
      LOG.info("Waiting all logs pushed to slave. Expected {} , actual {}", BATCH, count);
      return count == BATCH;
    });
  }

  /**
   * Counts the rows currently readable from the peer-cluster table by scanning it end to end.
   * @return number of rows returned by the scan
   * @throws IOException if opening or advancing the scanner fails
   */
  private int checkReplicationData() throws IOException {
    int count = 0;
    try (ResultScanner results = table2.getScanner(new Scan().setCaching(BATCH))) {
      while (results.next() != null) {
        count++;
      }
    }
    return count;
  }
}