001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.Optional; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 030import org.apache.hadoop.hbase.client.Put; 031import org.apache.hadoop.hbase.client.Result; 032import org.apache.hadoop.hbase.client.ResultScanner; 033import org.apache.hadoop.hbase.client.Scan; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.client.TableDescriptor; 036import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 037import org.apache.hadoop.hbase.regionserver.HRegionServer; 038import org.apache.hadoop.hbase.replication.TestReplicationBase; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.testclassification.ReplicationTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 043import org.junit.After; 044import org.junit.AfterClass; 045import org.junit.Assert; 046import org.junit.Before; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Rule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.junit.rules.TestName; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 057 058/** 059 * Testcase for HBASE-24871. 060 */ 061@Category({ ReplicationTests.class, MediumTests.class }) 062public class TestRefreshRecoveredReplication extends TestReplicationBase { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestRefreshRecoveredReplication.class); 067 068 private static final Logger LOG = LoggerFactory.getLogger(TestRefreshRecoveredReplication.class); 069 070 private static final int BATCH = 50; 071 072 @Rule 073 public TestName name = new TestName(); 074 075 private TableName tablename; 076 private Table table1; 077 private Table table2; 078 079 @BeforeClass 080 public static void setUpBeforeClass() throws Exception { 081 // NUM_SLAVES1 is presumed 2 in below. 082 NUM_SLAVES1 = 2; 083 // replicate slowly 084 Configuration conf1 = UTIL1.getConfiguration(); 085 conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 100); 086 TestReplicationBase.setUpBeforeClass(); 087 } 088 089 @AfterClass 090 public static void tearDownAfterClass() throws Exception { 091 TestReplicationBase.tearDownAfterClass(); 092 } 093 094 @Before 095 public void setup() throws Exception { 096 setUpBase(); 097 098 tablename = TableName.valueOf(name.getMethodName()); 099 TableDescriptor table = 100 TableDescriptorBuilder.newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder 101 .newBuilder(famName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); 102 103 UTIL1.getAdmin().createTable(table); 104 UTIL2.getAdmin().createTable(table); 105 UTIL1.waitTableAvailable(tablename); 106 UTIL2.waitTableAvailable(tablename); 107 table1 = UTIL1.getConnection().getTable(tablename); 108 table2 = UTIL2.getConnection().getTable(tablename); 109 } 110 111 @After 112 public void teardown() throws Exception { 113 tearDownBase(); 114 115 UTIL1.deleteTableIfAny(tablename); 116 UTIL2.deleteTableIfAny(tablename); 117 } 118 119 @Test 120 public void testReplicationRefreshSource() throws Exception { 121 // put some data 122 for (int i = 0; i < BATCH; i++) { 123 byte[] r = Bytes.toBytes(i); 124 table1.put(new Put(r).addColumn(famName, famName, r)); 125 } 126 127 // Kill rs holding table region. There are only TWO servers. We depend on it. 128 List<RegionServerThread> rss = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads(); 129 assertEquals(2, rss.size()); 130 Optional<RegionServerThread> server = rss.stream() 131 .filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename))) 132 .findAny(); 133 Assert.assertTrue(server.isPresent()); 134 HRegionServer otherServer = rss.get(0).getRegionServer() == server.get().getRegionServer() 135 ? rss.get(1).getRegionServer() 136 : rss.get(0).getRegionServer(); 137 server.get().getRegionServer().abort("stopping for test"); 138 // waiting for recovered peer to appear. 139 Replication replication = (Replication) otherServer.getReplicationSourceService(); 140 UTIL1.waitFor(60000, () -> !replication.getReplicationManager().getOldSources().isEmpty()); 141 // Wait on only one server being up. 142 // Have to go back to source here because getLiveRegionServerThreads makes new array each time 143 UTIL1.waitFor(60000, 144 () -> UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1); 145 UTIL1.waitTableAvailable(tablename); 146 LOG.info("Available {}", tablename); 147 148 // disable peer to trigger refreshSources 149 hbaseAdmin.disableReplicationPeer(PEER_ID2); 150 LOG.info("has replicated {} rows before disable peer", checkReplicationData()); 151 hbaseAdmin.enableReplicationPeer(PEER_ID2); 152 // waiting to replicate all data to slave 153 UTIL2.waitFor(60000, () -> { 154 int count = checkReplicationData(); 155 LOG.info("Waiting all logs pushed to slave. Expected {} , actual {}", BATCH, count); 156 return count == BATCH; 157 }); 158 } 159 160 private int checkReplicationData() throws IOException { 161 int count = 0; 162 ResultScanner results = table2.getScanner(new Scan().setCaching(BATCH)); 163 for (Result r : results) { 164 count++; 165 } 166 return count; 167 } 168}