/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.*;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests failover of secondary region replicas.
 */
@Category(LargeTests.class)
public class TestRegionReplicaFailover {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestRegionReplicaFailover.class);

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  private static final int NB_SERVERS = 3;

  protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
      HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
  protected final byte[] fam = HBaseTestingUtility.fam1;
  protected final byte[] qual1 = Bytes.toBytes("qual1");
  protected final byte[] value1 = Bytes.toBytes("value1");
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");

  @Rule public TestName name = new TestName();

  private HTableDescriptor htd;

  @Before
  public void before() throws Exception {
    Configuration conf = HTU.getConfiguration();
    // Up the handlers; this test needs more than usual.
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);

    HTU.startMiniCluster(NB_SERVERS);
    htd = HTU.createTableDescriptor(
      name.getMethodName().substring(0, name.getMethodName().length() - 3));
    htd.setRegionReplication(3);
    HTU.getAdmin().createTable(htd);
  }

  @After
  public void after() throws Exception {
    HTU.deleteTableIfAny(htd.getTableName());
    HTU.shutdownMiniCluster();
  }

  /**
   * Tests that on a newly created table with region replicas and no data, the secondary
   * region replicas are available to read immediately.
   */
  @Test
  public void testSecondaryRegionWithEmptyRegion() throws IOException {
    // Create a new table with region replication, don't put any data. Test that the secondary
    // region replica is available to read.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      Get get = new Get(row);
      get.setConsistency(Consistency.TIMELINE);
      get.setReplicaId(1);
      table.get(get); // this should not block
    }
  }

  /**
   * Tests that if there is some data in the primary region, reopening the region replicas
   * (enable/disable table, etc.) makes the region replicas readable.
   */
  @Test
  public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
    // Create a new table with region replication and load some data,
    // then disable and enable the table again and verify the data from the secondary
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

      HTU.getAdmin().disableTable(htd.getTableName());
      HTU.getAdmin().enableTable(htd.getTableName());

      HTU.verifyNumericRows(table, fam, 0, 1000, 1);
    }
  }

  /**
   * Tests that killing a primary region with unflushed data recovers.
   */
  @Test
  public void testPrimaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      HTU.loadNumericRows(table, fam, 0, 1000);

      // wal replication is async, we have to wait until the replication catches up, or we timeout
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // we should not have flushed files yet, only data in the memstores of primary and secondary.
      // Kill the primary region replica now, and ensure that when it comes back up, we can still
      // read the same data from the primary and the secondaries
      boolean aborted = false;
      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
          if (r.getRegionInfo().getReplicaId() == 0) {
            LOG.info("Aborting region server hosting primary region replica");
            rs.getRegionServer().abort("for test");
            aborted = true;
            break;
          }
        }
      }
      assertTrue(aborted);

      // wal replication is async, we have to wait until the replication catches up, or we timeout
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * WAL replication is async; wait until the replication catches up, or time out.
   */
  private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
      final int endRow, final int replicaId, final long timeout) throws Exception {
    try {
      HTU.waitFor(timeout, new Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          try {
            HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
            return true;
          } catch (AssertionError ae) {
            return false;
          }
        }
      });
    } catch (Throwable t) {
      // ignore this, but redo the verify to surface the actual exception
      HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
    }
  }

  /**
   * Tests that killing a secondary region with unflushed data recovers, and the replica
   * becomes available to read again shortly.
   */
  @Test
  public void testSecondaryRegionKill() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {
      HTU.loadNumericRows(table, fam, 0, 1000);

      // wait for some time to ensure that async wal replication does its magic
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);

      // we should not have flushed files yet, only data in the memstores of primary and secondary.
      // Kill the secondary region replica now, and ensure that when it comes back up, we can still
      // read the same data from it
      boolean aborted = false;
      for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
        for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
          if (r.getRegionInfo().getReplicaId() == 1) {
            LOG.info("Aborting region server hosting secondary region replica");
            rs.getRegionServer().abort("for test");
            aborted = true;
            break;
          }
        }
      }
      assertTrue(aborted);

      // It takes extra time for the replica region to become ready for reads: during region open
      // it asks the primary region to flush, so that the replica can open the newly flushed hfiles
      // and avoid serving out-of-sync data.
      verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
      HTU.verifyNumericRows(table, fam, 0, 1000, 2);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests the case where there are 3 region replicas and the primary is continuously accepting
   * new writes while one of the secondaries is killed. Verification is done for both of the
   * secondary replicas.
   */
  @Test
  public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName());
        Admin admin = connection.getAdmin()) {
      // load some base data into the primary before starting the loader thread
      HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
      admin.flush(table.getName());
      HTU.loadNumericRows(table, fam, 1000, 2000);

      final AtomicReference<Throwable> ex = new AtomicReference<>(null);
      final AtomicBoolean done = new AtomicBoolean(false);
      final AtomicInteger key = new AtomicInteger(2000);

      Thread loader = new Thread() {
        @Override
        public void run() {
          while (!done.get()) {
            try {
              HTU.loadNumericRows(table, fam, key.get(), key.get() + 1000);
              key.addAndGet(1000);
            } catch (Throwable e) {
              ex.compareAndSet(null, e);
            }
          }
        }
      };
      loader.start();

      Thread aborter = new Thread() {
        @Override
        public void run() {
          try {
            boolean aborted = false;
            for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
              for (Region r : rs.getRegionServer().getRegions(htd.getTableName())) {
                if (r.getRegionInfo().getReplicaId() == 1) {
                  LOG.info("Aborting region server hosting secondary region replica");
                  rs.getRegionServer().abort("for test");
                  aborted = true;
                }
              }
            }
            assertTrue(aborted);
          } catch (Throwable e) {
            ex.compareAndSet(null, e);
          }
        }
      };

      aborter.start();
      aborter.join();
      done.set(true);
      loader.join();

      assertNull(ex.get());

      assertTrue(key.get() > 1000); // assert that the test is working as designed
      LOG.info("Loaded up to key: " + key.get());
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
      verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
    }

    // restart the region server
    HTU.getMiniHBaseCluster().startRegionServer();
  }

  /**
   * Tests creating a table with a lot of regions and replicas. Opening the region replicas
   * should not block handlers on the RS indefinitely.
   */
  @Test
  public void testLotsOfRegionReplicas() throws IOException {
    int numRegions = NB_SERVERS * 20;
    int regionReplication = 10;
    String tableName = htd.getTableName().getNameAsString() + "2";
    htd = HTU.createTableDescriptor(tableName);
    htd.setRegionReplication(regionReplication);

    // don't care too much about the splits themselves
    byte[] startKey = Bytes.toBytes("aaa");
    byte[] endKey = Bytes.toBytes("zzz");
    byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
    HTU.getAdmin().createTable(htd, startKey, endKey, numRegions);

    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table table = connection.getTable(htd.getTableName())) {

      for (int i = 1; i < splits.length; i++) {
        for (int j = 0; j < regionReplication; j++) {
          Get get = new Get(splits[i]);
          get.setConsistency(Consistency.TIMELINE);
          get.setReplicaId(j);
          table.get(get); // this should not block. Regions should be coming online
        }
      }
    }

    HTU.deleteTableIfAny(TableName.valueOf(tableName));
  }
}