001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.lang.reflect.Method; 028import java.security.PrivilegedExceptionAction; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.List; 035import java.util.Map; 036import java.util.NavigableSet; 037import java.util.Objects; 038import java.util.Set; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.concurrent.atomic.AtomicLong; 042import java.util.stream.Collectors; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FSDataInputStream; 045import org.apache.hadoop.fs.FSDataOutputStream; 046import org.apache.hadoop.fs.FileStatus; 047import org.apache.hadoop.fs.FileSystem; 048import org.apache.hadoop.fs.FileUtil; 049import org.apache.hadoop.fs.Path; 050import org.apache.hadoop.fs.PathFilter; 051import org.apache.hadoop.hbase.Cell; 052import org.apache.hadoop.hbase.HBaseClassTestRule; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtility; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.KeyValue; 057import org.apache.hadoop.hbase.ServerName; 058import org.apache.hadoop.hbase.TableName; 059import org.apache.hadoop.hbase.client.RegionInfo; 060import org.apache.hadoop.hbase.client.RegionInfoBuilder; 061import org.apache.hadoop.hbase.regionserver.HRegion; 062import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader; 063import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; 064import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 065import org.apache.hadoop.hbase.security.User; 066import org.apache.hadoop.hbase.testclassification.LargeTests; 067import org.apache.hadoop.hbase.testclassification.RegionServerTests; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.hbase.util.CancelableProgressable; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.hadoop.hbase.util.FSUtils; 072import org.apache.hadoop.hbase.util.Threads; 073import org.apache.hadoop.hbase.wal.WAL.Entry; 074import org.apache.hadoop.hbase.wal.WAL.Reader; 075import org.apache.hadoop.hbase.wal.WALProvider.Writer; 076import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException; 077import org.apache.hadoop.hdfs.DFSTestUtil; 078import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; 079import org.apache.hadoop.ipc.RemoteException; 080import org.junit.After; 081import org.junit.AfterClass; 082import org.junit.Before; 083import org.junit.BeforeClass; 084import org.junit.ClassRule; 085import org.junit.Rule; 086import org.junit.Test; 087import org.junit.experimental.categories.Category; 088import org.junit.rules.TestName; 089import org.mockito.Mockito; 090import org.mockito.invocation.InvocationOnMock; 091import org.mockito.stubbing.Answer; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 096import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 097import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 098import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 099 100import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 102 103/** 104 * Testing {@link WAL} splitting code. 105 */ 106@Category({RegionServerTests.class, LargeTests.class}) 107public class TestWALSplit { 108 109 @ClassRule 110 public static final HBaseClassTestRule CLASS_RULE = 111 HBaseClassTestRule.forClass(TestWALSplit.class); 112 113 { 114 // Uncomment the following lines if more verbosity is needed for 115 // debugging (see HBASE-12285 for details). 116 //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); 117 //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); 118 //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); 119 } 120 private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class); 121 122 private static Configuration conf; 123 private FileSystem fs; 124 125 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 126 127 private Path HBASEDIR; 128 private Path HBASELOGDIR; 129 private Path WALDIR; 130 private Path OLDLOGDIR; 131 private Path CORRUPTDIR; 132 private Path TABLEDIR; 133 134 private static final int NUM_WRITERS = 10; 135 private static final int ENTRIES = 10; // entries per writer per region 136 137 private static final String FILENAME_BEING_SPLIT = "testfile"; 138 private static final TableName TABLE_NAME = 139 TableName.valueOf("t1"); 140 private static final byte[] FAMILY = Bytes.toBytes("f1"); 141 private static final byte[] QUALIFIER = Bytes.toBytes("q1"); 142 private static final byte[] VALUE = Bytes.toBytes("v1"); 143 private static final String WAL_FILE_PREFIX = "wal.dat."; 144 private static List<String> REGIONS = new ArrayList<>(); 145 private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors"; 146 private static String ROBBER; 147 private static String ZOMBIE; 148 private static String [] GROUP = new String [] {"supergroup"}; 149 150 static enum Corruptions { 151 INSERT_GARBAGE_ON_FIRST_LINE, 152 INSERT_GARBAGE_IN_THE_MIDDLE, 153 APPEND_GARBAGE, 154 TRUNCATE, 155 TRUNCATE_TRAILER 156 } 157 158 @BeforeClass 159 public static void setUpBeforeClass() throws Exception { 160 conf = TEST_UTIL.getConfiguration(); 161 conf.setClass("hbase.regionserver.hlog.writer.impl", 162 InstrumentedLogWriter.class, Writer.class); 163 // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config. 164 System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); 165 // Create fake maping user to group and set it to the conf. 166 Map<String, String []> u2g_map = new HashMap<>(2); 167 ROBBER = User.getCurrent().getName() + "-robber"; 168 ZOMBIE = User.getCurrent().getName() + "-zombie"; 169 u2g_map.put(ROBBER, GROUP); 170 u2g_map.put(ZOMBIE, GROUP); 171 DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); 172 conf.setInt("dfs.heartbeat.interval", 1); 173 TEST_UTIL.startMiniDFSCluster(2); 174 } 175 176 @AfterClass 177 public static void tearDownAfterClass() throws Exception { 178 TEST_UTIL.shutdownMiniDFSCluster(); 179 } 180 181 @Rule 182 public TestName name = new TestName(); 183 private WALFactory wals = null; 184 185 @Before 186 public void setUp() throws Exception { 187 LOG.info("Cleaning up cluster for new test."); 188 fs = TEST_UTIL.getDFSCluster().getFileSystem(); 189 HBASEDIR = TEST_UTIL.createRootDir(); 190 HBASELOGDIR = TEST_UTIL.createWALRootDir(); 191 OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME); 192 CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME); 193 TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME); 194 REGIONS.clear(); 195 Collections.addAll(REGIONS, "bbb", "ccc"); 196 InstrumentedLogWriter.activateFailure = false; 197 wals = new WALFactory(conf, name.getMethodName()); 198 WALDIR = new Path(HBASELOGDIR, 199 AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(), 200 16010, System.currentTimeMillis()).toString())); 201 //fs.mkdirs(WALDIR); 202 } 203 204 @After 205 public void tearDown() throws Exception { 206 try { 207 wals.close(); 208 } catch(IOException exception) { 209 // Some tests will move WALs out from under us. In those cases, we'll get an error on close. 210 LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" + 211 " you see a failure look here."); 212 LOG.debug("exception details", exception); 213 } finally { 214 wals = null; 215 fs.delete(HBASEDIR, true); 216 fs.delete(HBASELOGDIR, true); 217 } 218 } 219 220 /** 221 * Simulates splitting a WAL out from under a regionserver that is still trying to write it. 222 * Ensures we do not lose edits. 223 * @throws IOException 224 * @throws InterruptedException 225 */ 226 @Test 227 public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException { 228 final AtomicLong counter = new AtomicLong(0); 229 AtomicBoolean stop = new AtomicBoolean(false); 230 // Region we'll write edits too and then later examine to make sure they all made it in. 231 final String region = REGIONS.get(0); 232 final int numWriters = 3; 233 Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters); 234 try { 235 long startCount = counter.get(); 236 zombie.start(); 237 // Wait till writer starts going. 238 while (startCount == counter.get()) Threads.sleep(1); 239 // Give it a second to write a few appends. 240 Threads.sleep(1000); 241 final Configuration conf2 = HBaseConfiguration.create(conf); 242 final User robber = User.createUserForTesting(conf2, ROBBER, GROUP); 243 int count = robber.runAs(new PrivilegedExceptionAction<Integer>() { 244 @Override 245 public Integer run() throws Exception { 246 StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR) 247 .append("):\n"); 248 for (FileStatus status : fs.listStatus(WALDIR)) { 249 ls.append("\t").append(status.toString()).append("\n"); 250 } 251 LOG.debug(Objects.toString(ls)); 252 LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files."); 253 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); 254 LOG.info("Finished splitting out from under zombie."); 255 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 256 assertEquals("wrong number of split files for region", numWriters, logfiles.length); 257 int count = 0; 258 for (Path logfile: logfiles) { 259 count += countWAL(logfile); 260 } 261 return count; 262 } 263 }); 264 LOG.info("zombie=" + counter.get() + ", robber=" + count); 265 assertTrue("The log file could have at most 1 extra log entry, but can't have less. " + 266 "Zombie could write " + counter.get() + " and logfile had only " + count, 267 counter.get() == count || counter.get() + 1 == count); 268 } finally { 269 stop.set(true); 270 zombie.interrupt(); 271 Threads.threadDumpingIsAlive(zombie); 272 } 273 } 274 275 /** 276 * This thread will keep writing to a 'wal' file even after the split process has started. 277 * It simulates a region server that was considered dead but woke up and wrote some more to the 278 * last log entry. Does its writing as an alternate user in another filesystem instance to 279 * simulate better it being a regionserver. 280 */ 281 class ZombieLastLogWriterRegionServer extends Thread { 282 final AtomicLong editsCount; 283 final AtomicBoolean stop; 284 final int numOfWriters; 285 /** 286 * Region to write edits for. 287 */ 288 final String region; 289 final User user; 290 291 public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop, 292 final String region, final int writers) 293 throws IOException, InterruptedException { 294 super("ZombieLastLogWriterRegionServer"); 295 setDaemon(true); 296 this.stop = stop; 297 this.editsCount = counter; 298 this.region = region; 299 this.user = User.createUserForTesting(conf, ZOMBIE, GROUP); 300 numOfWriters = writers; 301 } 302 303 @Override 304 public void run() { 305 try { 306 doWriting(); 307 } catch (IOException e) { 308 LOG.warn(getName() + " Writer exiting " + e); 309 } catch (InterruptedException e) { 310 LOG.warn(getName() + " Writer exiting " + e); 311 } 312 } 313 314 private void doWriting() throws IOException, InterruptedException { 315 this.user.runAs(new PrivilegedExceptionAction<Object>() { 316 @Override 317 public Object run() throws Exception { 318 // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose 319 // index we supply here. 320 int walToKeepOpen = numOfWriters - 1; 321 // The below method writes numOfWriters files each with ENTRIES entries for a total of 322 // numOfWriters * ENTRIES added per column family in the region. 323 Writer writer = null; 324 try { 325 writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen); 326 } catch (IOException e1) { 327 throw new RuntimeException("Failed", e1); 328 } 329 // Update counter so has all edits written so far. 330 editsCount.addAndGet(numOfWriters * ENTRIES); 331 loop(writer); 332 // If we've been interruped, then things should have shifted out from under us. 333 // closing should error 334 try { 335 writer.close(); 336 fail("Writing closing after parsing should give an error."); 337 } catch (IOException exception) { 338 LOG.debug("ignoring error when closing final writer.", exception); 339 } 340 return null; 341 } 342 }); 343 } 344 345 private void loop(final Writer writer) { 346 byte [] regionBytes = Bytes.toBytes(this.region); 347 while (!stop.get()) { 348 try { 349 long seq = appendEntry(writer, TABLE_NAME, regionBytes, 350 Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0); 351 long count = editsCount.incrementAndGet(); 352 LOG.info(getName() + " sync count=" + count + ", seq=" + seq); 353 try { 354 Thread.sleep(1); 355 } catch (InterruptedException e) { 356 // 357 } 358 } catch (IOException ex) { 359 LOG.error(getName() + " ex " + ex.toString()); 360 if (ex instanceof RemoteException) { 361 LOG.error("Juliet: got RemoteException " + ex.getMessage() + 362 " while writing " + (editsCount.get() + 1)); 363 } else { 364 LOG.error(getName() + " failed to write....at " + editsCount.get()); 365 fail("Failed to write " + editsCount.get()); 366 } 367 break; 368 } catch (Throwable t) { 369 LOG.error(getName() + " HOW? " + t); 370 LOG.debug("exception details", t); 371 break; 372 } 373 } 374 LOG.info(getName() + " Writer exiting"); 375 } 376 } 377 378 /** 379 * @see "https://issues.apache.org/jira/browse/HBASE-3020" 380 */ 381 @Test 382 public void testRecoveredEditsPathForMeta() throws IOException { 383 Path p = createRecoveredEditsPathForRegion(); 384 String parentOfParent = p.getParent().getParent().getName(); 385 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 386 } 387 388 /** 389 * Test old recovered edits file doesn't break WALSplitter. 390 * This is useful in upgrading old instances. 391 */ 392 @Test 393 public void testOldRecoveredEditsFileSidelined() throws IOException { 394 Path p = createRecoveredEditsPathForRegion(); 395 Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME); 396 Path regiondir = new Path(tdir, 397 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 398 fs.mkdirs(regiondir); 399 Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); 400 assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName()); 401 fs.createNewFile(parent); // create a recovered.edits file 402 String parentOfParent = p.getParent().getParent().getName(); 403 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 404 WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); 405 } 406 407 private Path createRecoveredEditsPathForRegion() throws IOException{ 408 byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); 409 long now = System.currentTimeMillis(); 410 Entry entry = 411 new Entry(new WALKeyImpl(encoded, 412 TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), 413 new WALEdit()); 414 Path p = WALSplitter.getRegionSplitEditsPath(entry, 415 FILENAME_BEING_SPLIT, conf); 416 return p; 417 } 418 419 /** 420 * Test hasRecoveredEdits correctly identifies proper recovered edits file on related dir. 421 * @throws IOException on any issues found while creating test required files/directories. 422 */ 423 @Test 424 public void testHasRecoveredEdits() throws IOException { 425 Path p = createRecoveredEditsPathForRegion(); 426 assertFalse(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 427 String renamedEdit = p.getName().split("-")[0]; 428 fs.createNewFile(new Path(p.getParent(), renamedEdit)); 429 assertTrue(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 430 } 431 432 private void useDifferentDFSClient() throws IOException { 433 // make fs act as a different client now 434 // initialize will create a new DFSClient with a new client ID 435 fs.initialize(fs.getUri(), conf); 436 } 437 438 @Test 439 public void testSplitPreservesEdits() throws IOException{ 440 final String REGION = "region__1"; 441 REGIONS.clear(); 442 REGIONS.add(REGION); 443 444 generateWALs(1, 10, -1, 0); 445 useDifferentDFSClient(); 446 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 447 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 448 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 449 assertEquals(1, splitLog.length); 450 451 assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 452 } 453 454 @Test 455 public void testSplitRemovesRegionEventsEdits() throws IOException{ 456 final String REGION = "region__1"; 457 REGIONS.clear(); 458 REGIONS.add(REGION); 459 460 generateWALs(1, 10, -1, 100); 461 useDifferentDFSClient(); 462 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 463 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 464 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 465 assertEquals(1, splitLog.length); 466 467 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 468 // split log should only have the test edits 469 assertEquals(10, countWAL(splitLog[0])); 470 } 471 472 473 @Test 474 public void testSplitLeavesCompactionEventsEdits() throws IOException{ 475 RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); 476 REGIONS.clear(); 477 REGIONS.add(hri.getEncodedName()); 478 Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName()); 479 LOG.info("Creating region directory: " + regionDir); 480 assertTrue(fs.mkdirs(regionDir)); 481 482 Writer writer = generateWALs(1, 10, 0, 10); 483 String[] compactInputs = new String[]{"file1", "file2", "file3"}; 484 String compactOutput = "file4"; 485 appendCompactionEvent(writer, hri, compactInputs, compactOutput); 486 writer.close(); 487 488 useDifferentDFSClient(); 489 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 490 491 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 492 // original log should have 10 test edits, 10 region markers, 1 compaction marker 493 assertEquals(21, countWAL(originalLog)); 494 495 Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName()); 496 assertEquals(1, splitLog.length); 497 498 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 499 // split log should have 10 test edits plus 1 compaction marker 500 assertEquals(11, countWAL(splitLog[0])); 501 } 502 503 /** 504 * @param expectedEntries -1 to not assert 505 * @return the count across all regions 506 */ 507 private int splitAndCount(final int expectedFiles, final int expectedEntries) 508 throws IOException { 509 useDifferentDFSClient(); 510 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 511 int result = 0; 512 for (String region : REGIONS) { 513 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 514 assertEquals(expectedFiles, logfiles.length); 515 int count = 0; 516 for (Path logfile: logfiles) { 517 count += countWAL(logfile); 518 } 519 if (-1 != expectedEntries) { 520 assertEquals(expectedEntries, count); 521 } 522 result += count; 523 } 524 return result; 525 } 526 527 @Test 528 public void testEmptyLogFiles() throws IOException { 529 testEmptyLogFiles(true); 530 } 531 532 @Test 533 public void testEmptyOpenLogFiles() throws IOException { 534 testEmptyLogFiles(false); 535 } 536 537 private void testEmptyLogFiles(final boolean close) throws IOException { 538 // we won't create the hlog dir until getWAL got called, so 539 // make dir here when testing empty log file 540 fs.mkdirs(WALDIR); 541 injectEmptyFile(".empty", close); 542 generateWALs(Integer.MAX_VALUE); 543 injectEmptyFile("empty", close); 544 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty 545 } 546 547 @Test 548 public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { 549 // generate logs but leave wal.dat.5 open. 550 generateWALs(5); 551 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 552 } 553 554 @Test 555 public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { 556 conf.setBoolean(HBASE_SKIP_ERRORS, true); 557 generateWALs(Integer.MAX_VALUE); 558 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), 559 Corruptions.APPEND_GARBAGE, true); 560 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 561 } 562 563 @Test 564 public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { 565 conf.setBoolean(HBASE_SKIP_ERRORS, true); 566 generateWALs(Integer.MAX_VALUE); 567 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), 568 Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); 569 splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt 570 } 571 572 @Test 573 public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { 574 conf.setBoolean(HBASE_SKIP_ERRORS, true); 575 generateWALs(Integer.MAX_VALUE); 576 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), 577 Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false); 578 // the entries in the original logs are alternating regions 579 // considering the sequence file header, the middle corruption should 580 // affect at least half of the entries 581 int goodEntries = (NUM_WRITERS - 1) * ENTRIES; 582 int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; 583 int allRegionsCount = splitAndCount(NUM_WRITERS, -1); 584 assertTrue("The file up to the corrupted area hasn't been parsed", 585 REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount); 586 } 587 588 @Test 589 public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { 590 conf.setBoolean(HBASE_SKIP_ERRORS, true); 591 List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays 592 .asList(FaultyProtobufLogReader.FailureType.values()).stream() 593 .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList()); 594 for (FaultyProtobufLogReader.FailureType failureType : failureTypes) { 595 final Set<String> walDirContents = splitCorruptWALs(failureType); 596 final Set<String> archivedLogs = new HashSet<>(); 597 final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:"); 598 for (FileStatus log : fs.listStatus(CORRUPTDIR)) { 599 archived.append("\n\t").append(log.toString()); 600 archivedLogs.add(log.getPath().getName()); 601 } 602 LOG.debug(archived.toString()); 603 assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs, 604 walDirContents); 605 } 606 } 607 608 /** 609 * @return set of wal names present prior to split attempt. 610 * @throws IOException if the split process fails 611 */ 612 private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType) 613 throws IOException { 614 Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl", 615 Reader.class); 616 InstrumentedLogWriter.activateFailure = false; 617 618 try { 619 conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class, 620 Reader.class); 621 conf.set("faultyprotobuflogreader.failuretype", failureType.name()); 622 // Clean up from previous tests or previous loop 623 try { 624 wals.shutdown(); 625 } catch (IOException exception) { 626 // since we're splitting out from under the factory, we should expect some closing failures. 627 LOG.debug("Ignoring problem closing WALFactory.", exception); 628 } 629 wals.close(); 630 try { 631 for (FileStatus log : fs.listStatus(CORRUPTDIR)) { 632 fs.delete(log.getPath(), true); 633 } 634 } catch (FileNotFoundException exception) { 635 LOG.debug("no previous CORRUPTDIR to clean."); 636 } 637 // change to the faulty reader 638 wals = new WALFactory(conf, name.getMethodName()); 639 generateWALs(-1); 640 // Our reader will render all of these files corrupt. 641 final Set<String> walDirContents = new HashSet<>(); 642 for (FileStatus status : fs.listStatus(WALDIR)) { 643 walDirContents.add(status.getPath().getName()); 644 } 645 useDifferentDFSClient(); 646 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 647 return walDirContents; 648 } finally { 649 conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, 650 Reader.class); 651 } 652 } 653 654 @Test (expected = IOException.class) 655 public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() 656 throws IOException { 657 conf.setBoolean(HBASE_SKIP_ERRORS, false); 658 splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); 659 } 660 661 @Test 662 public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() 663 throws IOException { 664 conf.setBoolean(HBASE_SKIP_ERRORS, false); 665 try { 666 splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); 667 } catch (IOException e) { 668 LOG.debug("split with 'skip errors' set to 'false' correctly threw"); 669 } 670 assertEquals("if skip.errors is false all files should remain in place", 671 NUM_WRITERS, fs.listStatus(WALDIR).length); 672 } 673 674 private void ignoreCorruption(final Corruptions corruption, final int entryCount, 675 final int expectedCount) throws IOException { 676 conf.setBoolean(HBASE_SKIP_ERRORS, false); 677 678 final String REGION = "region__1"; 679 REGIONS.clear(); 680 REGIONS.add(REGION); 681 682 Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0"); 683 generateWALs(1, entryCount, -1, 0); 684 corruptWAL(c1, corruption, true); 685 686 useDifferentDFSClient(); 687 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 688 689 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 690 assertEquals(1, splitLog.length); 691 692 int actualCount = 0; 693 Reader in = wals.createReader(fs, splitLog[0]); 694 @SuppressWarnings("unused") 695 Entry entry; 696 while ((entry = in.next()) != null) ++actualCount; 697 assertEquals(expectedCount, actualCount); 698 in.close(); 699 700 // should not have stored the EOF files as corrupt 701 FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR); 702 assertEquals(0, archivedLogs.length); 703 704 } 705 706 @Test 707 public void testEOFisIgnored() throws IOException { 708 int entryCount = 10; 709 ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount-1); 710 } 711 712 @Test 713 public void testCorruptWALTrailer() throws IOException { 714 int entryCount = 10; 715 ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount); 716 } 717 718 @Test 719 public void testLogsGetArchivedAfterSplit() throws IOException { 720 conf.setBoolean(HBASE_SKIP_ERRORS, false); 721 generateWALs(-1); 722 useDifferentDFSClient(); 723 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 724 FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); 725 assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); 726 } 727 728 @Test 729 public void testSplit() throws IOException { 730 generateWALs(-1); 731 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 732 } 733 734 @Test 735 public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() 736 throws IOException { 737 generateWALs(-1); 738 useDifferentDFSClient(); 739 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 740 FileStatus [] statuses = null; 741 try { 742 statuses = fs.listStatus(WALDIR); 743 if (statuses != null) { 744 fail("Files left in log dir: " + 745 Joiner.on(",").join(FileUtil.stat2Paths(statuses))); 746 } 747 } catch (FileNotFoundException e) { 748 // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null 749 } 750 } 751 752 @Test(expected = IOException.class) 753 public void testSplitWillFailIfWritingToRegionFails() throws Exception { 754 //leave 5th log open so we could append the "trap" 755 Writer writer = generateWALs(4); 756 useDifferentDFSClient(); 757 758 String region = "break"; 759 Path regiondir = new Path(TABLEDIR, region); 760 fs.mkdirs(regiondir); 761 762 InstrumentedLogWriter.activateFailure = false; 763 appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), 764 Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0); 765 writer.close(); 766 767 try { 768 InstrumentedLogWriter.activateFailure = true; 769 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 770 } catch (IOException e) { 771 assertTrue(e.getMessage(). 772 contains("This exception is instrumented and should only be thrown for testing")); 773 throw e; 774 } finally { 775 InstrumentedLogWriter.activateFailure = false; 776 } 777 } 778 779 @Test 780 public void testSplitDeletedRegion() throws IOException { 781 REGIONS.clear(); 782 String region = "region_that_splits"; 783 REGIONS.add(region); 784 785 generateWALs(1); 786 useDifferentDFSClient(); 787 788 Path regiondir = new Path(TABLEDIR, region); 789 fs.delete(regiondir, true); 790 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 791 assertFalse(fs.exists(regiondir)); 792 } 793 794 @Test 795 public void testIOEOnOutputThread() throws Exception { 796 conf.setBoolean(HBASE_SKIP_ERRORS, false); 797 798 generateWALs(-1); 799 useDifferentDFSClient(); 800 FileStatus[] logfiles = fs.listStatus(WALDIR); 801 assertTrue("There should be some log file", 802 logfiles != null && logfiles.length > 0); 803 // wals with no entries (like the one we don't use in the factory) 804 // won't cause a failure since nothing will ever be written. 805 // pick the largest one since it's most likely to have entries. 806 int largestLogFile = 0; 807 long largestSize = 0; 808 for (int i = 0; i < logfiles.length; i++) { 809 if (logfiles[i].getLen() > largestSize) { 810 largestLogFile = i; 811 largestSize = logfiles[i].getLen(); 812 } 813 } 814 assertTrue("There should be some log greater than size 0.", 0 < largestSize); 815 // Set up a splitter that will throw an IOE on the output side 816 WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { 817 @Override 818 protected Writer createWriter(Path logfile) throws IOException { 819 Writer mockWriter = Mockito.mock(Writer.class); 820 Mockito.doThrow(new IOException("Injected")).when( 821 mockWriter).append(Mockito.<Entry>any()); 822 return mockWriter; 823 } 824 }; 825 // Set up a background thread dumper. Needs a thread to depend on and then we need to run 826 // the thread dumping in a background thread so it does not hold up the test. 827 final AtomicBoolean stop = new AtomicBoolean(false); 828 final Thread someOldThread = new Thread("Some-old-thread") { 829 @Override 830 public void run() { 831 while(!stop.get()) Threads.sleep(10); 832 } 833 }; 834 someOldThread.setDaemon(true); 835 someOldThread.start(); 836 final Thread t = new Thread("Background-thread-dumper") { 837 @Override 838 public void run() { 839 try { 840 Threads.threadDumpingIsAlive(someOldThread); 841 } catch (InterruptedException e) { 842 e.printStackTrace(); 843 } 844 } 845 }; 846 t.setDaemon(true); 847 t.start(); 848 try { 849 logSplitter.splitLogFile(logfiles[largestLogFile], null); 850 fail("Didn't throw!"); 851 } catch (IOException ioe) { 852 assertTrue(ioe.toString().contains("Injected")); 853 } finally { 854 // Setting this to true will turn off the background thread dumper. 855 stop.set(true); 856 } 857 } 858 859 /** 860 * @param spiedFs should be instrumented for failure. 861 */ 862 private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception { 863 generateWALs(-1); 864 useDifferentDFSClient(); 865 866 try { 867 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); 868 assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); 869 assertFalse(fs.exists(WALDIR)); 870 } catch (IOException e) { 871 fail("There shouldn't be any exception but: " + e.toString()); 872 } 873 } 874 875 // Test for HBASE-3412 876 @Test 877 public void testMovedWALDuringRecovery() throws Exception { 878 // This partial mock will throw LEE for every file simulating 879 // files that were moved 880 FileSystem spiedFs = Mockito.spy(fs); 881 // The "File does not exist" part is very important, 882 // that's how it comes out of HDFS 883 Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")). 884 when(spiedFs).append(Mockito.<Path>any()); 885 retryOverHdfsProblem(spiedFs); 886 } 887 888 @Test 889 public void testRetryOpenDuringRecovery() throws Exception { 890 FileSystem spiedFs = Mockito.spy(fs); 891 // The "Cannot obtain block length", "Could not obtain the last block", 892 // and "Blocklist for [^ ]* has changed.*" part is very important, 893 // that's how it comes out of HDFS. If HDFS changes the exception 894 // message, this test needs to be adjusted accordingly. 895 // 896 // When DFSClient tries to open a file, HDFS needs to locate 897 // the last block of the file and get its length. However, if the 898 // last block is under recovery, HDFS may have problem to obtain 899 // the block length, in which case, retry may help. 900 Mockito.doAnswer(new Answer<FSDataInputStream>() { 901 private final String[] errors = new String[] { 902 "Cannot obtain block length", "Could not obtain the last block", 903 "Blocklist for " + OLDLOGDIR + " has changed"}; 904 private int count = 0; 905 906 @Override 907 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 908 if (count < 3) { 909 throw new IOException(errors[count++]); 910 } 911 return (FSDataInputStream)invocation.callRealMethod(); 912 } 913 }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt()); 914 retryOverHdfsProblem(spiedFs); 915 } 916 917 @Test 918 public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException { 919 generateWALs(1, 10, -1); 920 FileStatus logfile = fs.listStatus(WALDIR)[0]; 921 useDifferentDFSClient(); 922 923 final AtomicInteger count = new AtomicInteger(); 924 925 CancelableProgressable localReporter 926 = new CancelableProgressable() { 927 @Override 928 public boolean progress() { 929 count.getAndIncrement(); 930 return false; 931 } 932 }; 933 934 FileSystem spiedFs = Mockito.spy(fs); 935 Mockito.doAnswer(new Answer<FSDataInputStream>() { 936 @Override 937 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 938 Thread.sleep(1500); // Sleep a while and wait report status invoked 939 return (FSDataInputStream)invocation.callRealMethod(); 940 } 941 }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt()); 942 943 try { 944 conf.setInt("hbase.splitlog.report.period", 1000); 945 boolean ret = WALSplitter.splitLogFile( 946 HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals); 947 assertFalse("Log splitting should failed", ret); 948 assertTrue(count.get() > 0); 949 } catch (IOException e) { 950 fail("There shouldn't be any exception but: " + e.toString()); 951 } finally { 952 // reset it back to its default value 953 conf.setInt("hbase.splitlog.report.period", 59000); 954 } 955 } 956 957 /** 958 * Test log split process with fake data and lots of edits to trigger threading 959 * issues. 960 */ 961 @Test 962 public void testThreading() throws Exception { 963 doTestThreading(20000, 128*1024*1024, 0); 964 } 965 966 /** 967 * Test blocking behavior of the log split process if writers are writing slower 968 * than the reader is reading. 969 */ 970 @Test 971 public void testThreadingSlowWriterSmallBuffer() throws Exception { 972 doTestThreading(200, 1024, 50); 973 } 974 975 /** 976 * Sets up a log splitter with a mock reader and writer. The mock reader generates 977 * a specified number of edits spread across 5 regions. The mock writer optionally 978 * sleeps for each edit it is fed. 979 * * 980 * After the split is complete, verifies that the statistics show the correct number 981 * of edits output into each region. 982 * 983 * @param numFakeEdits number of fake edits to push through pipeline 984 * @param bufferSize size of in-memory buffer 985 * @param writerSlowness writer threads will sleep this many ms per edit 986 */ 987 private void doTestThreading(final int numFakeEdits, 988 final int bufferSize, 989 final int writerSlowness) throws Exception { 990 991 Configuration localConf = new Configuration(conf); 992 localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize); 993 994 // Create a fake log file (we'll override the reader to produce a stream of edits) 995 Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake"); 996 FSDataOutputStream out = fs.create(logPath); 997 out.close(); 998 999 // Make region dirs for our destination regions so the output doesn't get skipped 1000 final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); 1001 makeRegionDirs(regions); 1002 1003 // Create a splitter that reads and writes the data without touching disk 1004 WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) { 1005 1006 /* Produce a mock writer that doesn't write anywhere */ 1007 @Override 1008 protected Writer createWriter(Path logfile) throws IOException { 1009 Writer mockWriter = Mockito.mock(Writer.class); 1010 Mockito.doAnswer(new Answer<Void>() { 1011 int expectedIndex = 0; 1012 1013 @Override 1014 public Void answer(InvocationOnMock invocation) { 1015 if (writerSlowness > 0) { 1016 try { 1017 Thread.sleep(writerSlowness); 1018 } catch (InterruptedException ie) { 1019 Thread.currentThread().interrupt(); 1020 } 1021 } 1022 Entry entry = (Entry) invocation.getArgument(0); 1023 WALEdit edit = entry.getEdit(); 1024 List<Cell> cells = edit.getCells(); 1025 assertEquals(1, cells.size()); 1026 Cell cell = cells.get(0); 1027 1028 // Check that the edits come in the right order. 1029 assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), 1030 cell.getRowLength())); 1031 expectedIndex++; 1032 return null; 1033 } 1034 }).when(mockWriter).append(Mockito.<Entry>any()); 1035 return mockWriter; 1036 } 1037 1038 /* Produce a mock reader that generates fake entries */ 1039 @Override 1040 protected Reader getReader(Path curLogFile, CancelableProgressable reporter) 1041 throws IOException { 1042 Reader mockReader = Mockito.mock(Reader.class); 1043 Mockito.doAnswer(new Answer<Entry>() { 1044 int index = 0; 1045 1046 @Override 1047 public Entry answer(InvocationOnMock invocation) throws Throwable { 1048 if (index >= numFakeEdits) return null; 1049 1050 // Generate r0 through r4 in round robin fashion 1051 int regionIdx = index % regions.size(); 1052 byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)}; 1053 1054 Entry ret = createTestEntry(TABLE_NAME, region, 1055 Bytes.toBytes(index / regions.size()), 1056 FAMILY, QUALIFIER, VALUE, index); 1057 index++; 1058 return ret; 1059 } 1060 }).when(mockReader).next(); 1061 return mockReader; 1062 } 1063 }; 1064 1065 logSplitter.splitLogFile(fs.getFileStatus(logPath), null); 1066 1067 // Verify number of written edits per region 1068 Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts(); 1069 for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) { 1070 LOG.info("Got " + entry.getValue() + " output edits for region " + 1071 Bytes.toString(entry.getKey())); 1072 assertEquals((long)entry.getValue(), numFakeEdits / regions.size()); 1073 } 1074 assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size()); 1075 } 1076 1077 // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests? 1078 @Test 1079 public void testSplitLogFileDeletedRegionDir() throws IOException { 1080 LOG.info("testSplitLogFileDeletedRegionDir"); 1081 final String REGION = "region__1"; 1082 REGIONS.clear(); 1083 REGIONS.add(REGION); 1084 1085 generateWALs(1, 10, -1); 1086 useDifferentDFSClient(); 1087 1088 Path regiondir = new Path(TABLEDIR, REGION); 1089 LOG.info("Region directory is" + regiondir); 1090 fs.delete(regiondir, true); 1091 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1092 assertFalse(fs.exists(regiondir)); 1093 } 1094 1095 @Test 1096 public void testSplitLogFileEmpty() throws IOException { 1097 LOG.info("testSplitLogFileEmpty"); 1098 // we won't create the hlog dir until getWAL got called, so 1099 // make dir here when testing empty log file 1100 fs.mkdirs(WALDIR); 1101 injectEmptyFile(".empty", true); 1102 useDifferentDFSClient(); 1103 1104 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1105 Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME); 1106 assertFalse(fs.exists(tdir)); 1107 1108 assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath())); 1109 } 1110 1111 @Test 1112 public void testSplitLogFileMultipleRegions() throws IOException { 1113 LOG.info("testSplitLogFileMultipleRegions"); 1114 generateWALs(1, 10, -1); 1115 splitAndCount(1, 10); 1116 } 1117 1118 @Test 1119 public void testSplitLogFileFirstLineCorruptionLog() 1120 throws IOException { 1121 conf.setBoolean(HBASE_SKIP_ERRORS, true); 1122 generateWALs(1, 10, -1); 1123 FileStatus logfile = fs.listStatus(WALDIR)[0]; 1124 1125 corruptWAL(logfile.getPath(), 1126 Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); 1127 1128 useDifferentDFSClient(); 1129 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1130 1131 final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 1132 assertEquals(1, fs.listStatus(corruptDir).length); 1133 } 1134 1135 /** 1136 * @see "https://issues.apache.org/jira/browse/HBASE-4862" 1137 */ 1138 @Test 1139 public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException { 1140 LOG.info("testConcurrentSplitLogAndReplayRecoverEdit"); 1141 // Generate wals for our destination region 1142 String regionName = "r0"; 1143 final Path regiondir = new Path(TABLEDIR, regionName); 1144 REGIONS.clear(); 1145 REGIONS.add(regionName); 1146 generateWALs(-1); 1147 1148 wals.getWAL(null); 1149 FileStatus[] logfiles = fs.listStatus(WALDIR); 1150 assertTrue("There should be some log file", 1151 logfiles != null && logfiles.length > 0); 1152 1153 WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { 1154 @Override 1155 protected Writer createWriter(Path logfile) 1156 throws IOException { 1157 Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); 1158 // After creating writer, simulate region's 1159 // replayRecoveredEditsIfAny() which gets SplitEditFiles of this 1160 // region and delete them, excluding files with '.temp' suffix. 1161 NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); 1162 if (files != null && !files.isEmpty()) { 1163 for (Path file : files) { 1164 if (!this.walFS.delete(file, false)) { 1165 LOG.error("Failed delete of " + file); 1166 } else { 1167 LOG.debug("Deleted recovered.edits file=" + file); 1168 } 1169 } 1170 } 1171 return writer; 1172 } 1173 }; 1174 try{ 1175 logSplitter.splitLogFile(logfiles[0], null); 1176 } catch (IOException e) { 1177 LOG.info(e.toString(), e); 1178 fail("Throws IOException when spliting " 1179 + "log, it is most likely because writing file does not " 1180 + "exist which is caused by concurrent replayRecoveredEditsIfAny()"); 1181 } 1182 if (fs.exists(CORRUPTDIR)) { 1183 if (fs.listStatus(CORRUPTDIR).length > 0) { 1184 fail("There are some corrupt logs, " 1185 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()"); 1186 } 1187 } 1188 } 1189 1190 private Writer generateWALs(int leaveOpen) throws IOException { 1191 return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0); 1192 } 1193 1194 private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException { 1195 return generateWALs(writers, entries, leaveOpen, 7); 1196 } 1197 1198 private void makeRegionDirs(List<String> regions) throws IOException { 1199 for (String region : regions) { 1200 LOG.debug("Creating dir for region " + region); 1201 fs.mkdirs(new Path(TABLEDIR, region)); 1202 } 1203 } 1204 1205 /** 1206 * @param leaveOpen index to leave un-closed. -1 to close all. 1207 * @return the writer that's still open, or null if all were closed. 1208 */ 1209 private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) throws IOException { 1210 makeRegionDirs(REGIONS); 1211 fs.mkdirs(WALDIR); 1212 Writer [] ws = new Writer[writers]; 1213 int seq = 0; 1214 int numRegionEventsAdded = 0; 1215 for (int i = 0; i < writers; i++) { 1216 ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i)); 1217 for (int j = 0; j < entries; j++) { 1218 int prefix = 0; 1219 for (String region : REGIONS) { 1220 String row_key = region + prefix++ + i + j; 1221 appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY, 1222 QUALIFIER, VALUE, seq++); 1223 1224 if (numRegionEventsAdded < regionEvents) { 1225 numRegionEventsAdded ++; 1226 appendRegionEvent(ws[i], region); 1227 } 1228 } 1229 } 1230 if (i != leaveOpen) { 1231 ws[i].close(); 1232 LOG.info("Closing writer " + i); 1233 } 1234 } 1235 if (leaveOpen < 0 || leaveOpen >= writers) { 1236 return null; 1237 } 1238 return ws[leaveOpen]; 1239 } 1240 1241 1242 1243 private Path[] getLogForRegion(TableName table, String region) 1244 throws IOException { 1245 Path tdir = FSUtils.getWALTableDir(conf, table); 1246 @SuppressWarnings("deprecation") 1247 Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, 1248 Bytes.toString(Bytes.toBytes(region)))); 1249 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { 1250 @Override 1251 public boolean accept(Path p) { 1252 if (WALSplitter.isSequenceIdFile(p)) { 1253 return false; 1254 } 1255 return true; 1256 } 1257 }); 1258 Path[] paths = new Path[files.length]; 1259 for (int i = 0; i < files.length; i++) { 1260 paths[i] = files[i].getPath(); 1261 } 1262 return paths; 1263 } 1264 1265 private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException { 1266 FSDataOutputStream out; 1267 int fileSize = (int) fs.listStatus(path)[0].getLen(); 1268 1269 FSDataInputStream in = fs.open(path); 1270 byte[] corrupted_bytes = new byte[fileSize]; 1271 in.readFully(0, corrupted_bytes, 0, fileSize); 1272 in.close(); 1273 1274 switch (corruption) { 1275 case APPEND_GARBAGE: 1276 fs.delete(path, false); 1277 out = fs.create(path); 1278 out.write(corrupted_bytes); 1279 out.write(Bytes.toBytes("-----")); 1280 closeOrFlush(close, out); 1281 break; 1282 1283 case INSERT_GARBAGE_ON_FIRST_LINE: 1284 fs.delete(path, false); 1285 out = fs.create(path); 1286 out.write(0); 1287 out.write(corrupted_bytes); 1288 closeOrFlush(close, out); 1289 break; 1290 1291 case INSERT_GARBAGE_IN_THE_MIDDLE: 1292 fs.delete(path, false); 1293 out = fs.create(path); 1294 int middle = (int) Math.floor(corrupted_bytes.length / 2); 1295 out.write(corrupted_bytes, 0, middle); 1296 out.write(0); 1297 out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); 1298 closeOrFlush(close, out); 1299 break; 1300 1301 case TRUNCATE: 1302 fs.delete(path, false); 1303 out = fs.create(path); 1304 out.write(corrupted_bytes, 0, fileSize 1305 - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT)); 1306 closeOrFlush(close, out); 1307 break; 1308 1309 case TRUNCATE_TRAILER: 1310 fs.delete(path, false); 1311 out = fs.create(path); 1312 out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated. 1313 closeOrFlush(close, out); 1314 break; 1315 } 1316 } 1317 1318 private void closeOrFlush(boolean close, FSDataOutputStream out) 1319 throws IOException { 1320 if (close) { 1321 out.close(); 1322 } else { 1323 Method syncMethod = null; 1324 try { 1325 syncMethod = out.getClass().getMethod("hflush", new Class<?> []{}); 1326 } catch (NoSuchMethodException e) { 1327 try { 1328 syncMethod = out.getClass().getMethod("sync", new Class<?> []{}); 1329 } catch (NoSuchMethodException ex) { 1330 throw new IOException("This version of Hadoop supports " + 1331 "neither Syncable.sync() nor Syncable.hflush()."); 1332 } 1333 } 1334 try { 1335 syncMethod.invoke(out, new Object[]{}); 1336 } catch (Exception e) { 1337 throw new IOException(e); 1338 } 1339 // Not in 0out.hflush(); 1340 } 1341 } 1342 1343 private int countWAL(Path log) throws IOException { 1344 int count = 0; 1345 Reader in = wals.createReader(fs, log); 1346 while (in.next() != null) { 1347 count++; 1348 } 1349 in.close(); 1350 return count; 1351 } 1352 1353 private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs, 1354 String output) throws IOException { 1355 WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder(); 1356 desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes())) 1357 .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes())) 1358 .setRegionName(ByteString.copyFrom(hri.getRegionName())) 1359 .setFamilyName(ByteString.copyFrom(FAMILY)) 1360 .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY)) 1361 .addAllCompactionInput(Arrays.asList(inputs)) 1362 .addCompactionOutput(output); 1363 1364 WALEdit edit = WALEdit.createCompaction(hri, desc.build()); 1365 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, 1366 EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); 1367 w.append(new Entry(key, edit)); 1368 w.sync(); 1369 } 1370 1371 private static void appendRegionEvent(Writer w, String region) throws IOException { 1372 WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( 1373 WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, 1374 TABLE_NAME.toBytes(), 1375 Bytes.toBytes(region), 1376 Bytes.toBytes(String.valueOf(region.hashCode())), 1377 1, 1378 ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of()); 1379 final long time = EnvironmentEdgeManager.currentTime(); 1380 KeyValue kv = new KeyValue(Bytes.toBytes(region), WALEdit.METAFAMILY, WALEdit.REGION_EVENT, 1381 time, regionOpenDesc.toByteArray()); 1382 final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, 1383 HConstants.DEFAULT_CLUSTER_ID); 1384 w.append( 1385 new Entry(walKey, new WALEdit().add(kv))); 1386 w.sync(); 1387 } 1388 1389 public static long appendEntry(Writer writer, TableName table, byte[] region, 1390 byte[] row, byte[] family, byte[] qualifier, 1391 byte[] value, long seq) 1392 throws IOException { 1393 LOG.info(Thread.currentThread().getName() + " append"); 1394 writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); 1395 LOG.info(Thread.currentThread().getName() + " sync"); 1396 writer.sync(); 1397 return seq; 1398 } 1399 1400 private static Entry createTestEntry( 1401 TableName table, byte[] region, 1402 byte[] row, byte[] family, byte[] qualifier, 1403 byte[] value, long seq) { 1404 long time = System.nanoTime(); 1405 1406 seq++; 1407 final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value); 1408 WALEdit edit = new WALEdit(); 1409 edit.add(cell); 1410 return new Entry(new WALKeyImpl(region, table, seq, time, 1411 HConstants.DEFAULT_CLUSTER_ID), edit); 1412 } 1413 1414 private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { 1415 Writer writer = 1416 WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); 1417 if (closeFile) { 1418 writer.close(); 1419 } 1420 } 1421 1422 private boolean logsAreEqual(Path p1, Path p2) throws IOException { 1423 Reader in1, in2; 1424 in1 = wals.createReader(fs, p1); 1425 in2 = wals.createReader(fs, p2); 1426 Entry entry1; 1427 Entry entry2; 1428 while ((entry1 = in1.next()) != null) { 1429 entry2 = in2.next(); 1430 if ((entry1.getKey().compareTo(entry2.getKey()) != 0) || 1431 (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) { 1432 return false; 1433 } 1434 } 1435 in1.close(); 1436 in2.close(); 1437 return true; 1438 } 1439}