001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master; 020 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; 023import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed; 024import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task; 025import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired; 026import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done; 027import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err; 028import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned; 029import static org.junit.Assert.assertEquals; 030import static org.junit.Assert.assertFalse; 031import static org.junit.Assert.assertTrue; 032import static org.junit.Assert.fail; 033 034import java.io.IOException; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Iterator; 038import java.util.List; 039import java.util.NavigableSet; 040import java.util.concurrent.ExecutorService; 041import java.util.concurrent.Executors; 042import java.util.concurrent.Future; 043import java.util.concurrent.TimeUnit; 044import java.util.concurrent.TimeoutException; 045import java.util.concurrent.atomic.LongAdder; 046import java.util.stream.Collectors; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FSDataOutputStream; 049import org.apache.hadoop.fs.FileStatus; 050import org.apache.hadoop.fs.FileSystem; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.fs.PathFilter; 053import org.apache.hadoop.hbase.HBaseTestingUtility; 054import org.apache.hadoop.hbase.HConstants; 055import org.apache.hadoop.hbase.KeyValue; 056import org.apache.hadoop.hbase.MiniHBaseCluster; 057import org.apache.hadoop.hbase.NamespaceDescriptor; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.SplitLogCounters; 060import org.apache.hadoop.hbase.StartMiniClusterOption; 061import org.apache.hadoop.hbase.TableName; 062import org.apache.hadoop.hbase.Waiter; 063import org.apache.hadoop.hbase.client.Put; 064import org.apache.hadoop.hbase.client.RegionInfo; 065import org.apache.hadoop.hbase.client.RegionInfoBuilder; 066import org.apache.hadoop.hbase.client.RegionLocator; 067import org.apache.hadoop.hbase.client.Table; 068import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; 069import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 070import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; 071import org.apache.hadoop.hbase.master.assignment.RegionStates; 072import org.apache.hadoop.hbase.regionserver.HRegionServer; 073import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 074import org.apache.hadoop.hbase.regionserver.Region; 075import org.apache.hadoop.hbase.util.Bytes; 076import org.apache.hadoop.hbase.util.CommonFSUtils; 077import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 078import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 079import org.apache.hadoop.hbase.util.Threads; 080import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 081import org.apache.hadoop.hbase.wal.WAL; 082import org.apache.hadoop.hbase.wal.WALEdit; 083import org.apache.hadoop.hbase.wal.WALFactory; 084import org.apache.hadoop.hbase.wal.WALKeyImpl; 085import org.apache.hadoop.hbase.wal.WALSplitUtil; 086import org.apache.hadoop.hbase.zookeeper.ZKUtil; 087import org.junit.After; 088import org.junit.AfterClass; 089import org.junit.Before; 090import org.junit.BeforeClass; 091import org.junit.Rule; 092import org.junit.Test; 093import org.junit.rules.TestName; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 098 099/** 100 * Base class for testing distributed log splitting. 101 */ 102public abstract class AbstractTestDLS { 103 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class); 104 105 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 106 107 // Start a cluster with 2 masters and 5 regionservers 108 private static final int NUM_MASTERS = 2; 109 private static final int NUM_RS = 5; 110 private static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); 111 112 @Rule 113 public TestName testName = new TestName(); 114 115 private TableName tableName; 116 private MiniHBaseCluster cluster; 117 private HMaster master; 118 private Configuration conf; 119 120 @Rule 121 public TestName name = new TestName(); 122 123 @BeforeClass 124 public static void setup() throws Exception { 125 // Uncomment the following line if more verbosity is needed for 126 // debugging (see HBASE-12285 for details). 127 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); 128 TEST_UTIL.startMiniZKCluster(); 129 TEST_UTIL.startMiniDFSCluster(3); 130 } 131 132 @AfterClass 133 public static void tearDown() throws Exception { 134 TEST_UTIL.shutdownMiniCluster(); 135 } 136 137 protected abstract String getWalProvider(); 138 139 private void startCluster(int numRS) throws Exception { 140 SplitLogCounters.resetCounters(); 141 LOG.info("Starting cluster"); 142 conf.setLong("hbase.splitlog.max.resubmit", 0); 143 // Make the failure test faster 144 conf.setInt("zookeeper.recovery.retry", 0); 145 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); 146 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing 147 conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3); 148 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 149 conf.set("hbase.wal.provider", getWalProvider()); 150 StartMiniClusterOption option = StartMiniClusterOption.builder() 151 .numMasters(NUM_MASTERS).numRegionServers(numRS).build(); 152 TEST_UTIL.startMiniHBaseCluster(option); 153 cluster = TEST_UTIL.getHBaseCluster(); 154 LOG.info("Waiting for active/ready master"); 155 cluster.waitForActiveAndReadyMaster(); 156 master = cluster.getMaster(); 157 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 158 @Override 159 public boolean evaluate() throws Exception { 160 return cluster.getLiveRegionServerThreads().size() >= numRS; 161 } 162 }); 163 } 164 165 @Before 166 public void before() throws Exception { 167 conf = TEST_UTIL.getConfiguration(); 168 tableName = TableName.valueOf(testName.getMethodName()); 169 } 170 171 @After 172 public void after() throws Exception { 173 TEST_UTIL.shutdownMiniHBaseCluster(); 174 TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()), 175 true); 176 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); 177 } 178 179 @Test 180 public void testRecoveredEdits() throws Exception { 181 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal 182 startCluster(NUM_RS); 183 184 int numLogLines = 10000; 185 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); 186 // turn off load balancing to prevent regions from moving around otherwise 187 // they will consume recovered.edits 188 master.balanceSwitch(false); 189 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 190 191 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 192 193 Path rootdir = CommonFSUtils.getRootDir(conf); 194 195 int numRegions = 50; 196 try (Table t = installTable(numRegions)) { 197 List<RegionInfo> regions = null; 198 HRegionServer hrs = null; 199 for (int i = 0; i < NUM_RS; i++) { 200 hrs = rsts.get(i).getRegionServer(); 201 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 202 // At least one RS will have >= to average number of regions. 203 if (regions.size() >= numRegions / NUM_RS) { 204 break; 205 } 206 } 207 Path logDir = new Path(rootdir, 208 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); 209 210 LOG.info("#regions = " + regions.size()); 211 Iterator<RegionInfo> it = regions.iterator(); 212 while (it.hasNext()) { 213 RegionInfo region = it.next(); 214 if (region.getTable().getNamespaceAsString() 215 .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) { 216 it.remove(); 217 } 218 } 219 220 makeWAL(hrs, regions, numLogLines, 100); 221 222 slm.splitLogDistributed(logDir); 223 224 int count = 0; 225 for (RegionInfo hri : regions) { 226 @SuppressWarnings("deprecation") 227 Path editsdir = WALSplitUtil 228 .getRegionDirRecoveredEditsDir(CommonFSUtils.getWALRegionDir(conf, 229 tableName, hri.getEncodedName())); 230 LOG.debug("Checking edits dir " + editsdir); 231 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { 232 @Override 233 public boolean accept(Path p) { 234 if (WALSplitUtil.isSequenceIdFile(p)) { 235 return false; 236 } 237 return true; 238 } 239 }); 240 LOG.info("Files {}", Arrays.stream(files).map(f -> f.getPath().toString()). 241 collect(Collectors.joining(","))); 242 assertTrue("Edits dir should have more than a one file", files.length > 1); 243 for (int i = 0; i < files.length; i++) { 244 int c = countWAL(files[i].getPath(), fs, conf); 245 count += c; 246 } 247 LOG.info(count + " edits in " + files.length + " recovered edits files."); 248 } 249 250 // check that the log file is moved 251 assertFalse(fs.exists(logDir)); 252 assertEquals(numLogLines, count); 253 } 254 } 255 256 @Test 257 public void testMasterStartsUpWithLogSplittingWork() throws Exception { 258 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); 259 startCluster(NUM_RS); 260 261 int numRegionsToCreate = 40; 262 int numLogLines = 1000; 263 // turn off load balancing to prevent regions from moving around otherwise 264 // they will consume recovered.edits 265 master.balanceSwitch(false); 266 267 try (Table ht = installTable(numRegionsToCreate)) { 268 HRegionServer hrs = findRSToKill(false); 269 List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 270 makeWAL(hrs, regions, numLogLines, 100); 271 272 // abort master 273 abortMaster(cluster); 274 275 // abort RS 276 LOG.info("Aborting region server: " + hrs.getServerName()); 277 hrs.abort("testing"); 278 279 // wait for abort completes 280 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 281 @Override 282 public boolean evaluate() throws Exception { 283 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1; 284 } 285 }); 286 287 Thread.sleep(2000); 288 LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); 289 290 // wait for abort completes 291 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 292 @Override 293 public boolean evaluate() throws Exception { 294 return (HBaseTestingUtility.getAllOnlineRegions(cluster) 295 .size() >= (numRegionsToCreate + 1)); 296 } 297 }); 298 299 LOG.info("Current Open Regions After Master Node Starts Up:" + 300 HBaseTestingUtility.getAllOnlineRegions(cluster).size()); 301 302 assertEquals(numLogLines, TEST_UTIL.countRows(ht)); 303 } 304 } 305 306 /** 307 * The original intention of this test was to force an abort of a region server and to make sure 308 * that the failure path in the region servers is properly evaluated. But it is difficult to 309 * ensure that the region server doesn't finish the log splitting before it aborts. Also now, 310 * there is this code path where the master will preempt the region server when master detects 311 * that the region server has aborted. 312 * @throws Exception 313 */ 314 // Was marked flaky before Distributed Log Replay cleanup. 315 @Test 316 public void testWorkerAbort() throws Exception { 317 LOG.info("testWorkerAbort"); 318 startCluster(3); 319 int numLogLines = 10000; 320 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); 321 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 322 323 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 324 HRegionServer hrs = findRSToKill(false); 325 Path rootdir = CommonFSUtils.getRootDir(conf); 326 final Path logDir = new Path(rootdir, 327 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); 328 329 try (Table t = installTable(40)) { 330 makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100); 331 332 new Thread() { 333 @Override 334 public void run() { 335 try { 336 waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); 337 } catch (InterruptedException e) { 338 } 339 for (RegionServerThread rst : rsts) { 340 rst.getRegionServer().abort("testing"); 341 break; 342 } 343 } 344 }.start(); 345 FileStatus[] logfiles = fs.listStatus(logDir); 346 TaskBatch batch = new TaskBatch(); 347 slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch); 348 // waitForCounter but for one of the 2 counters 349 long curt = System.currentTimeMillis(); 350 long waitTime = 80000; 351 long endt = curt + waitTime; 352 while (curt < endt) { 353 if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + 354 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + 355 tot_wkr_preempt_task.sum()) == 0) { 356 Thread.sleep(100); 357 curt = System.currentTimeMillis(); 358 } else { 359 assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + 360 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + 361 tot_wkr_preempt_task.sum())); 362 return; 363 } 364 } 365 fail("none of the following counters went up in " + waitTime + " milliseconds - " + 366 "tot_wkr_task_resigned, tot_wkr_task_err, " + 367 "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task"); 368 } 369 } 370 371 @Test 372 public void testThreeRSAbort() throws Exception { 373 LOG.info("testThreeRSAbort"); 374 int numRegionsToCreate = 40; 375 int numRowsPerRegion = 100; 376 377 startCluster(NUM_RS); // NUM_RS=6. 378 379 try (Table table = installTable(numRegionsToCreate)) { 380 populateDataInTable(numRowsPerRegion); 381 382 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 383 assertEquals(NUM_RS, rsts.size()); 384 cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName()); 385 cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName()); 386 cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName()); 387 388 TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() { 389 390 @Override 391 public boolean evaluate() throws Exception { 392 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3; 393 } 394 395 @Override 396 public String explainFailure() throws Exception { 397 return "Timed out waiting for server aborts."; 398 } 399 }); 400 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 401 int rows; 402 try { 403 rows = TEST_UTIL.countRows(table); 404 } catch (Exception e) { 405 Threads.printThreadInfo(System.out, "Thread dump before fail"); 406 throw e; 407 } 408 assertEquals(numRegionsToCreate * numRowsPerRegion, rows); 409 } 410 } 411 412 @Test 413 public void testDelayedDeleteOnFailure() throws Exception { 414 LOG.info("testDelayedDeleteOnFailure"); 415 startCluster(1); 416 final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); 417 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 418 final Path logDir = 419 new Path(new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME), 420 ServerName.valueOf("x", 1, 1).toString()); 421 fs.mkdirs(logDir); 422 ExecutorService executor = null; 423 try { 424 final Path corruptedLogFile = new Path(logDir, "x"); 425 FSDataOutputStream out; 426 out = fs.create(corruptedLogFile); 427 out.write(0); 428 out.write(Bytes.toBytes("corrupted bytes")); 429 out.close(); 430 ZKSplitLogManagerCoordination coordination = 431 (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager()) 432 .getSplitLogManagerCoordination(); 433 coordination.setIgnoreDeleteForTesting(true); 434 executor = Executors.newSingleThreadExecutor(); 435 Runnable runnable = new Runnable() { 436 @Override 437 public void run() { 438 try { 439 // since the logDir is a fake, corrupted one, so the split log worker 440 // will finish it quickly with error, and this call will fail and throw 441 // an IOException. 442 slm.splitLogDistributed(logDir); 443 } catch (IOException ioe) { 444 try { 445 assertTrue(fs.exists(corruptedLogFile)); 446 // this call will block waiting for the task to be removed from the 447 // tasks map which is not going to happen since ignoreZKDeleteForTesting 448 // is set to true, until it is interrupted. 449 slm.splitLogDistributed(logDir); 450 } catch (IOException e) { 451 assertTrue(Thread.currentThread().isInterrupted()); 452 return; 453 } 454 fail("did not get the expected IOException from the 2nd call"); 455 } 456 fail("did not get the expected IOException from the 1st call"); 457 } 458 }; 459 Future<?> result = executor.submit(runnable); 460 try { 461 result.get(2000, TimeUnit.MILLISECONDS); 462 } catch (TimeoutException te) { 463 // it is ok, expected. 464 } 465 waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); 466 executor.shutdownNow(); 467 executor = null; 468 469 // make sure the runnable is finished with no exception thrown. 470 result.get(); 471 } finally { 472 if (executor != null) { 473 // interrupt the thread in case the test fails in the middle. 474 // it has no effect if the thread is already terminated. 475 executor.shutdownNow(); 476 } 477 fs.delete(logDir, true); 478 } 479 } 480 481 private Table installTable(int nrs) throws Exception { 482 return installTable(nrs, 0); 483 } 484 485 private Table installTable(int nrs, int existingRegions) throws Exception { 486 // Create a table with regions 487 byte[] family = Bytes.toBytes("family"); 488 LOG.info("Creating table with " + nrs + " regions"); 489 Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs); 490 int numRegions = -1; 491 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 492 numRegions = r.getStartKeys().length; 493 } 494 assertEquals(nrs, numRegions); 495 LOG.info("Waiting for no more RIT\n"); 496 blockUntilNoRIT(); 497 // disable-enable cycle to get rid of table's dead regions left behind 498 // by createMultiRegions 499 assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName)); 500 LOG.debug("Disabling table\n"); 501 TEST_UTIL.getAdmin().disableTable(tableName); 502 LOG.debug("Waiting for no more RIT\n"); 503 blockUntilNoRIT(); 504 NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster); 505 LOG.debug("Verifying only catalog and namespace regions are assigned\n"); 506 if (regions.size() != 2) { 507 for (String oregion : regions) 508 LOG.debug("Region still online: " + oregion); 509 } 510 assertEquals(2 + existingRegions, regions.size()); 511 LOG.debug("Enabling table\n"); 512 TEST_UTIL.getAdmin().enableTable(tableName); 513 LOG.debug("Waiting for no more RIT\n"); 514 blockUntilNoRIT(); 515 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); 516 regions = HBaseTestingUtility.getAllOnlineRegions(cluster); 517 assertEquals(numRegions + 2 + existingRegions, regions.size()); 518 return table; 519 } 520 521 void populateDataInTable(int nrows) throws Exception { 522 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 523 assertEquals(NUM_RS, rsts.size()); 524 525 for (RegionServerThread rst : rsts) { 526 HRegionServer hrs = rst.getRegionServer(); 527 List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 528 for (RegionInfo hri : hris) { 529 if (hri.getTable().isSystemTable()) { 530 continue; 531 } 532 LOG.debug( 533 "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString()); 534 Region region = hrs.getOnlineRegion(hri.getRegionName()); 535 assertTrue(region != null); 536 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); 537 } 538 } 539 540 for (MasterThread mt : cluster.getLiveMasterThreads()) { 541 HRegionServer hrs = mt.getMaster(); 542 List<RegionInfo> hris; 543 try { 544 hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 545 } catch (ServerNotRunningYetException e) { 546 // It's ok: this master may be a backup. Ignored. 547 continue; 548 } 549 for (RegionInfo hri : hris) { 550 if (hri.getTable().isSystemTable()) { 551 continue; 552 } 553 LOG.debug( 554 "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString()); 555 Region region = hrs.getOnlineRegion(hri.getRegionName()); 556 assertTrue(region != null); 557 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); 558 } 559 } 560 } 561 562 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size) 563 throws IOException { 564 makeWAL(hrs, regions, num_edits, edit_size, true); 565 } 566 567 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize, 568 boolean cleanShutdown) throws IOException { 569 // remove root and meta region 570 regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO); 571 572 for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) { 573 RegionInfo regionInfo = iter.next(); 574 if (regionInfo.getTable().isSystemTable()) { 575 iter.remove(); 576 } 577 } 578 byte[] value = new byte[editSize]; 579 580 List<RegionInfo> hris = new ArrayList<>(); 581 for (RegionInfo region : regions) { 582 if (region.getTable() != tableName) { 583 continue; 584 } 585 hris.add(region); 586 } 587 LOG.info("Creating wal edits across " + hris.size() + " regions."); 588 for (int i = 0; i < editSize; i++) { 589 value[i] = (byte) ('a' + (i % 26)); 590 } 591 int n = hris.size(); 592 int[] counts = new int[n]; 593 // sync every ~30k to line up with desired wal rolls 594 final int syncEvery = 30 * 1024 / editSize; 595 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 596 if (n > 0) { 597 for (int i = 0; i < numEdits; i += 1) { 598 WALEdit e = new WALEdit(); 599 RegionInfo curRegionInfo = hris.get(i % n); 600 WAL log = hrs.getWAL(curRegionInfo); 601 byte[] startRow = curRegionInfo.getStartKey(); 602 if (startRow == null || startRow.length == 0) { 603 startRow = new byte[] { 0, 0, 0, 0, 1 }; 604 } 605 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]); 606 row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because 607 // HBaseTestingUtility.createMultiRegions use 5 bytes key 608 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); 609 e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value)); 610 log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), 611 tableName, System.currentTimeMillis(), mvcc), e); 612 if (0 == i % syncEvery) { 613 log.sync(); 614 } 615 counts[i % n] += 1; 616 } 617 } 618 // done as two passes because the regions might share logs. shutdown is idempotent, but sync 619 // will cause errors if done after. 620 for (RegionInfo info : hris) { 621 WAL log = hrs.getWAL(info); 622 log.sync(); 623 } 624 if (cleanShutdown) { 625 for (RegionInfo info : hris) { 626 WAL log = hrs.getWAL(info); 627 log.shutdown(); 628 } 629 } 630 for (int i = 0; i < n; i++) { 631 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits"); 632 } 633 return; 634 } 635 636 private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException { 637 int count = 0; 638 try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) { 639 WAL.Entry e; 640 while ((e = in.next()) != null) { 641 if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { 642 count++; 643 } 644 } 645 } 646 return count; 647 } 648 649 private void blockUntilNoRIT() throws Exception { 650 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 651 } 652 653 private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families) 654 throws IOException { 655 for (int i = 0; i < numRows; i++) { 656 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i))); 657 for (byte[] family : families) { 658 put.addColumn(family, qf, null); 659 } 660 region.put(put); 661 } 662 } 663 664 private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) 665 throws InterruptedException { 666 long curt = System.currentTimeMillis(); 667 long endt = curt + timems; 668 while (curt < endt) { 669 if (ctr.sum() == oldval) { 670 Thread.sleep(100); 671 curt = System.currentTimeMillis(); 672 } else { 673 assertEquals(newval, ctr.sum()); 674 return; 675 } 676 } 677 fail(); 678 } 679 680 private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException { 681 for (MasterThread mt : cluster.getLiveMasterThreads()) { 682 if (mt.getMaster().isActiveMaster()) { 683 mt.getMaster().abort("Aborting for tests", new Exception("Trace info")); 684 mt.join(); 685 break; 686 } 687 } 688 LOG.debug("Master is aborted"); 689 } 690 691 /** 692 * Find a RS that has regions of a table. 693 * @param hasMetaRegion when true, the returned RS has hbase:meta region as well 694 */ 695 private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception { 696 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 697 List<RegionInfo> regions = null; 698 HRegionServer hrs = null; 699 700 for (RegionServerThread rst : rsts) { 701 hrs = rst.getRegionServer(); 702 while (rst.isAlive() && !hrs.isOnline()) { 703 Thread.sleep(100); 704 } 705 if (!rst.isAlive()) { 706 continue; 707 } 708 boolean isCarryingMeta = false; 709 boolean foundTableRegion = false; 710 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 711 for (RegionInfo region : regions) { 712 if (region.isMetaRegion()) { 713 isCarryingMeta = true; 714 } 715 if (region.getTable() == tableName) { 716 foundTableRegion = true; 717 } 718 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) { 719 break; 720 } 721 } 722 if (isCarryingMeta && hasMetaRegion) { 723 // clients ask for a RS with META 724 if (!foundTableRegion) { 725 HRegionServer destRS = hrs; 726 // the RS doesn't have regions of the specified table so we need move one to this RS 727 List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName); 728 RegionInfo hri = tableRegions.get(0); 729 TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName()); 730 // wait for region move completes 731 RegionStates regionStates = 732 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 733 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { 734 @Override 735 public boolean evaluate() throws Exception { 736 ServerName sn = regionStates.getRegionServerOfRegion(hri); 737 return (sn != null && sn.equals(destRS.getServerName())); 738 } 739 }); 740 } 741 return hrs; 742 } else if (hasMetaRegion || isCarryingMeta) { 743 continue; 744 } 745 if (foundTableRegion) { 746 break; 747 } 748 } 749 750 return hrs; 751 } 752}