001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Iterator; 028import java.util.List; 029import java.util.NavigableSet; 030import java.util.concurrent.atomic.LongAdder; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.KeyValue; 037import org.apache.hadoop.hbase.MiniHBaseCluster; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.SplitLogCounters; 040import org.apache.hadoop.hbase.StartMiniClusterOption; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.Waiter; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionInfoBuilder; 046import org.apache.hadoop.hbase.client.RegionLocator; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 049import org.apache.hadoop.hbase.master.assignment.RegionStates; 050import org.apache.hadoop.hbase.regionserver.HRegionServer; 051import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 052import org.apache.hadoop.hbase.regionserver.Region; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.CommonFSUtils; 055import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 056import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 057import org.apache.hadoop.hbase.util.Threads; 058import org.apache.hadoop.hbase.wal.WAL; 059import org.apache.hadoop.hbase.wal.WALEdit; 060import org.apache.hadoop.hbase.wal.WALFactory; 061import org.apache.hadoop.hbase.wal.WALKeyImpl; 062import org.apache.hadoop.hbase.zookeeper.ZKUtil; 063import org.junit.After; 064import org.junit.AfterClass; 065import org.junit.Before; 066import org.junit.BeforeClass; 067import org.junit.Rule; 068import org.junit.Test; 069import org.junit.rules.TestName; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 073 074/** 075 * Base class for testing distributed log splitting. 076 */ 077public abstract class AbstractTestDLS { 078 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class); 079 080 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 081 082 // Start a cluster with 2 masters and 5 regionservers 083 private static final int NUM_MASTERS = 2; 084 private static final int NUM_RS = 5; 085 private static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); 086 087 @Rule 088 public TestName testName = new TestName(); 089 090 private TableName tableName; 091 private MiniHBaseCluster cluster; 092 private HMaster master; 093 private Configuration conf; 094 095 @Rule 096 public TestName name = new TestName(); 097 098 @BeforeClass 099 public static void setup() throws Exception { 100 // Uncomment the following line if more verbosity is needed for 101 // debugging (see HBASE-12285 for details). 102 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); 103 TEST_UTIL.startMiniZKCluster(); 104 TEST_UTIL.startMiniDFSCluster(3); 105 } 106 107 @AfterClass 108 public static void tearDown() throws Exception { 109 TEST_UTIL.shutdownMiniCluster(); 110 } 111 112 protected abstract String getWalProvider(); 113 114 private void startCluster(int numRS) throws Exception { 115 SplitLogCounters.resetCounters(); 116 LOG.info("Starting cluster"); 117 conf.setLong("hbase.splitlog.max.resubmit", 0); 118 // Make the failure test faster 119 conf.setInt("zookeeper.recovery.retry", 0); 120 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); 121 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing 122 conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3); 123 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 124 conf.set("hbase.wal.provider", getWalProvider()); 125 StartMiniClusterOption option = StartMiniClusterOption.builder() 126 .numMasters(NUM_MASTERS).numRegionServers(numRS).build(); 127 TEST_UTIL.startMiniHBaseCluster(option); 128 cluster = TEST_UTIL.getHBaseCluster(); 129 LOG.info("Waiting for active/ready master"); 130 cluster.waitForActiveAndReadyMaster(); 131 master = cluster.getMaster(); 132 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 133 @Override 134 public boolean evaluate() throws Exception { 135 return cluster.getLiveRegionServerThreads().size() >= numRS; 136 } 137 }); 138 } 139 140 @Before 141 public void before() throws Exception { 142 conf = TEST_UTIL.getConfiguration(); 143 tableName = TableName.valueOf(testName.getMethodName()); 144 } 145 146 @After 147 public void after() throws Exception { 148 TEST_UTIL.shutdownMiniHBaseCluster(); 149 TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()), 150 true); 151 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); 152 } 153 154 @Test 155 public void testMasterStartsUpWithLogSplittingWork() throws Exception { 156 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); 157 startCluster(NUM_RS); 158 159 int numRegionsToCreate = 40; 160 int numLogLines = 1000; 161 // turn off load balancing to prevent regions from moving around otherwise 162 // they will consume recovered.edits 163 master.balanceSwitch(false); 164 165 try (Table ht = installTable(numRegionsToCreate)) { 166 HRegionServer hrs = findRSToKill(false); 167 List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 168 makeWAL(hrs, regions, numLogLines, 100); 169 170 // abort master 171 abortMaster(cluster); 172 173 // abort RS 174 LOG.info("Aborting region server: " + hrs.getServerName()); 175 hrs.abort("testing"); 176 177 // wait for abort completes 178 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 179 @Override 180 public boolean evaluate() throws Exception { 181 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1; 182 } 183 }); 184 185 Thread.sleep(2000); 186 LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); 187 188 // wait for abort completes 189 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 190 @Override 191 public boolean evaluate() throws Exception { 192 return (HBaseTestingUtility.getAllOnlineRegions(cluster) 193 .size() >= (numRegionsToCreate + 1)); 194 } 195 }); 196 197 LOG.info("Current Open Regions After Master Node Starts Up:" + 198 HBaseTestingUtility.getAllOnlineRegions(cluster).size()); 199 200 assertEquals(numLogLines, TEST_UTIL.countRows(ht)); 201 } 202 } 203 204 @Test 205 public void testThreeRSAbort() throws Exception { 206 LOG.info("testThreeRSAbort"); 207 int numRegionsToCreate = 40; 208 int numRowsPerRegion = 100; 209 210 startCluster(NUM_RS); // NUM_RS=6. 211 212 try (Table table = installTable(numRegionsToCreate)) { 213 populateDataInTable(numRowsPerRegion); 214 215 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 216 assertEquals(NUM_RS, rsts.size()); 217 cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName()); 218 cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName()); 219 cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName()); 220 221 TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() { 222 223 @Override 224 public boolean evaluate() throws Exception { 225 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3; 226 } 227 228 @Override 229 public String explainFailure() throws Exception { 230 return "Timed out waiting for server aborts."; 231 } 232 }); 233 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 234 int rows; 235 try { 236 rows = TEST_UTIL.countRows(table); 237 } catch (Exception e) { 238 Threads.printThreadInfo(System.out, "Thread dump before fail"); 239 throw e; 240 } 241 assertEquals(numRegionsToCreate * numRowsPerRegion, rows); 242 } 243 } 244 245 private Table installTable(int nrs) throws Exception { 246 return installTable(nrs, 0); 247 } 248 249 private Table installTable(int nrs, int existingRegions) throws Exception { 250 // Create a table with regions 251 byte[] family = Bytes.toBytes("family"); 252 LOG.info("Creating table with " + nrs + " regions"); 253 Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs); 254 int numRegions = -1; 255 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 256 numRegions = r.getStartKeys().length; 257 } 258 assertEquals(nrs, numRegions); 259 LOG.info("Waiting for no more RIT\n"); 260 blockUntilNoRIT(); 261 // disable-enable cycle to get rid of table's dead regions left behind 262 // by createMultiRegions 263 assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName)); 264 LOG.debug("Disabling table\n"); 265 TEST_UTIL.getAdmin().disableTable(tableName); 266 LOG.debug("Waiting for no more RIT\n"); 267 blockUntilNoRIT(); 268 NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster); 269 LOG.debug("Verifying only catalog and namespace regions are assigned\n"); 270 if (regions.size() != 2) { 271 for (String oregion : regions) { 272 LOG.debug("Region still online: " + oregion); 273 } 274 } 275 assertEquals(2 + existingRegions, regions.size()); 276 LOG.debug("Enabling table\n"); 277 TEST_UTIL.getAdmin().enableTable(tableName); 278 LOG.debug("Waiting for no more RIT\n"); 279 blockUntilNoRIT(); 280 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); 281 regions = HBaseTestingUtility.getAllOnlineRegions(cluster); 282 assertEquals(numRegions + 2 + existingRegions, regions.size()); 283 return table; 284 } 285 286 void populateDataInTable(int nrows) throws Exception { 287 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 288 assertEquals(NUM_RS, rsts.size()); 289 290 for (RegionServerThread rst : rsts) { 291 HRegionServer hrs = rst.getRegionServer(); 292 List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 293 for (RegionInfo hri : hris) { 294 if (hri.getTable().isSystemTable()) { 295 continue; 296 } 297 LOG.debug( 298 "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString()); 299 Region region = hrs.getOnlineRegion(hri.getRegionName()); 300 assertTrue(region != null); 301 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); 302 } 303 } 304 305 for (MasterThread mt : cluster.getLiveMasterThreads()) { 306 HRegionServer hrs = mt.getMaster(); 307 List<RegionInfo> hris; 308 try { 309 hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 310 } catch (ServerNotRunningYetException e) { 311 // It's ok: this master may be a backup. Ignored. 312 continue; 313 } 314 for (RegionInfo hri : hris) { 315 if (hri.getTable().isSystemTable()) { 316 continue; 317 } 318 LOG.debug( 319 "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString()); 320 Region region = hrs.getOnlineRegion(hri.getRegionName()); 321 assertTrue(region != null); 322 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); 323 } 324 } 325 } 326 327 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size) 328 throws IOException { 329 makeWAL(hrs, regions, num_edits, edit_size, true); 330 } 331 332 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize, 333 boolean cleanShutdown) throws IOException { 334 // remove root and meta region 335 regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO); 336 337 for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) { 338 RegionInfo regionInfo = iter.next(); 339 if (regionInfo.getTable().isSystemTable()) { 340 iter.remove(); 341 } 342 } 343 byte[] value = new byte[editSize]; 344 345 List<RegionInfo> hris = new ArrayList<>(); 346 for (RegionInfo region : regions) { 347 if (region.getTable() != tableName) { 348 continue; 349 } 350 hris.add(region); 351 } 352 LOG.info("Creating wal edits across " + hris.size() + " regions."); 353 for (int i = 0; i < editSize; i++) { 354 value[i] = (byte) ('a' + (i % 26)); 355 } 356 int n = hris.size(); 357 int[] counts = new int[n]; 358 // sync every ~30k to line up with desired wal rolls 359 final int syncEvery = 30 * 1024 / editSize; 360 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 361 if (n > 0) { 362 for (int i = 0; i < numEdits; i += 1) { 363 WALEdit e = new WALEdit(); 364 RegionInfo curRegionInfo = hris.get(i % n); 365 WAL log = hrs.getWAL(curRegionInfo); 366 byte[] startRow = curRegionInfo.getStartKey(); 367 if (startRow == null || startRow.length == 0) { 368 startRow = new byte[] { 0, 0, 0, 0, 1 }; 369 } 370 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]); 371 row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because 372 // HBaseTestingUtility.createMultiRegions use 5 bytes key 373 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); 374 e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value)); 375 log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), 376 tableName, System.currentTimeMillis(), mvcc), e); 377 if (0 == i % syncEvery) { 378 log.sync(); 379 } 380 counts[i % n] += 1; 381 } 382 } 383 // done as two passes because the regions might share logs. shutdown is idempotent, but sync 384 // will cause errors if done after. 385 for (RegionInfo info : hris) { 386 WAL log = hrs.getWAL(info); 387 log.sync(); 388 } 389 if (cleanShutdown) { 390 for (RegionInfo info : hris) { 391 WAL log = hrs.getWAL(info); 392 log.shutdown(); 393 } 394 } 395 for (int i = 0; i < n; i++) { 396 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits"); 397 } 398 return; 399 } 400 401 private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException { 402 int count = 0; 403 try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) { 404 WAL.Entry e; 405 while ((e = in.next()) != null) { 406 if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { 407 count++; 408 } 409 } 410 } 411 return count; 412 } 413 414 private void blockUntilNoRIT() throws Exception { 415 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 416 } 417 418 private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families) 419 throws IOException { 420 for (int i = 0; i < numRows; i++) { 421 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i))); 422 for (byte[] family : families) { 423 put.addColumn(family, qf, null); 424 } 425 region.put(put); 426 } 427 } 428 429 private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) 430 throws InterruptedException { 431 long curt = System.currentTimeMillis(); 432 long endt = curt + timems; 433 while (curt < endt) { 434 if (ctr.sum() == oldval) { 435 Thread.sleep(100); 436 curt = System.currentTimeMillis(); 437 } else { 438 assertEquals(newval, ctr.sum()); 439 return; 440 } 441 } 442 fail(); 443 } 444 445 private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException { 446 for (MasterThread mt : cluster.getLiveMasterThreads()) { 447 if (mt.getMaster().isActiveMaster()) { 448 mt.getMaster().abort("Aborting for tests", new Exception("Trace info")); 449 mt.join(); 450 break; 451 } 452 } 453 LOG.debug("Master is aborted"); 454 } 455 456 /** 457 * Find a RS that has regions of a table. 458 * @param hasMetaRegion when true, the returned RS has hbase:meta region as well 459 */ 460 private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception { 461 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 462 List<RegionInfo> regions = null; 463 HRegionServer hrs = null; 464 465 for (RegionServerThread rst : rsts) { 466 hrs = rst.getRegionServer(); 467 while (rst.isAlive() && !hrs.isOnline()) { 468 Thread.sleep(100); 469 } 470 if (!rst.isAlive()) { 471 continue; 472 } 473 boolean isCarryingMeta = false; 474 boolean foundTableRegion = false; 475 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 476 for (RegionInfo region : regions) { 477 if (region.isMetaRegion()) { 478 isCarryingMeta = true; 479 } 480 if (region.getTable() == tableName) { 481 foundTableRegion = true; 482 } 483 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) { 484 break; 485 } 486 } 487 if (isCarryingMeta && hasMetaRegion) { 488 // clients ask for a RS with META 489 if (!foundTableRegion) { 490 HRegionServer destRS = hrs; 491 // the RS doesn't have regions of the specified table so we need move one to this RS 492 List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName); 493 RegionInfo hri = tableRegions.get(0); 494 TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName()); 495 // wait for region move completes 496 RegionStates regionStates = 497 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 498 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { 499 @Override 500 public boolean evaluate() throws Exception { 501 ServerName sn = regionStates.getRegionServerOfRegion(hri); 502 return (sn != null && sn.equals(destRS.getServerName())); 503 } 504 }); 505 } 506 return hrs; 507 } else if (hasMetaRegion || isCarryingMeta) { 508 continue; 509 } 510 if (foundTableRegion) { 511 break; 512 } 513 } 514 515 return hrs; 516 } 517}