001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.fail;
021
022import org.apache.hadoop.hbase.HBaseTestingUtil;
023import org.apache.hadoop.hbase.client.Connection;
024import org.apache.hadoop.hbase.client.ConnectionFactory;
025import org.apache.hadoop.hbase.client.Result;
026import org.apache.hadoop.hbase.client.ResultScanner;
027import org.apache.hadoop.hbase.client.Scan;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033public abstract class ReplicationKillRSTestBase extends TestReplicationBaseNoBeforeAll {
034
035  private static final Logger LOG = LoggerFactory.getLogger(ReplicationKillRSTestBase.class);
036
037  /**
038   * Load up 1 tables over 2 region servers and kill a source during the upload. The failover
039   * happens internally. WARNING this test sometimes fails because of HBASE-3515
040   */
041  protected void loadTableAndKillRS(HBaseTestingUtil util) throws Exception {
042    // killing the RS with hbase:meta can result into failed puts until we solve
043    // IO fencing
044    int rsToKill1 = util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
045
046    // Takes about 20 secs to run the full loading, kill around the middle
047    Thread killer = killARegionServer(util, 5000, rsToKill1);
048    Result[] res;
049    int initialCount;
050    try (Connection conn = ConnectionFactory.createConnection(CONF1)) {
051      try (Table table = conn.getTable(tableName)) {
052        LOG.info("Start loading table");
053        initialCount = UTIL1.loadTable(table, famName);
054        LOG.info("Done loading table");
055        killer.join(5000);
056        LOG.info("Done waiting for threads");
057
058        while (true) {
059          try (ResultScanner scanner = table.getScanner(new Scan())) {
060            res = scanner.next(initialCount);
061            break;
062          } catch (Exception ex) {
063            LOG.info("Cluster wasn't ready yet, restarting scanner");
064          }
065        }
066      }
067    }
068    // Test we actually have all the rows, we may miss some because we
069    // don't have IO fencing.
070    if (res.length != initialCount) {
071      LOG.warn("We lost some rows on the master cluster!");
072      // We don't really expect the other cluster to have more rows
073      initialCount = res.length;
074    }
075
076    int lastCount = 0;
077    final long start = EnvironmentEdgeManager.currentTime();
078    int i = 0;
079    try (Connection conn = ConnectionFactory.createConnection(CONF2)) {
080      try (Table table = conn.getTable(tableName)) {
081        while (true) {
082          if (i == NB_RETRIES - 1) {
083            fail("Waited too much time for queueFailover replication. " + "Waited "
084              + (EnvironmentEdgeManager.currentTime() - start) + "ms.");
085          }
086          Result[] res2;
087          try (ResultScanner scanner = table.getScanner(new Scan())) {
088            res2 = scanner.next(initialCount * 2);
089          } catch (Exception e) {
090            LOG.warn("Cluster wasn't ready yet, sleep and retry later");
091            Thread.sleep(SLEEP_TIME * 2);
092            continue;
093          }
094          if (res2.length < initialCount) {
095            if (lastCount < res2.length) {
096              i--; // Don't increment timeout if we make progress
097            } else {
098              i++;
099            }
100            lastCount = res2.length;
101            LOG.info(
102              "Only got " + lastCount + " rows instead of " + initialCount + " current i=" + i);
103            Thread.sleep(SLEEP_TIME * 2);
104          } else {
105            break;
106          }
107        }
108      }
109    }
110  }
111
112  private static Thread killARegionServer(final HBaseTestingUtil utility, final long timeout,
113    final int rs) {
114    Thread killer = new Thread() {
115      @Override
116      public void run() {
117        try {
118          Thread.sleep(timeout);
119          utility.getHBaseCluster().getRegionServer(rs).abort("Stopping as part of the test");
120        } catch (Exception e) {
121          LOG.error("Couldn't kill a region server", e);
122        }
123      }
124    };
125    killer.setDaemon(true);
126    killer.start();
127    return killer;
128  }
129}