001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master; 020 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; 023import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed; 024import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task; 025import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired; 026import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done; 027import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err; 028import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned; 029import static org.junit.Assert.assertEquals; 030import static org.junit.Assert.assertFalse; 031import static org.junit.Assert.assertTrue; 032import static org.junit.Assert.fail; 033 034import java.io.IOException; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Iterator; 038import java.util.List; 039import java.util.NavigableSet; 040import java.util.concurrent.ExecutorService; 041import java.util.concurrent.Executors; 042import java.util.concurrent.Future; 043import java.util.concurrent.TimeUnit; 044import java.util.concurrent.TimeoutException; 045import java.util.concurrent.atomic.LongAdder; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FSDataOutputStream; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.HBaseTestingUtility; 053import org.apache.hadoop.hbase.HConstants; 054import org.apache.hadoop.hbase.KeyValue; 055import org.apache.hadoop.hbase.MiniHBaseCluster; 056import org.apache.hadoop.hbase.NamespaceDescriptor; 057import org.apache.hadoop.hbase.ServerName; 058import org.apache.hadoop.hbase.SplitLogCounters; 059import org.apache.hadoop.hbase.StartMiniClusterOption; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.Waiter; 062import org.apache.hadoop.hbase.client.Put; 063import org.apache.hadoop.hbase.client.RegionInfo; 064import org.apache.hadoop.hbase.client.RegionInfoBuilder; 065import org.apache.hadoop.hbase.client.RegionLocator; 066import org.apache.hadoop.hbase.client.Table; 067import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; 068import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 069import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; 070import org.apache.hadoop.hbase.master.assignment.RegionStates; 071import org.apache.hadoop.hbase.regionserver.HRegionServer; 072import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 073import org.apache.hadoop.hbase.regionserver.Region; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.FSUtils; 076import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 077import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 078import org.apache.hadoop.hbase.util.Threads; 079import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 080import org.apache.hadoop.hbase.wal.WAL; 081import org.apache.hadoop.hbase.wal.WALEdit; 082import org.apache.hadoop.hbase.wal.WALFactory; 083import org.apache.hadoop.hbase.wal.WALKeyImpl; 084import org.apache.hadoop.hbase.wal.WALSplitUtil; 085import org.apache.hadoop.hbase.zookeeper.ZKUtil; 086import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 087import org.junit.After; 088import org.junit.AfterClass; 089import org.junit.Before; 090import org.junit.BeforeClass; 091import org.junit.Rule; 092import org.junit.Test; 093import org.junit.rules.TestName; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 098 099/** 100 * Base class for testing distributed log splitting. 101 */ 102public abstract class AbstractTestDLS { 103 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class); 104 105 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 106 107 // Start a cluster with 2 masters and 5 regionservers 108 private static final int NUM_MASTERS = 2; 109 private static final int NUM_RS = 5; 110 private static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); 111 112 @Rule 113 public TestName testName = new TestName(); 114 115 private TableName tableName; 116 private MiniHBaseCluster cluster; 117 private HMaster master; 118 private Configuration conf; 119 120 @Rule 121 public TestName name = new TestName(); 122 123 @BeforeClass 124 public static void setup() throws Exception { 125 // Uncomment the following line if more verbosity is needed for 126 // debugging (see HBASE-12285 for details). 127 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); 128 TEST_UTIL.startMiniZKCluster(); 129 TEST_UTIL.startMiniDFSCluster(3); 130 } 131 132 @AfterClass 133 public static void tearDown() throws Exception { 134 TEST_UTIL.shutdownMiniCluster(); 135 } 136 137 protected abstract String getWalProvider(); 138 139 private void startCluster(int numRS) throws Exception { 140 SplitLogCounters.resetCounters(); 141 LOG.info("Starting cluster"); 142 conf.setLong("hbase.splitlog.max.resubmit", 0); 143 // Make the failure test faster 144 conf.setInt("zookeeper.recovery.retry", 0); 145 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); 146 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing 147 conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3); 148 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 149 conf.set("hbase.wal.provider", getWalProvider()); 150 StartMiniClusterOption option = StartMiniClusterOption.builder() 151 .numMasters(NUM_MASTERS).numRegionServers(numRS).build(); 152 TEST_UTIL.startMiniHBaseCluster(option); 153 cluster = TEST_UTIL.getHBaseCluster(); 154 LOG.info("Waiting for active/ready master"); 155 cluster.waitForActiveAndReadyMaster(); 156 master = cluster.getMaster(); 157 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 158 @Override 159 public boolean evaluate() throws Exception { 160 return cluster.getLiveRegionServerThreads().size() >= numRS; 161 } 162 }); 163 } 164 165 @Before 166 public void before() throws Exception { 167 conf = TEST_UTIL.getConfiguration(); 168 tableName = TableName.valueOf(testName.getMethodName()); 169 } 170 171 @After 172 public void after() throws Exception { 173 TEST_UTIL.shutdownMiniHBaseCluster(); 174 TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true); 175 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); 176 } 177 178 @Test 179 public void testRecoveredEdits() throws Exception { 180 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal 181 startCluster(NUM_RS); 182 183 int numLogLines = 10000; 184 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); 185 // turn off load balancing to prevent regions from moving around otherwise 186 // they will consume recovered.edits 187 master.balanceSwitch(false); 188 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 189 190 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 191 192 Path rootdir = FSUtils.getRootDir(conf); 193 194 int numRegions = 50; 195 try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); 196 Table t = installTable(zkw, numRegions)) { 197 TableName table = t.getName(); 198 List<RegionInfo> regions = null; 199 HRegionServer hrs = null; 200 for (int i = 0; i < NUM_RS; i++) { 201 hrs = rsts.get(i).getRegionServer(); 202 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 203 // At least one RS will have >= to average number of regions. 204 if (regions.size() >= numRegions / NUM_RS) { 205 break; 206 } 207 } 208 Path logDir = new Path(rootdir, 209 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); 210 211 LOG.info("#regions = " + regions.size()); 212 Iterator<RegionInfo> it = regions.iterator(); 213 while (it.hasNext()) { 214 RegionInfo region = it.next(); 215 if (region.getTable().getNamespaceAsString() 216 .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) { 217 it.remove(); 218 } 219 } 220 221 makeWAL(hrs, regions, numLogLines, 100); 222 223 slm.splitLogDistributed(logDir); 224 225 int count = 0; 226 for (RegionInfo hri : regions) { 227 Path tdir = FSUtils.getWALTableDir(conf, table); 228 @SuppressWarnings("deprecation") 229 Path editsdir = WALSplitUtil 230 .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf, 231 tableName, hri.getEncodedName())); 232 LOG.debug("checking edits dir " + editsdir); 233 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { 234 @Override 235 public boolean accept(Path p) { 236 if (WALSplitUtil.isSequenceIdFile(p)) { 237 return false; 238 } 239 return true; 240 } 241 }); 242 assertTrue( 243 "edits dir should have more than a single file in it. instead has " + files.length, 244 files.length > 1); 245 for (int i = 0; i < files.length; i++) { 246 int c = countWAL(files[i].getPath(), fs, conf); 247 count += c; 248 } 249 LOG.info(count + " edits in " + files.length + " recovered edits files."); 250 } 251 252 // check that the log file is moved 253 assertFalse(fs.exists(logDir)); 254 assertEquals(numLogLines, count); 255 } 256 } 257 258 @Test 259 public void testMasterStartsUpWithLogSplittingWork() throws Exception { 260 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); 261 startCluster(NUM_RS); 262 263 int numRegionsToCreate = 40; 264 int numLogLines = 1000; 265 // turn off load balancing to prevent regions from moving around otherwise 266 // they will consume recovered.edits 267 master.balanceSwitch(false); 268 269 try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); 270 Table ht = installTable(zkw, numRegionsToCreate);) { 271 HRegionServer hrs = findRSToKill(false); 272 List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 273 makeWAL(hrs, regions, numLogLines, 100); 274 275 // abort master 276 abortMaster(cluster); 277 278 // abort RS 279 LOG.info("Aborting region server: " + hrs.getServerName()); 280 hrs.abort("testing"); 281 282 // wait for abort completes 283 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 284 @Override 285 public boolean evaluate() throws Exception { 286 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1; 287 } 288 }); 289 290 Thread.sleep(2000); 291 LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); 292 293 // wait for abort completes 294 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 295 @Override 296 public boolean evaluate() throws Exception { 297 return (HBaseTestingUtility.getAllOnlineRegions(cluster) 298 .size() >= (numRegionsToCreate + 1)); 299 } 300 }); 301 302 LOG.info("Current Open Regions After Master Node Starts Up:" + 303 HBaseTestingUtility.getAllOnlineRegions(cluster).size()); 304 305 assertEquals(numLogLines, TEST_UTIL.countRows(ht)); 306 } 307 } 308 309 /** 310 * The original intention of this test was to force an abort of a region server and to make sure 311 * that the failure path in the region servers is properly evaluated. But it is difficult to 312 * ensure that the region server doesn't finish the log splitting before it aborts. Also now, 313 * there is this code path where the master will preempt the region server when master detects 314 * that the region server has aborted. 315 * @throws Exception 316 */ 317 // Was marked flaky before Distributed Log Replay cleanup. 318 @Test 319 public void testWorkerAbort() throws Exception { 320 LOG.info("testWorkerAbort"); 321 startCluster(3); 322 int numLogLines = 10000; 323 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); 324 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 325 326 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 327 HRegionServer hrs = findRSToKill(false); 328 Path rootdir = FSUtils.getRootDir(conf); 329 final Path logDir = new Path(rootdir, 330 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); 331 332 try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); 333 Table t = installTable(zkw, 40)) { 334 makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100); 335 336 new Thread() { 337 @Override 338 public void run() { 339 try { 340 waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); 341 } catch (InterruptedException e) { 342 } 343 for (RegionServerThread rst : rsts) { 344 rst.getRegionServer().abort("testing"); 345 break; 346 } 347 } 348 }.start(); 349 FileStatus[] logfiles = fs.listStatus(logDir); 350 TaskBatch batch = new TaskBatch(); 351 slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch); 352 // waitForCounter but for one of the 2 counters 353 long curt = System.currentTimeMillis(); 354 long waitTime = 80000; 355 long endt = curt + waitTime; 356 while (curt < endt) { 357 if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + 358 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + 359 tot_wkr_preempt_task.sum()) == 0) { 360 Thread.sleep(100); 361 curt = System.currentTimeMillis(); 362 } else { 363 assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + 364 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + 365 tot_wkr_preempt_task.sum())); 366 return; 367 } 368 } 369 fail("none of the following counters went up in " + waitTime + " milliseconds - " + 370 "tot_wkr_task_resigned, tot_wkr_task_err, " + 371 "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task"); 372 } 373 } 374 375 @Test 376 public void testThreeRSAbort() throws Exception { 377 LOG.info("testThreeRSAbort"); 378 int numRegionsToCreate = 40; 379 int numRowsPerRegion = 100; 380 381 startCluster(NUM_RS); // NUM_RS=6. 382 383 try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null); 384 Table table = installTable(zkw, numRegionsToCreate)) { 385 populateDataInTable(numRowsPerRegion); 386 387 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 388 assertEquals(NUM_RS, rsts.size()); 389 cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName()); 390 cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName()); 391 cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName()); 392 393 TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() { 394 395 @Override 396 public boolean evaluate() throws Exception { 397 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3; 398 } 399 400 @Override 401 public String explainFailure() throws Exception { 402 return "Timed out waiting for server aborts."; 403 } 404 }); 405 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 406 int rows; 407 try { 408 rows = TEST_UTIL.countRows(table); 409 } catch (Exception e) { 410 Threads.printThreadInfo(System.out, "Thread dump before fail"); 411 throw e; 412 } 413 assertEquals(numRegionsToCreate * numRowsPerRegion, rows); 414 } 415 } 416 417 @Test 418 public void testDelayedDeleteOnFailure() throws Exception { 419 LOG.info("testDelayedDeleteOnFailure"); 420 startCluster(1); 421 final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); 422 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 423 final Path logDir = new Path(new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME), 424 ServerName.valueOf("x", 1, 1).toString()); 425 fs.mkdirs(logDir); 426 ExecutorService executor = null; 427 try { 428 final Path corruptedLogFile = new Path(logDir, "x"); 429 FSDataOutputStream out; 430 out = fs.create(corruptedLogFile); 431 out.write(0); 432 out.write(Bytes.toBytes("corrupted bytes")); 433 out.close(); 434 ZKSplitLogManagerCoordination coordination = 435 (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager()) 436 .getSplitLogManagerCoordination(); 437 coordination.setIgnoreDeleteForTesting(true); 438 executor = Executors.newSingleThreadExecutor(); 439 Runnable runnable = new Runnable() { 440 @Override 441 public void run() { 442 try { 443 // since the logDir is a fake, corrupted one, so the split log worker 444 // will finish it quickly with error, and this call will fail and throw 445 // an IOException. 446 slm.splitLogDistributed(logDir); 447 } catch (IOException ioe) { 448 try { 449 assertTrue(fs.exists(corruptedLogFile)); 450 // this call will block waiting for the task to be removed from the 451 // tasks map which is not going to happen since ignoreZKDeleteForTesting 452 // is set to true, until it is interrupted. 453 slm.splitLogDistributed(logDir); 454 } catch (IOException e) { 455 assertTrue(Thread.currentThread().isInterrupted()); 456 return; 457 } 458 fail("did not get the expected IOException from the 2nd call"); 459 } 460 fail("did not get the expected IOException from the 1st call"); 461 } 462 }; 463 Future<?> result = executor.submit(runnable); 464 try { 465 result.get(2000, TimeUnit.MILLISECONDS); 466 } catch (TimeoutException te) { 467 // it is ok, expected. 468 } 469 waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); 470 executor.shutdownNow(); 471 executor = null; 472 473 // make sure the runnable is finished with no exception thrown. 474 result.get(); 475 } finally { 476 if (executor != null) { 477 // interrupt the thread in case the test fails in the middle. 478 // it has no effect if the thread is already terminated. 479 executor.shutdownNow(); 480 } 481 fs.delete(logDir, true); 482 } 483 } 484 485 private Table installTable(ZKWatcher zkw, int nrs) throws Exception { 486 return installTable(zkw, nrs, 0); 487 } 488 489 private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception { 490 // Create a table with regions 491 byte[] family = Bytes.toBytes("family"); 492 LOG.info("Creating table with " + nrs + " regions"); 493 Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs); 494 int numRegions = -1; 495 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 496 numRegions = r.getStartKeys().length; 497 } 498 assertEquals(nrs, numRegions); 499 LOG.info("Waiting for no more RIT\n"); 500 blockUntilNoRIT(zkw, master); 501 // disable-enable cycle to get rid of table's dead regions left behind 502 // by createMultiRegions 503 assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName)); 504 LOG.debug("Disabling table\n"); 505 TEST_UTIL.getAdmin().disableTable(tableName); 506 LOG.debug("Waiting for no more RIT\n"); 507 blockUntilNoRIT(zkw, master); 508 NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster); 509 LOG.debug("Verifying only catalog and namespace regions are assigned\n"); 510 if (regions.size() != 2) { 511 for (String oregion : regions) 512 LOG.debug("Region still online: " + oregion); 513 } 514 assertEquals(2 + existingRegions, regions.size()); 515 LOG.debug("Enabling table\n"); 516 TEST_UTIL.getAdmin().enableTable(tableName); 517 LOG.debug("Waiting for no more RIT\n"); 518 blockUntilNoRIT(zkw, master); 519 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); 520 regions = HBaseTestingUtility.getAllOnlineRegions(cluster); 521 assertEquals(numRegions + 2 + existingRegions, regions.size()); 522 return table; 523 } 524 525 void populateDataInTable(int nrows) throws Exception { 526 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 527 assertEquals(NUM_RS, rsts.size()); 528 529 for (RegionServerThread rst : rsts) { 530 HRegionServer hrs = rst.getRegionServer(); 531 List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 532 for (RegionInfo hri : hris) { 533 if (hri.getTable().isSystemTable()) { 534 continue; 535 } 536 LOG.debug( 537 "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString()); 538 Region region = hrs.getOnlineRegion(hri.getRegionName()); 539 assertTrue(region != null); 540 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); 541 } 542 } 543 544 for (MasterThread mt : cluster.getLiveMasterThreads()) { 545 HRegionServer hrs = mt.getMaster(); 546 List<RegionInfo> hris; 547 try { 548 hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 549 } catch (ServerNotRunningYetException e) { 550 // It's ok: this master may be a backup. Ignored. 551 continue; 552 } 553 for (RegionInfo hri : hris) { 554 if (hri.getTable().isSystemTable()) { 555 continue; 556 } 557 LOG.debug( 558 "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString()); 559 Region region = hrs.getOnlineRegion(hri.getRegionName()); 560 assertTrue(region != null); 561 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); 562 } 563 } 564 } 565 566 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size) 567 throws IOException { 568 makeWAL(hrs, regions, num_edits, edit_size, true); 569 } 570 571 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize, 572 boolean cleanShutdown) throws IOException { 573 // remove root and meta region 574 regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO); 575 576 for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) { 577 RegionInfo regionInfo = iter.next(); 578 if (regionInfo.getTable().isSystemTable()) { 579 iter.remove(); 580 } 581 } 582 byte[] value = new byte[editSize]; 583 584 List<RegionInfo> hris = new ArrayList<>(); 585 for (RegionInfo region : regions) { 586 if (region.getTable() != tableName) { 587 continue; 588 } 589 hris.add(region); 590 } 591 LOG.info("Creating wal edits across " + hris.size() + " regions."); 592 for (int i = 0; i < editSize; i++) { 593 value[i] = (byte) ('a' + (i % 26)); 594 } 595 int n = hris.size(); 596 int[] counts = new int[n]; 597 // sync every ~30k to line up with desired wal rolls 598 final int syncEvery = 30 * 1024 / editSize; 599 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 600 if (n > 0) { 601 for (int i = 0; i < numEdits; i += 1) { 602 WALEdit e = new WALEdit(); 603 RegionInfo curRegionInfo = hris.get(i % n); 604 WAL log = hrs.getWAL(curRegionInfo); 605 byte[] startRow = curRegionInfo.getStartKey(); 606 if (startRow == null || startRow.length == 0) { 607 startRow = new byte[] { 0, 0, 0, 0, 1 }; 608 } 609 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]); 610 row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because 611 // HBaseTestingUtility.createMultiRegions use 5 bytes key 612 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); 613 e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value)); 614 log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), 615 tableName, System.currentTimeMillis(), mvcc), e); 616 if (0 == i % syncEvery) { 617 log.sync(); 618 } 619 counts[i % n] += 1; 620 } 621 } 622 // done as two passes because the regions might share logs. shutdown is idempotent, but sync 623 // will cause errors if done after. 624 for (RegionInfo info : hris) { 625 WAL log = hrs.getWAL(info); 626 log.sync(); 627 } 628 if (cleanShutdown) { 629 for (RegionInfo info : hris) { 630 WAL log = hrs.getWAL(info); 631 log.shutdown(); 632 } 633 } 634 for (int i = 0; i < n; i++) { 635 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits"); 636 } 637 return; 638 } 639 640 private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException { 641 int count = 0; 642 try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) { 643 WAL.Entry e; 644 while ((e = in.next()) != null) { 645 if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { 646 count++; 647 } 648 } 649 } 650 return count; 651 } 652 653 private void blockUntilNoRIT(ZKWatcher zkw, HMaster master) throws Exception { 654 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 655 } 656 657 private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families) 658 throws IOException { 659 for (int i = 0; i < numRows; i++) { 660 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i))); 661 for (byte[] family : families) { 662 put.addColumn(family, qf, null); 663 } 664 region.put(put); 665 } 666 } 667 668 private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) 669 throws InterruptedException { 670 long curt = System.currentTimeMillis(); 671 long endt = curt + timems; 672 while (curt < endt) { 673 if (ctr.sum() == oldval) { 674 Thread.sleep(100); 675 curt = System.currentTimeMillis(); 676 } else { 677 assertEquals(newval, ctr.sum()); 678 return; 679 } 680 } 681 fail(); 682 } 683 684 private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException { 685 for (MasterThread mt : cluster.getLiveMasterThreads()) { 686 if (mt.getMaster().isActiveMaster()) { 687 mt.getMaster().abort("Aborting for tests", new Exception("Trace info")); 688 mt.join(); 689 break; 690 } 691 } 692 LOG.debug("Master is aborted"); 693 } 694 695 /** 696 * Find a RS that has regions of a table. 697 * @param hasMetaRegion when true, the returned RS has hbase:meta region as well 698 */ 699 private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception { 700 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 701 List<RegionInfo> regions = null; 702 HRegionServer hrs = null; 703 704 for (RegionServerThread rst : rsts) { 705 hrs = rst.getRegionServer(); 706 while (rst.isAlive() && !hrs.isOnline()) { 707 Thread.sleep(100); 708 } 709 if (!rst.isAlive()) { 710 continue; 711 } 712 boolean isCarryingMeta = false; 713 boolean foundTableRegion = false; 714 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 715 for (RegionInfo region : regions) { 716 if (region.isMetaRegion()) { 717 isCarryingMeta = true; 718 } 719 if (region.getTable() == tableName) { 720 foundTableRegion = true; 721 } 722 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) { 723 break; 724 } 725 } 726 if (isCarryingMeta && hasMetaRegion) { 727 // clients ask for a RS with META 728 if (!foundTableRegion) { 729 HRegionServer destRS = hrs; 730 // the RS doesn't have regions of the specified table so we need move one to this RS 731 List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName); 732 RegionInfo hri = tableRegions.get(0); 733 TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName()); 734 // wait for region move completes 735 RegionStates regionStates = 736 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 737 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { 738 @Override 739 public boolean evaluate() throws Exception { 740 ServerName sn = regionStates.getRegionServerOfRegion(hri); 741 return (sn != null && sn.equals(destRS.getServerName())); 742 } 743 }); 744 } 745 return hrs; 746 } else if (hasMetaRegion || isCarryingMeta) { 747 continue; 748 } 749 if (foundTableRegion) { 750 break; 751 } 752 } 753 754 return hrs; 755 } 756}