001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.fail;
021
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.HBaseTestingUtility;
024import org.apache.hadoop.hbase.UnknownScannerException;
025import org.apache.hadoop.hbase.client.Result;
026import org.apache.hadoop.hbase.client.ResultScanner;
027import org.apache.hadoop.hbase.client.Scan;
028import org.apache.hadoop.hbase.testclassification.LargeTests;
029import org.apache.hadoop.hbase.testclassification.ReplicationTests;
030import org.junit.ClassRule;
031import org.junit.experimental.categories.Category;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035@Category({ReplicationTests.class, LargeTests.class})
036public class TestReplicationKillRS extends TestReplicationBase {
037
038  @ClassRule
039  public static final HBaseClassTestRule CLASS_RULE =
040      HBaseClassTestRule.forClass(TestReplicationKillRS.class);
041
042  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationKillRS.class);
043
044  /**
045   * Load up 1 tables over 2 region servers and kill a source during
046   * the upload. The failover happens internally.
047   *
048   * WARNING this test sometimes fails because of HBASE-3515
049   *
050   * @throws Exception
051   */
052  public void loadTableAndKillRS(HBaseTestingUtility util) throws Exception {
053    // killing the RS with hbase:meta can result into failed puts until we solve
054    // IO fencing
055    int rsToKill1 =
056        util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
057
058    // Takes about 20 secs to run the full loading, kill around the middle
059    Thread killer = killARegionServer(util, 5000, rsToKill1);
060
061    LOG.info("Start loading table");
062    int initialCount = utility1.loadTable(htable1, famName);
063    LOG.info("Done loading table");
064    killer.join(5000);
065    LOG.info("Done waiting for threads");
066
067    Result[] res;
068    while (true) {
069      try {
070        Scan scan = new Scan();
071        ResultScanner scanner = htable1.getScanner(scan);
072        res = scanner.next(initialCount);
073        scanner.close();
074        break;
075      } catch (UnknownScannerException ex) {
076        LOG.info("Cluster wasn't ready yet, restarting scanner");
077      }
078    }
079    // Test we actually have all the rows, we may miss some because we
080    // don't have IO fencing.
081    if (res.length != initialCount) {
082      LOG.warn("We lost some rows on the master cluster!");
083      // We don't really expect the other cluster to have more rows
084      initialCount = res.length;
085    }
086
087    int lastCount = 0;
088
089    final long start = System.currentTimeMillis();
090    int i = 0;
091    while (true) {
092      if (i==NB_RETRIES-1) {
093        fail("Waited too much time for queueFailover replication. " +
094            "Waited "+(System.currentTimeMillis() - start)+"ms.");
095      }
096      Scan scan2 = new Scan();
097      ResultScanner scanner2 = htable2.getScanner(scan2);
098      Result[] res2 = scanner2.next(initialCount * 2);
099      scanner2.close();
100      if (res2.length < initialCount) {
101        if (lastCount < res2.length) {
102          i--; // Don't increment timeout if we make progress
103        } else {
104          i++;
105        }
106        lastCount = res2.length;
107        LOG.info("Only got " + lastCount + " rows instead of " +
108            initialCount + " current i=" + i);
109        Thread.sleep(SLEEP_TIME*2);
110      } else {
111        break;
112      }
113    }
114  }
115
116  private static Thread killARegionServer(final HBaseTestingUtility utility,
117                                          final long timeout, final int rs) {
118    Thread killer = new Thread() {
119      @Override
120      public void run() {
121        try {
122          Thread.sleep(timeout);
123          utility.getHBaseCluster().getRegionServer(rs).stop("Stopping as part of the test");
124        } catch (Exception e) {
125          LOG.error("Couldn't kill a region server", e);
126        }
127      }
128    };
129    killer.setDaemon(true);
130    killer.start();
131    return killer;
132  }
133}