/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);

  {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  }
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] {"supergroup"};

  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
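    // InstrumentedLogWriter (registered above) is a test writer whose static activateFailure
    // flag lets individual tests inject append failures; see
    // testSplitWillFailIfWritingToRegionFails below.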
    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake mapping of user to group and set it on the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;

  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
            16010, System.currentTimeMillis()).toString()));
    //fs.mkdirs(WALDIR);
  }

  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }

  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to, and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) {
        Threads.sleep(1);
      }
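      // The zombie appends as the ZOMBIE user while the split below runs as the ROBBER user;
      // separate users mean separate FileSystem instances, which better mimics a regionserver
      // that is still writing while another process splits its logs.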
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile : logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
              "Zombie could write " + counter.get() + " and logfile had only " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }

  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate it being a separate regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
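          // The zombie keeps the last WAL open (walToKeepOpen above) and continues appending to
          // it in loop() below, while the robber splits it out from under us in the test thread.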
          // Update the counter so it reflects all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // Closing should error.
          try {
            writer.close();
            fail("Closing the writer after the log was split out from under it should error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    private void loop(final Writer writer) {
      byte[] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            //
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }
  /**
   * Test that an old recovered edits file doesn't break WALSplitter.
   * This is useful when upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

  // Recovered edits live at <table>/<encoded-region>/recovered.edits/<edits-file>, so the
  // grandparent of the returned path is the region directory (asserted by the tests above).
  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    long now = System.currentTimeMillis();
    Entry entry = new Entry(
        new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
        new WALEdit());
    Path p = WALSplitUtil
        .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT,
            TMPDIRNAME, conf);
    return p;
  }

  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }

  private void useDifferentDFSClient() throws IOException {
    // Make fs act as a different client now;
    // initialize will create a new DFSClient with a new client ID.
    fs.initialize(fs.getUri(), conf);
  }

  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // The split log should only have the test edits; region event markers are not carried over.
    assertEquals(10, countWAL(splitLog[0]));
  }

  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir =
        new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[] {"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // The original log should have 10 test edits, 10 region markers, and 1 compaction marker.
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // The split log should have the 10 test edits plus 1 compaction marker.
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * @param expectedFiles expected number of split files per region
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile : logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // The WAL dir isn't created until getWAL is called, so make it
    // here when testing empty log files.
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }
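  // Empty WAL files, whether cleanly closed or still open, are benign: the splitter reads zero
  // entries from them and moves on, so the counts above include only the real edits.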
  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // Generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // The entries in the original logs alternate regions. Considering the
    // sequence file header, the middle corruption should affect at least
    // half of the entries in the corrupted file.
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }

  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes =
        Arrays.stream(FaultyProtobufLogReader.FailureType.values())
            .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE)
            .collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.",
          archivedLogs, walDirContents);
    }
  }

  /**
   * @return set of wal names present prior to the split attempt
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;
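    // Technique: temporarily swap FaultyProtobufLogReader in via
    // hbase.regionserver.hlog.reader.impl so that reading any WAL fails in the requested way,
    // then restore the original reader class in the finally block below.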
    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
          Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or the previous loop iteration.
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // Since we're splitting out from under the factory, we should expect some closing
        // failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // Change to the faulty reader.
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }

  @Test(expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS, fs.listStatus(WALDIR).length);
  }

  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // Should not have stored the EOF files as corrupt.
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }
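  // Note the two destinations used above: cleanly split WALs are archived under OLDLOGDIR,
  // while files deemed corrupt are sidelined under CORRUPTDIR when skip-errors is enabled.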
  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // Leave the 5th log open so we can append the "trap".
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
          contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
    // WALs with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written to them.
    // Pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side.
    WALSplitter logSplitter =
        new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
          @Override
          protected Writer createWriter(Path logfile) throws IOException {
            Writer mockWriter = Mockito.mock(Writer.class);
            Mockito.doThrow(new IOException("Injected")).when(mockWriter)
                .append(Mockito.<Entry>any());
            return mockWriter;
          }
        };
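    // Every append(...) on the mocked writer above throws the injected IOException, so the
    // output side of the split fails while the reading side keeps supplying entries.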
    // Set up a background thread dumper. It needs a thread to depend on, and we run the
    // thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get()) {
          Threads.sleep(10);
        }
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }

  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file, simulating
    // files that were moved.
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important;
    // that's how it comes out of HDFS.
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" parts are very important;
    // that's how they come out of HDFS. If HDFS changes these exception
    // messages, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have a problem obtaining
    // the block length, in which case retrying may help.
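    // The stub below fails the first three open() calls with those retriable messages and then
    // delegates to the real filesystem, so a passing split demonstrates the retry path.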
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while so the reporter gets invoked in the meantime.
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
          null, wals, null);
      assertFalse("Log splitting should have failed", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // Reset it back to its default value.
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }

  /**
   * Test the log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128 * 1024 * 1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }

  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through the pipeline
   * @param bufferSize size of the in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits).
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped.
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk.
    WALSplitter logSplitter =
        new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {

          /* Produce a mock writer that doesn't write anywhere. */
          @Override
          protected Writer createWriter(Path logfile) throws IOException {
            Writer mockWriter = Mockito.mock(Writer.class);
            Mockito.doAnswer(new Answer<Void>() {
              int expectedIndex = 0;

              @Override
              public Void answer(InvocationOnMock invocation) {
                if (writerSlowness > 0) {
                  try {
                    Thread.sleep(writerSlowness);
                  } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                  }
                }
                Entry entry = (Entry) invocation.getArgument(0);
                WALEdit edit = entry.getEdit();
                List<Cell> cells = edit.getCells();
                assertEquals(1, cells.size());
                Cell cell = cells.get(0);
                // Check that the edits come in the right order. The mock reader below generates
                // rows as Bytes.toBytes(index / regions.size()), so decoding the row recovers
                // the per-region sequence number.
                assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                    cell.getRowLength()));
                expectedIndex++;
                return null;
              }
            }).when(mockWriter).append(Mockito.<Entry>any());
            return mockWriter;
          }

          /* Produce a mock reader that generates fake entries. */
          @Override
          protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
              throws IOException {
            Reader mockReader = Mockito.mock(Reader.class);
            Mockito.doAnswer(new Answer<Entry>() {
              int index = 0;

              @Override
              public Entry answer(InvocationOnMock invocation) throws Throwable {
                if (index >= numFakeEdits) {
                  return null;
                }

                // Generate r0 through r4 in a round robin fashion.
                int regionIdx = index % regions.size();
                byte[] region = new byte[] {(byte) 'r', (byte) (0x30 + regionIdx)};

                Entry ret = createTestEntry(TABLE_NAME, region,
                    Bytes.toBytes(index / regions.size()),
                    FAMILY, QUALIFIER, VALUE, index);
                index++;
                return ret;
              }
            }).when(mockReader).next();
            return mockReader;
          }
        };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify the number of written edits per region.
    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }

  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // The WAL dir isn't created until getWAL is called, so make it
    // here when testing an empty log file.
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
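    // With hbase.hlog.split.skip.errors enabled, the corrupted file should be sidelined into
    // the .corrupt directory rather than aborting the split.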
    final Path corruptDir =
        new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region.
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter =
        new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
          @Override
          protected Writer createWriter(Path logfile)
              throws IOException {
            Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
            // After creating the writer, simulate a region's
            // replayRecoveredEditsIfAny(), which gets the SplitEditFiles of this
            // region and deletes them, excluding files with a '.temp' suffix.
            NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
            if (files != null && !files.isEmpty()) {
              for (Path file : files) {
                if (!this.walFS.delete(file, false)) {
                  LOG.error("Failed delete of " + file);
                } else {
                  LOG.debug("Deleted recovered.edits file=" + file);
                }
              }
            }
            return writer;
          }
        };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Throws IOException when splitting "
          + "log, most likely because the file being written does not "
          + "exist, which is caused by a concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "most likely caused by a concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }

  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }

  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }

  /**
   * Writes <code>writers</code> WAL files named wal.dat.0 through wal.dat.[writers - 1],
   * interleaving <code>entries</code> edits for every region in REGIONS and, optionally,
   * <code>regionEvents</code> region-event markers.
   * @param leaveOpen index of the writer to leave un-closed; -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
      throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer[] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
              QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }

  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = CommonFSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        // Skip the sequence-id marker files; we only want actual recovered edits.
        return !WALSplitUtil.isSequenceIdFile(p);
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }

  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = (int) Math.floor(corrupted_bytes.length / 2);
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        // Dropping the final 4 bytes mangles only the trailer, not the entries, so
        // testCorruptWALTrailer expects the split to still recover every edit.
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);
        closeOrFlush(close, out);
        break;
    }
  }

  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?>[]{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?>[]{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
      // We can't simply call out.hflush() here: older Hadoop versions only have sync(),
      // hence the reflective lookup above.
    }
  }

  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }

  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
        .setFamilyName(ByteString.copyFrom(FAMILY))
        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
        .addAllCompactionInput(Arrays.asList(inputs))
        .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync(false);
  }

  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
        TABLE_NAME.toBytes(),
        Bytes.toBytes(region),
        Bytes.toBytes(String.valueOf(region.hashCode())),
        1,
        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
        HConstants.DEFAULT_CLUSTER_ID);
    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
    w.append(new Entry(walKey, we));
    w.sync(false);
  }

  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync(false);
    return seq;
  }

  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();

    seq++;
    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }

  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }

  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}