001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.fail;
021
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.HBaseTestingUtility;
024import org.apache.hadoop.hbase.UnknownScannerException;
025import org.apache.hadoop.hbase.client.Connection;
026import org.apache.hadoop.hbase.client.ConnectionFactory;
027import org.apache.hadoop.hbase.client.Result;
028import org.apache.hadoop.hbase.client.ResultScanner;
029import org.apache.hadoop.hbase.client.Scan;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.testclassification.LargeTests;
032import org.apache.hadoop.hbase.testclassification.ReplicationTests;
033import org.junit.ClassRule;
034import org.junit.experimental.categories.Category;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038@Category({ ReplicationTests.class, LargeTests.class })
039public class TestReplicationKillRS extends TestReplicationBase {
040
041  @ClassRule
042  public static final HBaseClassTestRule CLASS_RULE =
043      HBaseClassTestRule.forClass(TestReplicationKillRS.class);
044
045  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationKillRS.class);
046
047  /**
048   * Load up 1 tables over 2 region servers and kill a source during the upload. The failover
049   * happens internally. WARNING this test sometimes fails because of HBASE-3515
050   */
051  public void loadTableAndKillRS(HBaseTestingUtility util) throws Exception {
052    // killing the RS with hbase:meta can result into failed puts until we solve
053    // IO fencing
054    int rsToKill1 = util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
055
056    // Takes about 20 secs to run the full loading, kill around the middle
057    Thread killer = killARegionServer(util, 5000, rsToKill1);
058    Result[] res;
059    int initialCount;
060    try (Connection conn = ConnectionFactory.createConnection(CONF1)) {
061      try (Table table = conn.getTable(tableName)) {
062        LOG.info("Start loading table");
063        initialCount = UTIL1.loadTable(table, famName);
064        LOG.info("Done loading table");
065        killer.join(5000);
066        LOG.info("Done waiting for threads");
067
068        while (true) {
069          try (ResultScanner scanner = table.getScanner(new Scan())) {
070            res = scanner.next(initialCount);
071            break;
072          } catch (UnknownScannerException ex) {
073            LOG.info("Cluster wasn't ready yet, restarting scanner");
074          }
075        }
076      }
077    }
078    // Test we actually have all the rows, we may miss some because we
079    // don't have IO fencing.
080    if (res.length != initialCount) {
081      LOG.warn("We lost some rows on the master cluster!");
082      // We don't really expect the other cluster to have more rows
083      initialCount = res.length;
084    }
085
086    int lastCount = 0;
087    final long start = System.currentTimeMillis();
088    int i = 0;
089    try (Connection conn = ConnectionFactory.createConnection(CONF2)) {
090      try (Table table = conn.getTable(tableName)) {
091        while (true) {
092          if (i == NB_RETRIES - 1) {
093            fail("Waited too much time for queueFailover replication. " + "Waited "
094                + (System.currentTimeMillis() - start) + "ms.");
095          }
096          Result[] res2;
097          try (ResultScanner scanner = table.getScanner(new Scan())) {
098            res2 = scanner.next(initialCount * 2);
099          }
100          if (res2.length < initialCount) {
101            if (lastCount < res2.length) {
102              i--; // Don't increment timeout if we make progress
103            } else {
104              i++;
105            }
106            lastCount = res2.length;
107            LOG.info(
108              "Only got " + lastCount + " rows instead of " + initialCount + " current i=" + i);
109            Thread.sleep(SLEEP_TIME * 2);
110          } else {
111            break;
112          }
113        }
114      }
115    }
116  }
117
118  private static Thread killARegionServer(final HBaseTestingUtility utility, final long timeout,
119      final int rs) {
120    Thread killer = new Thread() {
121      @Override
122      public void run() {
123        try {
124          Thread.sleep(timeout);
125          utility.getHBaseCluster().getRegionServer(rs).stop("Stopping as part of the test");
126        } catch (Exception e) {
127          LOG.error("Couldn't kill a region server", e);
128        }
129      }
130    };
131    killer.setDaemon(true);
132    killer.start();
133    return killer;
134  }
135}