/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Base class for testing distributed log splitting.
 */
public abstract class AbstractTestDLS {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestDLS.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Start a cluster with 2 masters and 5 regionservers
  private static final int NUM_MASTERS = 2;
  private static final int NUM_RS = 5;
  private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");

  @Rule
  public TestName testName = new TestName();

  private TableName tableName;
  private MiniHBaseCluster cluster;
  private HMaster master;
  private Configuration conf;

  @BeforeClass
  public static void setup() throws Exception {
    // Uncomment the following line if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
    TEST_UTIL.startMiniZKCluster();
    TEST_UTIL.startMiniDFSCluster(3);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  protected abstract String getWalProvider();

  private void startCluster(int numRS) throws Exception {
    SplitLogCounters.resetCounters();
    LOG.info("Starting cluster");
    conf.setLong("hbase.splitlog.max.resubmit", 0);
    // Make the failure test faster
    conf.setInt("zookeeper.recovery.retry", 0);
    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
    conf.setInt("hbase.regionserver.wal.max.splitters", 3);
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    conf.set("hbase.wal.provider", getWalProvider());
    TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, numRS);
    cluster = TEST_UTIL.getHBaseCluster();
    LOG.info("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();
    master = cluster.getMaster();
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return cluster.getLiveRegionServerThreads().size() >= numRS;
      }
    });
  }

  @Before
  public void before() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    tableName = TableName.valueOf(testName.getMethodName());
  }

  @After
  public void after() throws Exception {
    TEST_UTIL.shutdownMiniHBaseCluster();
    TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
    ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
  }

  @Test
  public void testRecoveredEdits() throws Exception {
    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
    startCluster(NUM_RS);

    int numLogLines = 10000;
    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
    // turn off load balancing to prevent regions from moving around, otherwise
    // they will consume recovered.edits
    master.balanceSwitch(false);
    FileSystem fs = master.getMasterFileSystem().getFileSystem();

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();

    Path rootdir = FSUtils.getRootDir(conf);

    int numRegions = 50;
    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
        Table t = installTable(zkw, numRegions)) {
      TableName table = t.getName();
      List<RegionInfo> regions = null;
      HRegionServer hrs = null;
      for (int i = 0; i < NUM_RS; i++) {
        hrs = rsts.get(i).getRegionServer();
        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
        // At least one RS will have >= the average number of regions.
        if (regions.size() >= numRegions / NUM_RS) {
          break;
        }
      }
      Path logDir = new Path(rootdir,
          AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));

      LOG.info("#regions = " + regions.size());
      Iterator<RegionInfo> it = regions.iterator();
      while (it.hasNext()) {
        RegionInfo region = it.next();
        if (region.getTable().getNamespaceAsString()
            .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
          it.remove();
        }
      }

      makeWAL(hrs, regions, numLogLines, 100);

      slm.splitLogDistributed(logDir);

      int count = 0;
      for (RegionInfo hri : regions) {
        Path tdir = FSUtils.getWALTableDir(conf, table);
        @SuppressWarnings("deprecation")
        Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(
            FSUtils.getWALRegionDir(conf, tableName, hri.getEncodedName()));
        LOG.debug("checking edits dir " + editsdir);
        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
          @Override
          public boolean accept(Path p) {
            if (WALSplitter.isSequenceIdFile(p)) {
              return false;
            }
            return true;
          }
        });
        assertTrue(
          "edits dir should have more than a single file in it. instead has " + files.length,
          files.length > 1);
        for (int i = 0; i < files.length; i++) {
          int c = countWAL(files[i].getPath(), fs, conf);
          count += c;
        }
        LOG.info(count + " edits in " + files.length + " recovered edits files.");
      }

      // check that the log file is moved
      assertFalse(fs.exists(logDir));
      assertEquals(numLogLines, count);
    }
  }

  @Test
  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
    startCluster(NUM_RS);

    int numRegionsToCreate = 40;
    int numLogLines = 1000;
    // turn off load balancing to prevent regions from moving around, otherwise
    // they will consume recovered.edits
    master.balanceSwitch(false);

    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
        Table ht = installTable(zkw, numRegionsToCreate)) {
      HRegionServer hrs = findRSToKill(false);
      List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      makeWAL(hrs, regions, numLogLines, 100);

      // abort master
      abortMaster(cluster);

      // abort RS
      LOG.info("Aborting region server: " + hrs.getServerName());
      hrs.abort("testing");

      // wait for the RS abort to complete
      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
        }
      });

      Thread.sleep(2000);
      LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());

      // wait for all user regions to be reassigned and online again
      TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return (HBaseTestingUtility.getAllOnlineRegions(cluster)
              .size() >= (numRegionsToCreate + 1));
        }
      });

      LOG.info("Current Open Regions After Master Node Starts Up:" +
          HBaseTestingUtility.getAllOnlineRegions(cluster).size());

      assertEquals(numLogLines, TEST_UTIL.countRows(ht));
    }
  }

  /**
   * The original intention of this test was to force an abort of a region server and to make sure
   * that the failure path in the region servers is properly evaluated.
   * But it is difficult to ensure that the region server doesn't finish the log splitting before
   * it aborts. Also, there is now a code path where the master preempts the region server's task
   * when it detects that the region server has aborted.
   */
  // Was marked flaky before Distributed Log Replay cleanup.
  @Test
  public void testWorkerAbort() throws Exception {
    LOG.info("testWorkerAbort");
    startCluster(3);
    int numLogLines = 10000;
    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
    FileSystem fs = master.getMasterFileSystem().getFileSystem();

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    HRegionServer hrs = findRSToKill(false);
    Path rootdir = FSUtils.getRootDir(conf);
    final Path logDir = new Path(rootdir,
        AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));

    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
        Table t = installTable(zkw, 40)) {
      makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);

      new Thread() {
        @Override
        public void run() {
          try {
            waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
          } catch (InterruptedException e) {
          }
          for (RegionServerThread rst : rsts) {
            rst.getRegionServer().abort("testing");
            break;
          }
        }
      }.start();
      FileStatus[] logfiles = fs.listStatus(logDir);
      TaskBatch batch = new TaskBatch();
      slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
      // like waitForCounter, but waiting for any one of the worker counters to move
      long curt = System.currentTimeMillis();
      long waitTime = 80000;
      long endt = curt + waitTime;
      while (curt < endt) {
        if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
            tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
            tot_wkr_preempt_task.sum()) == 0) {
          Thread.sleep(100);
          curt = System.currentTimeMillis();
        } else {
          assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
              tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
              tot_wkr_preempt_task.sum()));
          return;
        }
      }
      fail("none of the following counters went up in " + waitTime + " milliseconds - " +
          "tot_wkr_task_resigned, tot_wkr_task_err, " +
          "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
    }
  }

  @Test
  public void testThreeRSAbort() throws Exception {
    LOG.info("testThreeRSAbort");
    int numRegionsToCreate = 40;
    int numRowsPerRegion = 100;

    startCluster(NUM_RS); // NUM_RS = 5
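
    // Load every region with data, kill three of the five region servers, and verify that all
    // rows are still readable once log splitting and region reassignment have completed.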
    try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null);
        Table table = installTable(zkw, numRegionsToCreate)) {
      populateDataInTable(numRowsPerRegion);

      List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
      assertEquals(NUM_RS, rsts.size());
      cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
      cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
      cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());

      TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {

        @Override
        public boolean evaluate() throws Exception {
          return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
        }

        @Override
        public String explainFailure() throws Exception {
          return "Timed out waiting for server aborts.";
        }
      });
      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
      int rows;
      try {
        rows = TEST_UTIL.countRows(table);
      } catch (Exception e) {
        Threads.printThreadInfo(System.out, "Thread dump before fail");
        throw e;
      }
      assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
    }
  }

  @Test
  public void testDelayedDeleteOnFailure() throws Exception {
    LOG.info("testDelayedDeleteOnFailure");
    startCluster(1);
    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path logDir =
        new Path(new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME),
            ServerName.valueOf("x", 1, 1).toString());
    fs.mkdirs(logDir);
    ExecutorService executor = null;
    try {
      final Path corruptedLogFile = new Path(logDir, "x");
      FSDataOutputStream out = fs.create(corruptedLogFile);
      out.write(0);
      out.write(Bytes.toBytes("corrupted bytes"));
      out.close();
      ZKSplitLogManagerCoordination coordination =
          (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
              .getSplitLogManagerCoordination();
      coordination.setIgnoreDeleteForTesting(true);
      executor = Executors.newSingleThreadExecutor();
      Runnable runnable = new Runnable() {
        @Override
        public void run() {
          try {
            // the logDir is a fake, corrupted one, so the split log worker will finish it
            // quickly with an error, and this call will fail and throw an IOException.
            slm.splitLogDistributed(logDir);
          } catch (IOException ioe) {
            try {
              assertTrue(fs.exists(corruptedLogFile));
              // this call will block waiting for the task to be removed from the
              // tasks map, which is not going to happen since ignoreZKDeleteForTesting
              // is set to true, until it is interrupted.
              slm.splitLogDistributed(logDir);
            } catch (IOException e) {
              assertTrue(Thread.currentThread().isInterrupted());
              return;
            }
            fail("did not get the expected IOException from the 2nd call");
          }
          fail("did not get the expected IOException from the 1st call");
        }
      };
      Future<?> result = executor.submit(runnable);
      try {
        result.get(2000, TimeUnit.MILLISECONDS);
      } catch (TimeoutException te) {
        // it is ok, expected.
      }
      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
      executor.shutdownNow();
      executor = null;

      // make sure the runnable is finished with no exception thrown.
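      // (Future.get() rethrows any exception from the runnable wrapped in an ExecutionException,
      // so an unexpected failure inside the runnable fails the test here.)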
      result.get();
    } finally {
      if (executor != null) {
        // interrupt the thread in case the test fails in the middle.
        // it has no effect if the thread is already terminated.
        executor.shutdownNow();
      }
      fs.delete(logDir, true);
    }
  }

  private Table installTable(ZKWatcher zkw, int nrs) throws Exception {
    return installTable(zkw, nrs, 0);
  }

  private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception {
    // Create a table with regions
    byte[] family = Bytes.toBytes("family");
    LOG.info("Creating table with " + nrs + " regions");
    Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
    int numRegions = -1;
    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      numRegions = r.getStartKeys().length;
    }
    assertEquals(nrs, numRegions);
    LOG.info("Waiting for no more RIT");
    blockUntilNoRIT(zkw, master);
    // disable-enable cycle to get rid of the table's dead regions left behind
    // by createMultiRegions
    assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
    LOG.debug("Disabling table");
    TEST_UTIL.getAdmin().disableTable(tableName);
    LOG.debug("Waiting for no more RIT");
    blockUntilNoRIT(zkw, master);
    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
    LOG.debug("Verifying only catalog and namespace regions are assigned");
    if (regions.size() != 2) {
      for (String oregion : regions) {
        LOG.debug("Region still online: " + oregion);
      }
    }
    assertEquals(2 + existingRegions, regions.size());
    LOG.debug("Enabling table");
    TEST_UTIL.getAdmin().enableTable(tableName);
    LOG.debug("Waiting for no more RIT");
    blockUntilNoRIT(zkw, master);
    LOG.debug("Verifying there are " + numRegions + " assigned on cluster");
    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
    assertEquals(numRegions + 2 + existingRegions, regions.size());
    return table;
  }

  void populateDataInTable(int nrows) throws Exception {
    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    assertEquals(NUM_RS, rsts.size());

    for (RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (RegionInfo hri : hris) {
        if (hri.getTable().isSystemTable()) {
          continue;
        }
        LOG.debug(
          "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
        Region region = hrs.getOnlineRegion(hri.getRegionName());
        assertTrue(region != null);
        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
      }
    }

    for (MasterThread mt : cluster.getLiveMasterThreads()) {
      HRegionServer hrs = mt.getMaster();
      List<RegionInfo> hris;
      try {
        hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      } catch (ServerNotRunningYetException e) {
        // It's ok: this master may be a backup. Ignored.
        continue;
      }
      for (RegionInfo hri : hris) {
        if (hri.getTable().isSystemTable()) {
          continue;
        }
        LOG.debug(
          "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
        Region region = hrs.getOnlineRegion(hri.getRegionName());
        assertTrue(region != null);
        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
      }
    }
  }

  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize)
      throws IOException {
    makeWAL(hrs, regions, numEdits, editSize, true);
  }

  public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
      boolean cleanShutdown) throws IOException {
    // remove the meta region and any other system regions
    regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);

    for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
      RegionInfo regionInfo = iter.next();
      if (regionInfo.getTable().isSystemTable()) {
        iter.remove();
      }
    }
    byte[] value = new byte[editSize];

    List<RegionInfo> hris = new ArrayList<>();
    for (RegionInfo region : regions) {
      if (region.getTable() != tableName) {
        continue;
      }
      hris.add(region);
    }
    LOG.info("Creating wal edits across " + hris.size() + " regions.");
    for (int i = 0; i < editSize; i++) {
      value[i] = (byte) ('a' + (i % 26));
    }
    int n = hris.size();
    int[] counts = new int[n];
    // sync every ~30k to line up with desired wal rolls
    final int syncEvery = 30 * 1024 / editSize;
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    if (n > 0) {
      for (int i = 0; i < numEdits; i += 1) {
        WALEdit e = new WALEdit();
        RegionInfo curRegionInfo = hris.get(i % n);
        WAL log = hrs.getWAL(curRegionInfo);
        byte[] startRow = curRegionInfo.getStartKey();
        if (startRow == null || startRow.length == 0) {
          startRow = new byte[] { 0, 0, 0, 0, 1 };
        }
        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
        // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
        row = Arrays.copyOfRange(row, 3, 8);
        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
        e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
        log.append(curRegionInfo,
          new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName,
              System.currentTimeMillis(), mvcc),
          e, true);
        if (0 == i % syncEvery) {
          log.sync();
        }
        counts[i % n] += 1;
      }
    }
    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
    // will cause errors if done after.
    for (RegionInfo info : hris) {
      WAL log = hrs.getWAL(info);
      log.sync();
    }
    if (cleanShutdown) {
      for (RegionInfo info : hris) {
        WAL log = hrs.getWAL(info);
        log.shutdown();
      }
    }
    for (int i = 0; i < n; i++) {
      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
    }
  }

  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
    int count = 0;
    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
      WAL.Entry e;
      while ((e = in.next()) != null) {
        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
          count++;
        }
      }
    }
    return count;
  }

  private void blockUntilNoRIT(ZKWatcher zkw, HMaster master) throws Exception {
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
  }

  private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
      throws IOException {
    for (int i = 0; i < numRows; i++) {
      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
      for (byte[] family : families) {
        put.addColumn(family, qf, null);
      }
      region.put(put);
    }
  }

  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
      throws InterruptedException {
    long curt = System.currentTimeMillis();
    long endt = curt + timems;
    while (curt < endt) {
      if (ctr.sum() == oldval) {
        Thread.sleep(100);
        curt = System.currentTimeMillis();
      } else {
        assertEquals(newval, ctr.sum());
        return;
      }
    }
    fail();
  }

  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
    for (MasterThread mt : cluster.getLiveMasterThreads()) {
      if (mt.getMaster().isActiveMaster()) {
        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
        mt.join();
        break;
      }
    }
    LOG.debug("Master is aborted");
  }

  /**
   * Find a RS that has regions of a table.
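   * If hasMetaRegion is true but the chosen server does not host any region of the table, a
   * table region is moved onto that server before it is returned.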
   * @param hasMetaRegion when true, the returned RS has the hbase:meta region as well
   */
  private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    List<RegionInfo> regions = null;
    HRegionServer hrs = null;

    for (RegionServerThread rst : rsts) {
      hrs = rst.getRegionServer();
      while (rst.isAlive() && !hrs.isOnline()) {
        Thread.sleep(100);
      }
      if (!rst.isAlive()) {
        continue;
      }
      boolean isCarryingMeta = false;
      boolean foundTableRegion = false;
      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (RegionInfo region : regions) {
        if (region.isMetaRegion()) {
          isCarryingMeta = true;
        }
        if (region.getTable() == tableName) {
          foundTableRegion = true;
        }
        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
          break;
        }
      }
      if (isCarryingMeta && hasMetaRegion) {
        // clients ask for a RS with META
        if (!foundTableRegion) {
          HRegionServer destRS = hrs;
          // the RS doesn't have regions of the specified table, so we need to move one to this RS
          List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
          RegionInfo hri = tableRegions.get(0);
          TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(),
            Bytes.toBytes(destRS.getServerName().getServerName()));
          // wait for the region move to complete
          RegionStates regionStates =
              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
              ServerName sn = regionStates.getRegionServerOfRegion(hri);
              return (sn != null && sn.equals(destRS.getServerName()));
            }
          });
        }
        return hrs;
      } else if (hasMetaRegion || isCarryingMeta) {
        continue;
      }
      if (foundTableRegion) {
        break;
      }
    }

    return hrs;
  }
}