001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Iterator; 030import java.util.List; 031import java.util.NavigableSet; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.Future; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import java.util.concurrent.atomic.LongAdder; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FSDataOutputStream; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.HBaseTestingUtil; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.ServerName; 046import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 047import org.apache.hadoop.hbase.SplitLogCounters; 048import org.apache.hadoop.hbase.StartTestingClusterOption; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.Waiter; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.RegionInfo; 053import org.apache.hadoop.hbase.client.RegionInfoBuilder; 054import org.apache.hadoop.hbase.client.RegionLocator; 055import org.apache.hadoop.hbase.client.Table; 056import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; 057import org.apache.hadoop.hbase.master.assignment.RegionStates; 058import org.apache.hadoop.hbase.regionserver.HRegionServer; 059import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 060import org.apache.hadoop.hbase.regionserver.Region; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.CommonFSUtils; 063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 064import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 065import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 066import org.apache.hadoop.hbase.util.Threads; 067import org.apache.hadoop.hbase.wal.WAL; 068import org.apache.hadoop.hbase.wal.WALEdit; 069import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 070import org.apache.hadoop.hbase.wal.WALKeyImpl; 071import org.apache.hadoop.hbase.zookeeper.ZKUtil; 072import org.junit.jupiter.api.AfterAll; 073import org.junit.jupiter.api.AfterEach; 074import org.junit.jupiter.api.BeforeAll; 075import org.junit.jupiter.api.BeforeEach; 076import org.junit.jupiter.api.Test; 077import org.junit.jupiter.api.TestInfo; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 082 083/** 084 * Base class for testing distributed log splitting. 085 */ 086public abstract class AbstractTestDLS { 087 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestDLS.class); 088 089 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 090 091 // Start a cluster with 2 masters and 5 regionservers 092 private static final int NUM_MASTERS = 2; 093 private static final int NUM_RS = 5; 094 private static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); 095 096 private TableName tableName; 097 private SingleProcessHBaseCluster cluster; 098 private HMaster master; 099 private Configuration conf; 100 101 @BeforeAll 102 public static void setup() throws Exception { 103 // Uncomment the following line if more verbosity is needed for 104 // debugging (see HBASE-12285 for details). 105 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); 106 TEST_UTIL.startMiniZKCluster(); 107 TEST_UTIL.startMiniDFSCluster(3); 108 } 109 110 @AfterAll 111 public static void tearDown() throws Exception { 112 TEST_UTIL.shutdownMiniCluster(); 113 } 114 115 protected abstract String getWalProvider(); 116 117 private void startCluster(int numRS) throws Exception { 118 SplitLogCounters.resetCounters(); 119 LOG.info("Starting cluster"); 120 conf.setLong("hbase.splitlog.max.resubmit", 0); 121 // Make the failure test faster 122 conf.setInt("zookeeper.recovery.retry", 0); 123 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); 124 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing 125 conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3); 126 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 127 conf.set("hbase.wal.provider", getWalProvider()); 128 StartTestingClusterOption option = 129 StartTestingClusterOption.builder().numMasters(NUM_MASTERS).numRegionServers(numRS).build(); 130 TEST_UTIL.startMiniHBaseCluster(option); 131 cluster = TEST_UTIL.getHBaseCluster(); 132 LOG.info("Waiting for active/ready master"); 133 cluster.waitForActiveAndReadyMaster(); 134 master = cluster.getMaster(); 135 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 136 @Override 137 public boolean evaluate() throws Exception { 138 return cluster.getLiveRegionServerThreads().size() >= numRS; 139 } 140 }); 141 } 142 143 @BeforeEach 144 public void before(TestInfo testInfo) throws Exception { 145 conf = TEST_UTIL.getConfiguration(); 146 tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); 147 } 148 149 @AfterEach 150 public void after() throws Exception { 151 TEST_UTIL.shutdownMiniHBaseCluster(); 152 TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()), 153 true); 154 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); 155 } 156 157 @Test 158 public void testMasterStartsUpWithLogSplittingWork() throws Exception { 159 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); 160 startCluster(NUM_RS); 161 162 int numRegionsToCreate = 40; 163 int numLogLines = 1000; 164 // turn off load balancing to prevent regions from moving around otherwise 165 // they will consume recovered.edits 166 master.balanceSwitch(false); 167 168 try (Table ht = installTable(numRegionsToCreate)) { 169 HRegionServer hrs = findRSToKill(false); 170 List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 171 makeWAL(hrs, regions, numLogLines, 100); 172 173 // abort master 174 abortMaster(cluster); 175 176 // abort RS 177 LOG.info("Aborting region server: " + hrs.getServerName()); 178 hrs.abort("testing"); 179 180 // wait for abort completes 181 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 182 @Override 183 public boolean evaluate() throws Exception { 184 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1; 185 } 186 }); 187 188 Thread.sleep(2000); 189 LOG.info("Current Open Regions:" + HBaseTestingUtil.getAllOnlineRegions(cluster).size()); 190 191 // wait for abort completes 192 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 193 @Override 194 public boolean evaluate() throws Exception { 195 return (HBaseTestingUtil.getAllOnlineRegions(cluster).size() >= (numRegionsToCreate + 1)); 196 } 197 }); 198 199 LOG.info("Current Open Regions After Master Node Starts Up:" 200 + HBaseTestingUtil.getAllOnlineRegions(cluster).size()); 201 202 assertEquals(numLogLines, TEST_UTIL.countRows(ht)); 203 } 204 } 205 206 @Test 207 public void testThreeRSAbort() throws Exception { 208 LOG.info("testThreeRSAbort"); 209 int numRegionsToCreate = 40; 210 int numRowsPerRegion = 100; 211 212 startCluster(NUM_RS); // NUM_RS=6. 213 214 try (Table table = installTable(numRegionsToCreate)) { 215 populateDataInTable(numRowsPerRegion); 216 217 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 218 assertEquals(NUM_RS, rsts.size()); 219 cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName()); 220 cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName()); 221 cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName()); 222 223 TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() { 224 225 @Override 226 public boolean evaluate() throws Exception { 227 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3; 228 } 229 230 @Override 231 public String explainFailure() throws Exception { 232 return "Timed out waiting for server aborts."; 233 } 234 }); 235 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 236 int rows; 237 try { 238 rows = TEST_UTIL.countRows(table); 239 } catch (Exception e) { 240 Threads.printThreadInfo(System.out, "Thread dump before fail"); 241 throw e; 242 } 243 assertEquals(numRegionsToCreate * numRowsPerRegion, rows); 244 } 245 } 246 247 @Test 248 public void testDelayedDeleteOnFailure() throws Exception { 249 if ( 250 !this.conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, 251 HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 252 ) { 253 // This test depends on zk coordination.... 254 return; 255 } 256 LOG.info("testDelayedDeleteOnFailure"); 257 startCluster(1); 258 final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); 259 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 260 final Path rootLogDir = 261 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME); 262 final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString()); 263 fs.mkdirs(logDir); 264 ExecutorService executor = null; 265 try { 266 final Path corruptedLogFile = new Path(logDir, "x"); 267 FSDataOutputStream out; 268 out = fs.create(corruptedLogFile); 269 out.write(0); 270 out.write(Bytes.toBytes("corrupted bytes")); 271 out.close(); 272 ZKSplitLogManagerCoordination coordination = 273 (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager()) 274 .getSplitLogManagerCoordination(); 275 coordination.setIgnoreDeleteForTesting(true); 276 executor = Executors.newSingleThreadExecutor(); 277 Runnable runnable = new Runnable() { 278 @Override 279 public void run() { 280 try { 281 // since the logDir is a fake, corrupted one, so the split log worker 282 // will finish it quickly with error, and this call will fail and throw 283 // an IOException. 284 slm.splitLogDistributed(logDir); 285 } catch (IOException ioe) { 286 try { 287 assertTrue(fs.exists(corruptedLogFile)); 288 // this call will block waiting for the task to be removed from the 289 // tasks map which is not going to happen since ignoreZKDeleteForTesting 290 // is set to true, until it is interrupted. 291 slm.splitLogDistributed(logDir); 292 } catch (IOException e) { 293 assertTrue(Thread.currentThread().isInterrupted()); 294 return; 295 } 296 fail("did not get the expected IOException from the 2nd call"); 297 } 298 fail("did not get the expected IOException from the 1st call"); 299 } 300 }; 301 Future<?> result = executor.submit(runnable); 302 try { 303 result.get(2000, TimeUnit.MILLISECONDS); 304 } catch (TimeoutException te) { 305 // it is ok, expected. 306 } 307 waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); 308 executor.shutdownNow(); 309 executor = null; 310 311 // make sure the runnable is finished with no exception thrown. 312 result.get(); 313 } finally { 314 if (executor != null) { 315 // interrupt the thread in case the test fails in the middle. 316 // it has no effect if the thread is already terminated. 317 executor.shutdownNow(); 318 } 319 fs.delete(logDir, true); 320 } 321 } 322 323 private Table installTable(int nrs) throws Exception { 324 return installTable(nrs, 0); 325 } 326 327 private Table installTable(int nrs, int existingRegions) throws Exception { 328 // Create a table with regions 329 byte[] family = Bytes.toBytes("family"); 330 LOG.info("Creating table with " + nrs + " regions"); 331 Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs); 332 int numRegions = -1; 333 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 334 numRegions = r.getStartKeys().length; 335 } 336 assertEquals(nrs, numRegions); 337 LOG.info("Waiting for no more RIT\n"); 338 blockUntilNoRIT(); 339 // disable-enable cycle to get rid of table's dead regions left behind 340 // by createMultiRegions 341 assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName)); 342 LOG.debug("Disabling table\n"); 343 TEST_UTIL.getAdmin().disableTable(tableName); 344 LOG.debug("Waiting for no more RIT\n"); 345 blockUntilNoRIT(); 346 NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster); 347 LOG.debug("Verifying only catalog region is assigned\n"); 348 if (regions.size() != 1) { 349 for (String oregion : regions) { 350 LOG.debug("Region still online: " + oregion); 351 } 352 } 353 assertEquals(1 + existingRegions, regions.size()); 354 LOG.debug("Enabling table\n"); 355 TEST_UTIL.getAdmin().enableTable(tableName); 356 LOG.debug("Waiting for no more RIT\n"); 357 blockUntilNoRIT(); 358 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); 359 regions = HBaseTestingUtil.getAllOnlineRegions(cluster); 360 assertEquals(numRegions + 1 + existingRegions, regions.size()); 361 return table; 362 } 363 364 void populateDataInTable(int nrows) throws Exception { 365 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 366 assertEquals(NUM_RS, rsts.size()); 367 368 for (RegionServerThread rst : rsts) { 369 HRegionServer hrs = rst.getRegionServer(); 370 List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 371 for (RegionInfo hri : hris) { 372 if (hri.getTable().isSystemTable()) { 373 continue; 374 } 375 LOG.debug( 376 "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString()); 377 Region region = hrs.getOnlineRegion(hri.getRegionName()); 378 assertTrue(region != null); 379 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); 380 } 381 } 382 } 383 384 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size) 385 throws IOException { 386 makeWAL(hrs, regions, num_edits, edit_size, true); 387 } 388 389 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize, 390 boolean cleanShutdown) throws IOException { 391 // remove root and meta region 392 regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO); 393 394 for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) { 395 RegionInfo regionInfo = iter.next(); 396 if (regionInfo.getTable().isSystemTable()) { 397 iter.remove(); 398 } 399 } 400 byte[] value = new byte[editSize]; 401 402 List<RegionInfo> hris = new ArrayList<>(); 403 for (RegionInfo region : regions) { 404 if (region.getTable() != tableName) { 405 continue; 406 } 407 hris.add(region); 408 } 409 LOG.info("Creating wal edits across " + hris.size() + " regions."); 410 for (int i = 0; i < editSize; i++) { 411 value[i] = (byte) ('a' + (i % 26)); 412 } 413 int n = hris.size(); 414 int[] counts = new int[n]; 415 // sync every ~30k to line up with desired wal rolls 416 final int syncEvery = 30 * 1024 / editSize; 417 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 418 if (n > 0) { 419 for (int i = 0; i < numEdits; i += 1) { 420 WALEdit e = new WALEdit(); 421 RegionInfo curRegionInfo = hris.get(i % n); 422 WAL log = hrs.getWAL(curRegionInfo); 423 byte[] startRow = curRegionInfo.getStartKey(); 424 if (startRow == null || startRow.length == 0) { 425 startRow = new byte[] { 0, 0, 0, 0, 1 }; 426 } 427 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]); 428 row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because 429 // HBaseTestingUtility.createMultiRegions use 5 bytes key 430 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); 431 WALEditInternalHelper.addExtendedCell(e, 432 new KeyValue(row, COLUMN_FAMILY, qualifier, EnvironmentEdgeManager.currentTime(), value)); 433 log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), 434 tableName, EnvironmentEdgeManager.currentTime(), mvcc), e); 435 if (0 == i % syncEvery) { 436 log.sync(); 437 } 438 counts[i % n] += 1; 439 } 440 } 441 // done as two passes because the regions might share logs. shutdown is idempotent, but sync 442 // will cause errors if done after. 443 for (RegionInfo info : hris) { 444 WAL log = hrs.getWAL(info); 445 log.sync(); 446 } 447 if (cleanShutdown) { 448 for (RegionInfo info : hris) { 449 WAL log = hrs.getWAL(info); 450 log.shutdown(); 451 } 452 } 453 for (int i = 0; i < n; i++) { 454 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits"); 455 } 456 return; 457 } 458 459 private void blockUntilNoRIT() throws Exception { 460 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 461 } 462 463 private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families) 464 throws IOException { 465 for (int i = 0; i < numRows; i++) { 466 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i))); 467 for (byte[] family : families) { 468 put.addColumn(family, qf, null); 469 } 470 region.put(put); 471 } 472 } 473 474 private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) 475 throws InterruptedException { 476 long curt = EnvironmentEdgeManager.currentTime(); 477 long endt = curt + timems; 478 while (curt < endt) { 479 if (ctr.sum() == oldval) { 480 Thread.sleep(100); 481 curt = EnvironmentEdgeManager.currentTime(); 482 } else { 483 assertEquals(newval, ctr.sum()); 484 return; 485 } 486 } 487 fail(); 488 } 489 490 private void abortMaster(SingleProcessHBaseCluster cluster) throws InterruptedException { 491 for (MasterThread mt : cluster.getLiveMasterThreads()) { 492 if (mt.getMaster().isActiveMaster()) { 493 mt.getMaster().abort("Aborting for tests", new Exception("Trace info")); 494 mt.join(); 495 break; 496 } 497 } 498 LOG.debug("Master is aborted"); 499 } 500 501 /** 502 * Find a RS that has regions of a table. 503 * @param hasMetaRegion when true, the returned RS has hbase:meta region as well 504 */ 505 private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception { 506 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 507 List<RegionInfo> regions = null; 508 HRegionServer hrs = null; 509 510 for (RegionServerThread rst : rsts) { 511 hrs = rst.getRegionServer(); 512 while (rst.isAlive() && !hrs.isOnline()) { 513 Thread.sleep(100); 514 } 515 if (!rst.isAlive()) { 516 continue; 517 } 518 boolean isCarryingMeta = false; 519 boolean foundTableRegion = false; 520 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 521 for (RegionInfo region : regions) { 522 if (region.isMetaRegion()) { 523 isCarryingMeta = true; 524 } 525 if (region.getTable() == tableName) { 526 foundTableRegion = true; 527 } 528 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) { 529 break; 530 } 531 } 532 if (isCarryingMeta && hasMetaRegion) { 533 // clients ask for a RS with META 534 if (!foundTableRegion) { 535 HRegionServer destRS = hrs; 536 // the RS doesn't have regions of the specified table so we need move one to this RS 537 List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName); 538 RegionInfo hri = tableRegions.get(0); 539 TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName()); 540 // wait for region move completes 541 RegionStates regionStates = 542 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 543 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { 544 @Override 545 public boolean evaluate() throws Exception { 546 ServerName sn = regionStates.getRegionServerOfRegion(hri); 547 return (sn != null && sn.equals(destRS.getServerName())); 548 } 549 }); 550 } 551 return hrs; 552 } else if (hasMetaRegion || isCarryingMeta) { 553 continue; 554 } 555 if (foundTableRegion) { 556 break; 557 } 558 } 559 560 return hrs; 561 } 562}