001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.fail; 021 022import org.apache.hadoop.hbase.HBaseClassTestRule; 023import org.apache.hadoop.hbase.HBaseTestingUtility; 024import org.apache.hadoop.hbase.UnknownScannerException; 025import org.apache.hadoop.hbase.client.Connection; 026import org.apache.hadoop.hbase.client.ConnectionFactory; 027import org.apache.hadoop.hbase.client.Result; 028import org.apache.hadoop.hbase.client.ResultScanner; 029import org.apache.hadoop.hbase.client.Scan; 030import org.apache.hadoop.hbase.client.Table; 031import org.apache.hadoop.hbase.testclassification.LargeTests; 032import org.apache.hadoop.hbase.testclassification.ReplicationTests; 033import org.junit.ClassRule; 034import org.junit.experimental.categories.Category; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038@Category({ ReplicationTests.class, LargeTests.class }) 039public class TestReplicationKillRS extends TestReplicationBase { 040 041 @ClassRule 042 public static final HBaseClassTestRule CLASS_RULE = 043 HBaseClassTestRule.forClass(TestReplicationKillRS.class); 044 045 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationKillRS.class); 046 047 /** 048 * Load up 1 tables over 2 region servers and kill a source during the upload. The failover 049 * happens internally. WARNING this test sometimes fails because of HBASE-3515 050 */ 051 public void loadTableAndKillRS(HBaseTestingUtility util) throws Exception { 052 // killing the RS with hbase:meta can result into failed puts until we solve 053 // IO fencing 054 int rsToKill1 = util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0; 055 056 // Takes about 20 secs to run the full loading, kill around the middle 057 Thread killer = killARegionServer(util, 5000, rsToKill1); 058 Result[] res; 059 int initialCount; 060 try (Connection conn = ConnectionFactory.createConnection(CONF1)) { 061 try (Table table = conn.getTable(tableName)) { 062 LOG.info("Start loading table"); 063 initialCount = UTIL1.loadTable(table, famName); 064 LOG.info("Done loading table"); 065 killer.join(5000); 066 LOG.info("Done waiting for threads"); 067 068 while (true) { 069 try (ResultScanner scanner = table.getScanner(new Scan())) { 070 res = scanner.next(initialCount); 071 break; 072 } catch (UnknownScannerException ex) { 073 LOG.info("Cluster wasn't ready yet, restarting scanner"); 074 } 075 } 076 } 077 } 078 // Test we actually have all the rows, we may miss some because we 079 // don't have IO fencing. 080 if (res.length != initialCount) { 081 LOG.warn("We lost some rows on the master cluster!"); 082 // We don't really expect the other cluster to have more rows 083 initialCount = res.length; 084 } 085 086 int lastCount = 0; 087 final long start = System.currentTimeMillis(); 088 int i = 0; 089 try (Connection conn = ConnectionFactory.createConnection(CONF2)) { 090 try (Table table = conn.getTable(tableName)) { 091 while (true) { 092 if (i == NB_RETRIES - 1) { 093 fail("Waited too much time for queueFailover replication. " + "Waited " 094 + (System.currentTimeMillis() - start) + "ms."); 095 } 096 Result[] res2; 097 try (ResultScanner scanner = table.getScanner(new Scan())) { 098 res2 = scanner.next(initialCount * 2); 099 } 100 if (res2.length < initialCount) { 101 if (lastCount < res2.length) { 102 i--; // Don't increment timeout if we make progress 103 } else { 104 i++; 105 } 106 lastCount = res2.length; 107 LOG.info( 108 "Only got " + lastCount + " rows instead of " + initialCount + " current i=" + i); 109 Thread.sleep(SLEEP_TIME * 2); 110 } else { 111 break; 112 } 113 } 114 } 115 } 116 } 117 118 private static Thread killARegionServer(final HBaseTestingUtility utility, final long timeout, 119 final int rs) { 120 Thread killer = new Thread() { 121 @Override 122 public void run() { 123 try { 124 Thread.sleep(timeout); 125 utility.getHBaseCluster().getRegionServer(rs).stop("Stopping as part of the test"); 126 } catch (Exception e) { 127 LOG.error("Couldn't kill a region server", e); 128 } 129 } 130 }; 131 killer.setDaemon(true); 132 killer.start(); 133 return killer; 134 } 135}