/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import static org.junit.Assert.assertEquals;

/**
 * Testcase for HBASE-24871.
 * <p>
 * Writes rows on the source cluster, aborts the region server hosting the test table so its
 * replication queue is picked up by the surviving server as an "old" (recovered) source, then
 * disables and re-enables the peer (which triggers a refresh of sources) and checks that every
 * written row still arrives on the slave cluster.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestRefreshRecoveredReplication extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRefreshRecoveredReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRefreshRecoveredReplication.class);

  /** Number of rows written to the source cluster and expected on the slave. */
  private static final int BATCH = 50;

  @Rule
  public TestName name = new TestName();

  private TableName tablename;
  /** Handle to the test table on the source cluster (UTIL1). */
  private Table table1;
  /** Handle to the test table on the slave cluster (UTIL2). */
  private Table table2;

  /**
   * Starts the clusters with exactly two source region servers and a small replication buffer.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // NUM_SLAVES1 is presumed 2 in below.
    NUM_SLAVES1 = 2;
    // replicate slowly
    Configuration conf1 = UTIL1.getConfiguration();
    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 100);
    TestReplicationBase.setUpBeforeClass();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
  }

  /**
   * Creates a globally-replicated table named after the current test method on both clusters
   * and opens a {@link Table} handle to each.
   */
  @Before
  public void setup() throws Exception {
    setUpBase();

    tablename = TableName.valueOf(name.getMethodName());
    TableDescriptor table = TableDescriptorBuilder.newBuilder(tablename)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .build();

    UTIL1.getAdmin().createTable(table);
    UTIL2.getAdmin().createTable(table);
    UTIL1.waitTableAvailable(tablename);
    UTIL2.waitTableAvailable(tablename);
    table1 = UTIL1.getConnection().getTable(tablename);
    table2 = UTIL2.getConnection().getTable(tablename);
  }

  /**
   * Closes the table handles opened in {@link #setup()} and drops the test table on both
   * clusters.
   */
  @After
  public void teardown() throws Exception {
    // Release client-side Table resources; null checks cover a setup() that failed partway.
    if (table1 != null) {
      table1.close();
    }
    if (table2 != null) {
      table2.close();
    }

    tearDownBase();

    UTIL1.deleteTableIfAny(tablename);
    UTIL2.deleteTableIfAny(tablename);
  }

  @Test
  public void testReplicationRefreshSource() throws Exception {
    // put some data
    for (int i = 0; i < BATCH; i++) {
      byte[] r = Bytes.toBytes(i);
      table1.put(new Put(r).addColumn(famName, famName, r));
    }

    // Kill rs holding table region. There are only TWO servers. We depend on it.
    List<RegionServerThread> rss = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads();
    assertEquals(2, rss.size());
    Optional<RegionServerThread> server = rss.stream()
        .filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
        .findAny();
    Assert.assertTrue(server.isPresent());
    HRegionServer otherServer = rss.get(0).getRegionServer() == server.get().getRegionServer()
        ? rss.get(1).getRegionServer()
        : rss.get(0).getRegionServer();
    server.get().getRegionServer().abort("stopping for test");
    // waiting for recovered peer to appear.
    Replication replication = (Replication) otherServer.getReplicationSourceService();
    UTIL1.waitFor(60000, () -> !replication.getReplicationManager().getOldSources().isEmpty());
    // Wait on only one server being up.
    UTIL1.waitFor(60000, () ->
        // Have to go back to source here because getLiveRegionServerThreads makes new array each time
        UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
    UTIL1.waitTableAvailable(tablename);
    LOG.info("Available {}", tablename);

    // disable peer to trigger refreshSources
    hbaseAdmin.disableReplicationPeer(PEER_ID2);
    LOG.info("has replicated {} rows before disable peer", checkReplicationData());
    hbaseAdmin.enableReplicationPeer(PEER_ID2);
    // waiting to replicate all data to slave
    UTIL2.waitFor(60000, () -> {
      int count = checkReplicationData();
      LOG.info("Waiting all logs pushed to slave. Expected {} , actual {}", BATCH, count);
      return count == BATCH;
    });
  }

  /**
   * Counts the rows currently visible in the test table on the slave cluster.
   *
   * @return number of rows returned by a full scan of {@code table2}
   * @throws IOException if the scan fails
   */
  private int checkReplicationData() throws IOException {
    int count = 0;
    // This is polled repeatedly from waitFor(), so the scanner must be closed on every call.
    try (ResultScanner results = table2.getScanner(new Scan().setCaching(BATCH))) {
      for (Result ignored : results) {
        count++;
      }
    }
    return count;
  }
}