/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Base class for testing distributed log splitting.
 * <p>
 * A mini ZooKeeper cluster and a mini DFS cluster are started once per class; each test starts
 * its own mini HBase cluster via {@link #startCluster(int)} and {@link #after()} shuts it down
 * and wipes the HBase root directory and the {@code /hbase} znode. Subclasses select the WAL
 * implementation under test through {@link #getWalProvider()}.
 */
public abstract class AbstractTestDLS {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestDLS.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Start a cluster with 2 masters and 5 regionservers
  private static final int NUM_MASTERS = 2;
  private static final int NUM_RS = 5;
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("family");

  @Rule
  public TestName testName = new TestName();

  private TableName tableName;
  private MiniHBaseCluster cluster;
  private HMaster master;
  private Configuration conf;

  // Not read anywhere in this class (testName above is the one that is used); kept because it
  // is a public member.
  @Rule
  public TestName name = new TestName();

  /**
   * Starts the mini ZooKeeper and mini DFS (3 datanodes) clusters shared by all tests.
   */
  @BeforeClass
  public static void setup() throws Exception {
    // Uncomment the following line if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
    TEST_UTIL.startMiniZKCluster();
    TEST_UTIL.startMiniDFSCluster(3);
  }

  /**
   * Shuts down the mini cluster once all tests in the class have run.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * @return the value to set for the {@code hbase.wal.provider} configuration key
   */
  protected abstract String getWalProvider();

  /**
   * Configures and starts a mini HBase cluster with {@link #NUM_MASTERS} masters and the given
   * number of region servers, then blocks until a master is active and at least {@code numRS}
   * region server threads are live.
   * @param numRS number of region servers to start
   */
  private void startCluster(int numRS) throws Exception {
    SplitLogCounters.resetCounters();
    LOG.info("Starting cluster");
    conf.setLong("hbase.splitlog.max.resubmit", 0);
    // Make the failure test faster
    conf.setInt("zookeeper.recovery.retry", 0);
    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    conf.set("hbase.wal.provider", getWalProvider());
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().numMasters(NUM_MASTERS).numRegionServers(numRS).build();
    TEST_UTIL.startMiniHBaseCluster(option);
    cluster = TEST_UTIL.getHBaseCluster();
    LOG.info("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();
    master = cluster.getMaster();
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return cluster.getLiveRegionServerThreads().size() >= numRS;
      }
    });
  }

  /**
   * Captures the shared configuration and derives the table name from the test method name.
   */
  @Before
  public void before() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    tableName = TableName.valueOf(testName.getMethodName());
  }

  /**
   * Shuts down the mini HBase cluster and removes the HBase root directory and the
   * {@code /hbase} znode so the next test starts from a clean state.
   */
  @After
  public void after() throws Exception {
    TEST_UTIL.shutdownMiniHBaseCluster();
    TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
      true);
    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
  }

  /**
   * Writes WAL edits on one region server, aborts the active master and then that region server,
   * and verifies that every written edit is readable from the table once the regions are back
   * online.
   */
  @Test
  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
    startCluster(NUM_RS);

    int numRegionsToCreate = 40;
    int numLogLines = 1000;
    // turn off load balancing to prevent regions from moving around otherwise
    // they will consume recovered.edits
    master.balanceSwitch(false);

    try (Table ht = installTable(numRegionsToCreate)) {
      HRegionServer hrs = findRSToKill(false);
      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      makeWAL(hrs, regions, numLogLines, 100);

      // abort master
      abortMaster(cluster);

      // abort RS
      LOG.info("Aborting region server: {}", hrs.getServerName());
      hrs.abort("testing");

      // wait for abort completes
      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
        }
      });

      Thread.sleep(2000);
      LOG.info("Current Open Regions:{}", HBaseTestingUtility.getAllOnlineRegions(cluster).size());

      // wait until all table regions (plus one more) are reported online again
      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
              >= (numRegionsToCreate + 1));
        }
      });

      LOG.info("Current Open Regions After Master Node Starts Up:{}",
        HBaseTestingUtility.getAllOnlineRegions(cluster).size());

      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
    }
  }

  /**
   * Loads rows into every region of the table, kills three region servers and verifies that no
   * rows are lost once all regions of the table are assigned again.
   */
  @Test
  public void testThreeRSAbort() throws Exception {
    LOG.info("testThreeRSAbort");
    int numRegionsToCreate = 40;
    int numRowsPerRegion = 100;

    startCluster(NUM_RS); // NUM_RS=5.

    try (Table table = installTable(numRegionsToCreate)) {
      populateDataInTable(numRowsPerRegion);

      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
      assertEquals(NUM_RS, rsts.size());
      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());

      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {

        @Override
        public boolean evaluate() throws Exception {
          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Timed out waiting for server aborts.";
        }
      });
      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
      int rows;
      try {
        rows = TEST_UTIL.countRows(table);
      } catch (Exception e) {
        // Dump threads to help diagnose why the scan failed, then let the test fail.
        Threads.printThreadInfo(System.out, "Thread dump before fail");
        throw e;
      }
      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
    }
  }

  /**
   * Creates the test table with {@code nrs} regions, assuming no other user regions exist.
   * @param nrs number of regions to create
   * @return the created table; the caller is responsible for closing it
   */
  private Table installTable(int nrs) throws Exception {
    return installTable(nrs, 0);
  }

  /**
   * Creates the test table with {@code nrs} regions, runs it through a disable/enable cycle and
   * asserts the expected number of online regions after each step.
   * @param nrs             number of regions to create
   * @param existingRegions number of regions expected to be online in addition to the two
   *                        non-table regions and the regions of the new table
   * @return the created table; the caller is responsible for closing it
   */
  private Table installTable(int nrs, int existingRegions) throws Exception {
    // Create a table with regions
    LOG.info("Creating table with {} regions", nrs);
    Table table = TEST_UTIL.createMultiRegionTable(tableName, COLUMN_FAMILY, nrs);
    int numRegions = -1;
    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      numRegions = r.getStartKeys().length;
    }
    assertEquals(nrs, numRegions);
    LOG.info("Waiting for no more RIT\n");
    blockUntilNoRIT();
    // disable-enable cycle to get rid of table's dead regions left behind
    // by createMultiRegions
    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
    LOG.debug("Disabling table\n");
    TEST_UTIL.getAdmin().disableTable(tableName);
    LOG.debug("Waiting for no more RIT\n");
    blockUntilNoRIT();
    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
    LOG.debug("Verifying only catalog and namespace regions are assigned\n");
    // Log the stragglers before the assertion below fails, using the same expected count.
    if (regions.size() != 2 + existingRegions) {
      for (String oregion : regions) {
        LOG.debug("Region still online: {}", oregion);
      }
    }
    assertEquals(2 + existingRegions, regions.size());
    LOG.debug("Enabling table\n");
    TEST_UTIL.getAdmin().enableTable(tableName);
    LOG.debug("Waiting for no more RIT\n");
    blockUntilNoRIT();
    LOG.debug("Verifying there are {} assigned on cluster\n", numRegions);
    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
    assertEquals(numRegions + 2 + existingRegions, regions.size());
    return table;
  }

  /**
   * Puts {@code nrows} rows into every non-system region hosted by any live region server or
   * master. Masters that answer with {@link ServerNotRunningYetException} are skipped.
   * @param nrows number of rows to write per region
   */
  void populateDataInTable(int nrows) throws Exception {
    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    assertEquals(NUM_RS, rsts.size());

    for (RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      putDataInUserRegions(hrs, rst.getName(), hris, nrows);
    }

    for (MasterThread mt : cluster.getLiveMasterThreads()) {
      HRegionServer hrs = mt.getMaster();
      List<RegionInfo> hris;
      try {
        hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      } catch (ServerNotRunningYetException e) {
        // It's ok: this master may be a backup. Ignored.
        continue;
      }
      putDataInUserRegions(hrs, mt.getName(), hris, nrows);
    }
  }

  /**
   * Writes {@code nrows} rows into each non-system region of {@code hris} hosted on {@code hrs}.
   * @param hrs        server hosting the regions
   * @param threadName name of the server thread, used for logging only
   * @param hris       regions reported online by {@code hrs}
   * @param nrows      number of rows to write per region
   */
  private void putDataInUserRegions(HRegionServer hrs, String threadName, List<RegionInfo> hris,
    int nrows) throws IOException {
    for (RegionInfo hri : hris) {
      if (hri.getTable().isSystemTable()) {
        continue;
      }
      LOG.debug("adding data to rs = {} region = {}", threadName, hri.getRegionNameAsString());
      Region region = hrs.getOnlineRegion(hri.getRegionName());
      assertTrue(region != null);
      putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
    }
  }

  /**
   * Same as {@link #makeWAL(HRegionServer, List, int, int, boolean)} with a clean WAL shutdown.
   */
  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
    throws IOException {
    makeWAL(hrs, regions, num_edits, edit_size, true);
  }

  /**
   * Appends {@code numEdits} edits, distributed round-robin over the regions of the test table
   * found in {@code regions}, directly to the WALs of {@code hrs}.
   * <p>
   * Note that the meta region and all system-table regions are removed from the passed-in
   * {@code regions} list as a side effect.
   * @param hrs           region server whose WALs are written to
   * @param regions       candidate regions; modified in place as described above
   * @param numEdits      total number of edits to append
   * @param editSize      size in bytes of each edit's value
   * @param cleanShutdown whether to shut the WALs down after the final sync
   */
  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
    boolean cleanShutdown) throws IOException {
    // remove root and meta region
    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);

    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
      RegionInfo regionInfo = iter.next();
      if (regionInfo.getTable().isSystemTable()) {
        iter.remove();
      }
    }
    byte[] value = new byte[editSize];

    List<RegionInfo> hris = new ArrayList<>();
    for (RegionInfo region : regions) {
      // Compare by value rather than by reference.
      if (!region.getTable().equals(tableName)) {
        continue;
      }
      hris.add(region);
    }
    LOG.info("Creating wal edits across {} regions.", hris.size());
    for (int i = 0; i < editSize; i++) {
      value[i] = (byte) ('a' + (i % 26));
    }
    int n = hris.size();
    int[] counts = new int[n];
    // sync every ~30k to line up with desired wal rolls; clamped to at least 1 so that a zero or
    // very large editSize cannot cause a division (or modulo) by zero
    final int syncEvery = Math.max(1, 30 * 1024 / Math.max(1, editSize));
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    if (n > 0) {
      for (int i = 0; i < numEdits; i += 1) {
        WALEdit e = new WALEdit();
        RegionInfo curRegionInfo = hris.get(i % n);
        WAL log = hrs.getWAL(curRegionInfo);
        byte[] startRow = curRegionInfo.getStartKey();
        if (startRow == null || startRow.length == 0) {
          startRow = new byte[] { 0, 0, 0, 0, 1 };
        }
        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
        row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
        // HBaseTestingUtility.createMultiRegions use 5 bytes key
        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
        e.add(
          new KeyValue(row, COLUMN_FAMILY, qualifier, EnvironmentEdgeManager.currentTime(), value));
        log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
          tableName, EnvironmentEdgeManager.currentTime(), mvcc), e);
        if (0 == i % syncEvery) {
          log.sync();
        }
        counts[i % n] += 1;
      }
    }
    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
    // will cause errors if done after.
    for (RegionInfo info : hris) {
      WAL log = hrs.getWAL(info);
      log.sync();
    }
    if (cleanShutdown) {
      for (RegionInfo info : hris) {
        WAL log = hrs.getWAL(info);
        log.shutdown();
      }
    }
    for (int i = 0; i < n; i++) {
      LOG.info("region {} has {} edits", hris.get(i).getRegionNameAsString(), counts[i]);
    }
  }

  /**
   * Counts the entries in a WAL file whose first cell is not a meta edit.
   * @param log  path of the WAL file
   * @param fs   file system holding the WAL
   * @param conf configuration used to create the reader
   * @return number of non-meta entries
   */
  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
    int count = 0;
    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
      WAL.Entry e;
      while ((e = in.next()) != null) {
        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
          count++;
        }
      }
    }
    return count;
  }

  /**
   * Blocks until no regions are in transition, waiting at most 60 seconds.
   */
  private void blockUntilNoRIT() throws Exception {
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
  }

  /**
   * Puts {@code numRows} rows into {@code region}. Row keys are {@code startRow} with the
   * serialized row index appended; each row gets one null-valued cell per family.
   * @param region   region to write to
   * @param startRow prefix of every row key
   * @param numRows  number of rows to write
   * @param qf       column qualifier
   * @param families column families to write the qualifier to
   */
  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
    throws IOException {
    for (int i = 0; i < numRows; i++) {
      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
      for (byte[] family : families) {
        put.addColumn(family, qf, null);
      }
      region.put(put);
    }
  }

  /**
   * Polls {@code ctr} every 100 ms until it no longer equals {@code oldval}, then asserts that it
   * equals {@code newval}. Fails the test if the counter has not changed within {@code timems}.
   * @param ctr    counter to watch
   * @param oldval value the counter is expected to move away from
   * @param newval value the counter must have once it changes
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
    throws InterruptedException {
    long curt = EnvironmentEdgeManager.currentTime();
    long endt = curt + timems;
    while (curt < endt) {
      if (ctr.sum() == oldval) {
        Thread.sleep(100);
        curt = EnvironmentEdgeManager.currentTime();
      } else {
        assertEquals(newval, ctr.sum());
        return;
      }
    }
    fail("Counter did not change from " + oldval + " within " + timems + " ms");
  }

  /**
   * Aborts the active master, if any, and waits for its thread to terminate.
   * @param cluster cluster whose active master should be aborted
   */
  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
    for (MasterThread mt : cluster.getLiveMasterThreads()) {
      if (mt.getMaster().isActiveMaster()) {
        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
        mt.join();
        break;
      }
    }
    LOG.debug("Master is aborted");
  }

  /**
   * Find a RS that has regions of a table.
   * <p>
   * When {@code hasMetaRegion} is true and the server carrying hbase:meta hosts no region of the
   * test table, one table region is moved onto it first. If no server matches the criteria, the
   * last server examined is returned (or {@code null} when there are no live region server
   * threads).
   * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
   * @return the selected region server
   */
  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    List<RegionInfo> regions = null;
    HRegionServer hrs = null;

    for (RegionServerThread rst : rsts) {
      hrs = rst.getRegionServer();
      while (rst.isAlive() && !hrs.isOnline()) {
        Thread.sleep(100);
      }
      if (!rst.isAlive()) {
        continue;
      }
      boolean isCarryingMeta = false;
      boolean foundTableRegion = false;
      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (RegionInfo region : regions) {
        if (region.isMetaRegion()) {
          isCarryingMeta = true;
        }
        // Compare by value rather than by reference.
        if (region.getTable().equals(tableName)) {
          foundTableRegion = true;
        }
        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
          break;
        }
      }
      if (isCarryingMeta && hasMetaRegion) {
        // clients ask for a RS with META
        if (!foundTableRegion) {
          HRegionServer destRS = hrs;
          // the RS doesn't have regions of the specified table so we need move one to this RS
          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
          RegionInfo hri = tableRegions.get(0);
          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
          // wait for region move completes
          RegionStates regionStates =
            TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
              ServerName sn = regionStates.getRegionServerOfRegion(hri);
              return (sn != null && sn.equals(destRS.getServerName()));
            }
          });
        }
        return hrs;
      } else if (hasMetaRegion || isCarryingMeta) {
        // Either meta was requested and this RS lacks it, or meta was not wanted and this RS
        // carries it: try the next server.
        continue;
      }
      if (foundTableRegion) {
        break;
      }
    }

    return hrs;
  }
}