001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.fail; 021 022import org.apache.hadoop.hbase.HBaseTestingUtil; 023import org.apache.hadoop.hbase.client.Connection; 024import org.apache.hadoop.hbase.client.ConnectionFactory; 025import org.apache.hadoop.hbase.client.Result; 026import org.apache.hadoop.hbase.client.ResultScanner; 027import org.apache.hadoop.hbase.client.Scan; 028import org.apache.hadoop.hbase.client.Table; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033public abstract class ReplicationKillRSTestBase extends TestReplicationBaseNoBeforeAll { 034 035 private static final Logger LOG = LoggerFactory.getLogger(ReplicationKillRSTestBase.class); 036 037 /** 038 * Load up 1 tables over 2 region servers and kill a source during the upload. The failover 039 * happens internally. WARNING this test sometimes fails because of HBASE-3515 040 */ 041 protected void loadTableAndKillRS(HBaseTestingUtil util) throws Exception { 042 // killing the RS with hbase:meta can result into failed puts until we solve 043 // IO fencing 044 int rsToKill1 = util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0; 045 046 // Takes about 20 secs to run the full loading, kill around the middle 047 Thread killer = killARegionServer(util, 5000, rsToKill1); 048 Result[] res; 049 int initialCount; 050 try (Connection conn = ConnectionFactory.createConnection(CONF1)) { 051 try (Table table = conn.getTable(tableName)) { 052 LOG.info("Start loading table"); 053 initialCount = UTIL1.loadTable(table, famName); 054 LOG.info("Done loading table"); 055 killer.join(5000); 056 LOG.info("Done waiting for threads"); 057 058 while (true) { 059 try (ResultScanner scanner = table.getScanner(new Scan())) { 060 res = scanner.next(initialCount); 061 break; 062 } catch (Exception ex) { 063 LOG.info("Cluster wasn't ready yet, restarting scanner"); 064 } 065 } 066 } 067 } 068 // Test we actually have all the rows, we may miss some because we 069 // don't have IO fencing. 070 if (res.length != initialCount) { 071 LOG.warn("We lost some rows on the master cluster!"); 072 // We don't really expect the other cluster to have more rows 073 initialCount = res.length; 074 } 075 076 int lastCount = 0; 077 final long start = EnvironmentEdgeManager.currentTime(); 078 int i = 0; 079 try (Connection conn = ConnectionFactory.createConnection(CONF2)) { 080 try (Table table = conn.getTable(tableName)) { 081 while (true) { 082 if (i == NB_RETRIES - 1) { 083 fail("Waited too much time for queueFailover replication. " + "Waited " 084 + (EnvironmentEdgeManager.currentTime() - start) + "ms."); 085 } 086 Result[] res2; 087 try (ResultScanner scanner = table.getScanner(new Scan())) { 088 res2 = scanner.next(initialCount * 2); 089 } catch (Exception e) { 090 LOG.warn("Cluster wasn't ready yet, sleep and retry later"); 091 Thread.sleep(SLEEP_TIME * 2); 092 continue; 093 } 094 if (res2.length < initialCount) { 095 if (lastCount < res2.length) { 096 i--; // Don't increment timeout if we make progress 097 } else { 098 i++; 099 } 100 lastCount = res2.length; 101 LOG.info( 102 "Only got " + lastCount + " rows instead of " + initialCount + " current i=" + i); 103 Thread.sleep(SLEEP_TIME * 2); 104 } else { 105 break; 106 } 107 } 108 } 109 } 110 } 111 112 private static Thread killARegionServer(final HBaseTestingUtil utility, final long timeout, 113 final int rs) { 114 Thread killer = new Thread() { 115 @Override 116 public void run() { 117 try { 118 Thread.sleep(timeout); 119 utility.getHBaseCluster().getRegionServer(rs).abort("Stopping as part of the test"); 120 } catch (Exception e) { 121 LOG.error("Couldn't kill a region server", e); 122 } 123 } 124 }; 125 killer.setDaemon(true); 126 killer.start(); 127 return killer; 128 } 129}