001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.lang.reflect.Method; 028import java.security.PrivilegedExceptionAction; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.List; 035import java.util.Map; 036import java.util.NavigableSet; 037import java.util.Objects; 038import java.util.Set; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.concurrent.atomic.AtomicLong; 042import java.util.stream.Collectors; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FSDataInputStream; 045import org.apache.hadoop.fs.FSDataOutputStream; 046import org.apache.hadoop.fs.FileStatus; 047import org.apache.hadoop.fs.FileSystem; 048import org.apache.hadoop.fs.FileUtil; 049import org.apache.hadoop.fs.Path; 050import org.apache.hadoop.fs.PathFilter; 051import org.apache.hadoop.hbase.Cell; 052import org.apache.hadoop.hbase.HBaseClassTestRule; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtility; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.KeyValue; 057import org.apache.hadoop.hbase.ServerName; 058import org.apache.hadoop.hbase.TableName; 059import org.apache.hadoop.hbase.client.RegionInfo; 060import org.apache.hadoop.hbase.client.RegionInfoBuilder; 061import org.apache.hadoop.hbase.regionserver.HRegion; 062import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader; 063import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; 064import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 065import org.apache.hadoop.hbase.security.User; 066import org.apache.hadoop.hbase.testclassification.LargeTests; 067import org.apache.hadoop.hbase.testclassification.RegionServerTests; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.hbase.util.CancelableProgressable; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.hadoop.hbase.util.FSUtils; 072import org.apache.hadoop.hbase.util.Threads; 073import org.apache.hadoop.hbase.wal.WAL.Entry; 074import org.apache.hadoop.hbase.wal.WAL.Reader; 075import org.apache.hadoop.hbase.wal.WALProvider.Writer; 076import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException; 077import org.apache.hadoop.hdfs.DFSTestUtil; 078import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; 079import org.apache.hadoop.ipc.RemoteException; 080import org.junit.After; 081import org.junit.AfterClass; 082import org.junit.Before; 083import org.junit.BeforeClass; 084import org.junit.ClassRule; 085import org.junit.Rule; 086import org.junit.Test; 087import org.junit.experimental.categories.Category; 088import org.junit.rules.TestName; 089import org.mockito.Mockito; 090import org.mockito.invocation.InvocationOnMock; 091import org.mockito.stubbing.Answer; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 096import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 097import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 098import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 099 100import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 102 103/** 104 * Testing {@link WAL} splitting code. 105 */ 106@Category({RegionServerTests.class, LargeTests.class}) 107public class TestWALSplit { 108 109 @ClassRule 110 public static final HBaseClassTestRule CLASS_RULE = 111 HBaseClassTestRule.forClass(TestWALSplit.class); 112 113 { 114 // Uncomment the following lines if more verbosity is needed for 115 // debugging (see HBASE-12285 for details). 116 //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); 117 //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); 118 //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); 119 } 120 private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class); 121 122 private static Configuration conf; 123 private FileSystem fs; 124 125 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 126 127 private Path HBASEDIR; 128 private Path HBASELOGDIR; 129 private Path WALDIR; 130 private Path OLDLOGDIR; 131 private Path CORRUPTDIR; 132 private Path TABLEDIR; 133 private String TMPDIRNAME; 134 135 private static final int NUM_WRITERS = 10; 136 private static final int ENTRIES = 10; // entries per writer per region 137 138 private static final String FILENAME_BEING_SPLIT = "testfile"; 139 private static final TableName TABLE_NAME = 140 TableName.valueOf("t1"); 141 private static final byte[] FAMILY = Bytes.toBytes("f1"); 142 private static final byte[] QUALIFIER = Bytes.toBytes("q1"); 143 private static final byte[] VALUE = Bytes.toBytes("v1"); 144 private static final String WAL_FILE_PREFIX = "wal.dat."; 145 private static List<String> REGIONS = new ArrayList<>(); 146 private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors"; 147 private static String ROBBER; 148 private static String ZOMBIE; 149 private static String [] GROUP = new String [] {"supergroup"}; 150 151 static enum Corruptions { 152 INSERT_GARBAGE_ON_FIRST_LINE, 153 INSERT_GARBAGE_IN_THE_MIDDLE, 154 APPEND_GARBAGE, 155 TRUNCATE, 156 TRUNCATE_TRAILER 157 } 158 159 @BeforeClass 160 public static void setUpBeforeClass() throws Exception { 161 conf = TEST_UTIL.getConfiguration(); 162 conf.setClass("hbase.regionserver.hlog.writer.impl", 163 InstrumentedLogWriter.class, Writer.class); 164 // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config. 165 System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); 166 // Create fake maping user to group and set it to the conf. 167 Map<String, String []> u2g_map = new HashMap<>(2); 168 ROBBER = User.getCurrent().getName() + "-robber"; 169 ZOMBIE = User.getCurrent().getName() + "-zombie"; 170 u2g_map.put(ROBBER, GROUP); 171 u2g_map.put(ZOMBIE, GROUP); 172 DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); 173 conf.setInt("dfs.heartbeat.interval", 1); 174 TEST_UTIL.startMiniDFSCluster(2); 175 } 176 177 @AfterClass 178 public static void tearDownAfterClass() throws Exception { 179 TEST_UTIL.shutdownMiniDFSCluster(); 180 } 181 182 @Rule 183 public TestName name = new TestName(); 184 private WALFactory wals = null; 185 186 @Before 187 public void setUp() throws Exception { 188 LOG.info("Cleaning up cluster for new test."); 189 fs = TEST_UTIL.getDFSCluster().getFileSystem(); 190 HBASEDIR = TEST_UTIL.createRootDir(); 191 HBASELOGDIR = TEST_UTIL.createWALRootDir(); 192 OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME); 193 CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME); 194 TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME); 195 TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 196 HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); 197 REGIONS.clear(); 198 Collections.addAll(REGIONS, "bbb", "ccc"); 199 InstrumentedLogWriter.activateFailure = false; 200 wals = new WALFactory(conf, name.getMethodName()); 201 WALDIR = new Path(HBASELOGDIR, 202 AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(), 203 16010, System.currentTimeMillis()).toString())); 204 //fs.mkdirs(WALDIR); 205 } 206 207 @After 208 public void tearDown() throws Exception { 209 try { 210 wals.close(); 211 } catch(IOException exception) { 212 // Some tests will move WALs out from under us. In those cases, we'll get an error on close. 213 LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" + 214 " you see a failure look here."); 215 LOG.debug("exception details", exception); 216 } finally { 217 wals = null; 218 fs.delete(HBASEDIR, true); 219 fs.delete(HBASELOGDIR, true); 220 } 221 } 222 223 /** 224 * Simulates splitting a WAL out from under a regionserver that is still trying to write it. 225 * Ensures we do not lose edits. 226 * @throws IOException 227 * @throws InterruptedException 228 */ 229 @Test 230 public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException { 231 final AtomicLong counter = new AtomicLong(0); 232 AtomicBoolean stop = new AtomicBoolean(false); 233 // Region we'll write edits too and then later examine to make sure they all made it in. 234 final String region = REGIONS.get(0); 235 final int numWriters = 3; 236 Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters); 237 try { 238 long startCount = counter.get(); 239 zombie.start(); 240 // Wait till writer starts going. 241 while (startCount == counter.get()) Threads.sleep(1); 242 // Give it a second to write a few appends. 243 Threads.sleep(1000); 244 final Configuration conf2 = HBaseConfiguration.create(conf); 245 final User robber = User.createUserForTesting(conf2, ROBBER, GROUP); 246 int count = robber.runAs(new PrivilegedExceptionAction<Integer>() { 247 @Override 248 public Integer run() throws Exception { 249 StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR) 250 .append("):\n"); 251 for (FileStatus status : fs.listStatus(WALDIR)) { 252 ls.append("\t").append(status.toString()).append("\n"); 253 } 254 LOG.debug(Objects.toString(ls)); 255 LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files."); 256 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); 257 LOG.info("Finished splitting out from under zombie."); 258 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 259 assertEquals("wrong number of split files for region", numWriters, logfiles.length); 260 int count = 0; 261 for (Path logfile: logfiles) { 262 count += countWAL(logfile); 263 } 264 return count; 265 } 266 }); 267 LOG.info("zombie=" + counter.get() + ", robber=" + count); 268 assertTrue("The log file could have at most 1 extra log entry, but can't have less. " + 269 "Zombie could write " + counter.get() + " and logfile had only " + count, 270 counter.get() == count || counter.get() + 1 == count); 271 } finally { 272 stop.set(true); 273 zombie.interrupt(); 274 Threads.threadDumpingIsAlive(zombie); 275 } 276 } 277 278 /** 279 * This thread will keep writing to a 'wal' file even after the split process has started. 280 * It simulates a region server that was considered dead but woke up and wrote some more to the 281 * last log entry. Does its writing as an alternate user in another filesystem instance to 282 * simulate better it being a regionserver. 283 */ 284 class ZombieLastLogWriterRegionServer extends Thread { 285 final AtomicLong editsCount; 286 final AtomicBoolean stop; 287 final int numOfWriters; 288 /** 289 * Region to write edits for. 290 */ 291 final String region; 292 final User user; 293 294 public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop, 295 final String region, final int writers) 296 throws IOException, InterruptedException { 297 super("ZombieLastLogWriterRegionServer"); 298 setDaemon(true); 299 this.stop = stop; 300 this.editsCount = counter; 301 this.region = region; 302 this.user = User.createUserForTesting(conf, ZOMBIE, GROUP); 303 numOfWriters = writers; 304 } 305 306 @Override 307 public void run() { 308 try { 309 doWriting(); 310 } catch (IOException e) { 311 LOG.warn(getName() + " Writer exiting " + e); 312 } catch (InterruptedException e) { 313 LOG.warn(getName() + " Writer exiting " + e); 314 } 315 } 316 317 private void doWriting() throws IOException, InterruptedException { 318 this.user.runAs(new PrivilegedExceptionAction<Object>() { 319 @Override 320 public Object run() throws Exception { 321 // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose 322 // index we supply here. 323 int walToKeepOpen = numOfWriters - 1; 324 // The below method writes numOfWriters files each with ENTRIES entries for a total of 325 // numOfWriters * ENTRIES added per column family in the region. 326 Writer writer = null; 327 try { 328 writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen); 329 } catch (IOException e1) { 330 throw new RuntimeException("Failed", e1); 331 } 332 // Update counter so has all edits written so far. 333 editsCount.addAndGet(numOfWriters * ENTRIES); 334 loop(writer); 335 // If we've been interruped, then things should have shifted out from under us. 336 // closing should error 337 try { 338 writer.close(); 339 fail("Writing closing after parsing should give an error."); 340 } catch (IOException exception) { 341 LOG.debug("ignoring error when closing final writer.", exception); 342 } 343 return null; 344 } 345 }); 346 } 347 348 private void loop(final Writer writer) { 349 byte [] regionBytes = Bytes.toBytes(this.region); 350 while (!stop.get()) { 351 try { 352 long seq = appendEntry(writer, TABLE_NAME, regionBytes, 353 Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0); 354 long count = editsCount.incrementAndGet(); 355 LOG.info(getName() + " sync count=" + count + ", seq=" + seq); 356 try { 357 Thread.sleep(1); 358 } catch (InterruptedException e) { 359 // 360 } 361 } catch (IOException ex) { 362 LOG.error(getName() + " ex " + ex.toString()); 363 if (ex instanceof RemoteException) { 364 LOG.error("Juliet: got RemoteException " + ex.getMessage() + 365 " while writing " + (editsCount.get() + 1)); 366 } else { 367 LOG.error(getName() + " failed to write....at " + editsCount.get()); 368 fail("Failed to write " + editsCount.get()); 369 } 370 break; 371 } catch (Throwable t) { 372 LOG.error(getName() + " HOW? " + t); 373 LOG.debug("exception details", t); 374 break; 375 } 376 } 377 LOG.info(getName() + " Writer exiting"); 378 } 379 } 380 381 /** 382 * @see "https://issues.apache.org/jira/browse/HBASE-3020" 383 */ 384 @Test 385 public void testRecoveredEditsPathForMeta() throws IOException { 386 Path p = createRecoveredEditsPathForRegion(); 387 String parentOfParent = p.getParent().getParent().getName(); 388 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 389 } 390 391 /** 392 * Test old recovered edits file doesn't break WALSplitter. 393 * This is useful in upgrading old instances. 394 */ 395 @Test 396 public void testOldRecoveredEditsFileSidelined() throws IOException { 397 Path p = createRecoveredEditsPathForRegion(); 398 Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME); 399 Path regiondir = new Path(tdir, 400 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 401 fs.mkdirs(regiondir); 402 Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 403 assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName()); 404 fs.createNewFile(parent); // create a recovered.edits file 405 String parentOfParent = p.getParent().getParent().getName(); 406 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 407 WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); 408 } 409 410 private Path createRecoveredEditsPathForRegion() throws IOException{ 411 byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); 412 long now = System.currentTimeMillis(); 413 Entry entry = 414 new Entry(new WALKeyImpl(encoded, 415 TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), 416 new WALEdit()); 417 Path p = WALSplitUtil.getRegionSplitEditsPath(entry, 418 FILENAME_BEING_SPLIT, TMPDIRNAME, conf); 419 return p; 420 } 421 422 @Test 423 public void testHasRecoveredEdits() throws IOException { 424 Path p = createRecoveredEditsPathForRegion(); 425 assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 426 String renamedEdit = p.getName().split("-")[0]; 427 fs.createNewFile(new Path(p.getParent(), renamedEdit)); 428 assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 429 } 430 431 private void useDifferentDFSClient() throws IOException { 432 // make fs act as a different client now 433 // initialize will create a new DFSClient with a new client ID 434 fs.initialize(fs.getUri(), conf); 435 } 436 437 @Test 438 public void testSplitPreservesEdits() throws IOException{ 439 final String REGION = "region__1"; 440 REGIONS.clear(); 441 REGIONS.add(REGION); 442 443 generateWALs(1, 10, -1, 0); 444 useDifferentDFSClient(); 445 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 446 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 447 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 448 assertEquals(1, splitLog.length); 449 450 assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 451 } 452 453 @Test 454 public void testSplitRemovesRegionEventsEdits() throws IOException{ 455 final String REGION = "region__1"; 456 REGIONS.clear(); 457 REGIONS.add(REGION); 458 459 generateWALs(1, 10, -1, 100); 460 useDifferentDFSClient(); 461 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 462 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 463 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 464 assertEquals(1, splitLog.length); 465 466 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 467 // split log should only have the test edits 468 assertEquals(10, countWAL(splitLog[0])); 469 } 470 471 472 @Test 473 public void testSplitLeavesCompactionEventsEdits() throws IOException{ 474 RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); 475 REGIONS.clear(); 476 REGIONS.add(hri.getEncodedName()); 477 Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName()); 478 LOG.info("Creating region directory: " + regionDir); 479 assertTrue(fs.mkdirs(regionDir)); 480 481 Writer writer = generateWALs(1, 10, 0, 10); 482 String[] compactInputs = new String[]{"file1", "file2", "file3"}; 483 String compactOutput = "file4"; 484 appendCompactionEvent(writer, hri, compactInputs, compactOutput); 485 writer.close(); 486 487 useDifferentDFSClient(); 488 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 489 490 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 491 // original log should have 10 test edits, 10 region markers, 1 compaction marker 492 assertEquals(21, countWAL(originalLog)); 493 494 Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName()); 495 assertEquals(1, splitLog.length); 496 497 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 498 // split log should have 10 test edits plus 1 compaction marker 499 assertEquals(11, countWAL(splitLog[0])); 500 } 501 502 /** 503 * @param expectedEntries -1 to not assert 504 * @return the count across all regions 505 */ 506 private int splitAndCount(final int expectedFiles, final int expectedEntries) 507 throws IOException { 508 useDifferentDFSClient(); 509 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 510 int result = 0; 511 for (String region : REGIONS) { 512 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 513 assertEquals(expectedFiles, logfiles.length); 514 int count = 0; 515 for (Path logfile: logfiles) { 516 count += countWAL(logfile); 517 } 518 if (-1 != expectedEntries) { 519 assertEquals(expectedEntries, count); 520 } 521 result += count; 522 } 523 return result; 524 } 525 526 @Test 527 public void testEmptyLogFiles() throws IOException { 528 testEmptyLogFiles(true); 529 } 530 531 @Test 532 public void testEmptyOpenLogFiles() throws IOException { 533 testEmptyLogFiles(false); 534 } 535 536 private void testEmptyLogFiles(final boolean close) throws IOException { 537 // we won't create the hlog dir until getWAL got called, so 538 // make dir here when testing empty log file 539 fs.mkdirs(WALDIR); 540 injectEmptyFile(".empty", close); 541 generateWALs(Integer.MAX_VALUE); 542 injectEmptyFile("empty", close); 543 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty 544 } 545 546 @Test 547 public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { 548 // generate logs but leave wal.dat.5 open. 549 generateWALs(5); 550 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 551 } 552 553 @Test 554 public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { 555 conf.setBoolean(HBASE_SKIP_ERRORS, true); 556 generateWALs(Integer.MAX_VALUE); 557 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), 558 Corruptions.APPEND_GARBAGE, true); 559 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 560 } 561 562 @Test 563 public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { 564 conf.setBoolean(HBASE_SKIP_ERRORS, true); 565 generateWALs(Integer.MAX_VALUE); 566 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), 567 Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); 568 splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt 569 } 570 571 @Test 572 public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { 573 conf.setBoolean(HBASE_SKIP_ERRORS, true); 574 generateWALs(Integer.MAX_VALUE); 575 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), 576 Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false); 577 // the entries in the original logs are alternating regions 578 // considering the sequence file header, the middle corruption should 579 // affect at least half of the entries 580 int goodEntries = (NUM_WRITERS - 1) * ENTRIES; 581 int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; 582 int allRegionsCount = splitAndCount(NUM_WRITERS, -1); 583 assertTrue("The file up to the corrupted area hasn't been parsed", 584 REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount); 585 } 586 587 @Test 588 public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { 589 conf.setBoolean(HBASE_SKIP_ERRORS, true); 590 List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays 591 .asList(FaultyProtobufLogReader.FailureType.values()).stream() 592 .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList()); 593 for (FaultyProtobufLogReader.FailureType failureType : failureTypes) { 594 final Set<String> walDirContents = splitCorruptWALs(failureType); 595 final Set<String> archivedLogs = new HashSet<>(); 596 final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:"); 597 for (FileStatus log : fs.listStatus(CORRUPTDIR)) { 598 archived.append("\n\t").append(log.toString()); 599 archivedLogs.add(log.getPath().getName()); 600 } 601 LOG.debug(archived.toString()); 602 assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs, 603 walDirContents); 604 } 605 } 606 607 /** 608 * @return set of wal names present prior to split attempt. 609 * @throws IOException if the split process fails 610 */ 611 private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType) 612 throws IOException { 613 Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl", 614 Reader.class); 615 InstrumentedLogWriter.activateFailure = false; 616 617 try { 618 conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class, 619 Reader.class); 620 conf.set("faultyprotobuflogreader.failuretype", failureType.name()); 621 // Clean up from previous tests or previous loop 622 try { 623 wals.shutdown(); 624 } catch (IOException exception) { 625 // since we're splitting out from under the factory, we should expect some closing failures. 626 LOG.debug("Ignoring problem closing WALFactory.", exception); 627 } 628 wals.close(); 629 try { 630 for (FileStatus log : fs.listStatus(CORRUPTDIR)) { 631 fs.delete(log.getPath(), true); 632 } 633 } catch (FileNotFoundException exception) { 634 LOG.debug("no previous CORRUPTDIR to clean."); 635 } 636 // change to the faulty reader 637 wals = new WALFactory(conf, name.getMethodName()); 638 generateWALs(-1); 639 // Our reader will render all of these files corrupt. 640 final Set<String> walDirContents = new HashSet<>(); 641 for (FileStatus status : fs.listStatus(WALDIR)) { 642 walDirContents.add(status.getPath().getName()); 643 } 644 useDifferentDFSClient(); 645 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 646 return walDirContents; 647 } finally { 648 conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, 649 Reader.class); 650 } 651 } 652 653 @Test (expected = IOException.class) 654 public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() 655 throws IOException { 656 conf.setBoolean(HBASE_SKIP_ERRORS, false); 657 splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); 658 } 659 660 @Test 661 public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() 662 throws IOException { 663 conf.setBoolean(HBASE_SKIP_ERRORS, false); 664 try { 665 splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); 666 } catch (IOException e) { 667 LOG.debug("split with 'skip errors' set to 'false' correctly threw"); 668 } 669 assertEquals("if skip.errors is false all files should remain in place", 670 NUM_WRITERS, fs.listStatus(WALDIR).length); 671 } 672 673 private void ignoreCorruption(final Corruptions corruption, final int entryCount, 674 final int expectedCount) throws IOException { 675 conf.setBoolean(HBASE_SKIP_ERRORS, false); 676 677 final String REGION = "region__1"; 678 REGIONS.clear(); 679 REGIONS.add(REGION); 680 681 Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0"); 682 generateWALs(1, entryCount, -1, 0); 683 corruptWAL(c1, corruption, true); 684 685 useDifferentDFSClient(); 686 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 687 688 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 689 assertEquals(1, splitLog.length); 690 691 int actualCount = 0; 692 Reader in = wals.createReader(fs, splitLog[0]); 693 @SuppressWarnings("unused") 694 Entry entry; 695 while ((entry = in.next()) != null) ++actualCount; 696 assertEquals(expectedCount, actualCount); 697 in.close(); 698 699 // should not have stored the EOF files as corrupt 700 FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR); 701 assertEquals(0, archivedLogs.length); 702 703 } 704 705 @Test 706 public void testEOFisIgnored() throws IOException { 707 int entryCount = 10; 708 ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount-1); 709 } 710 711 @Test 712 public void testCorruptWALTrailer() throws IOException { 713 int entryCount = 10; 714 ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount); 715 } 716 717 @Test 718 public void testLogsGetArchivedAfterSplit() throws IOException { 719 conf.setBoolean(HBASE_SKIP_ERRORS, false); 720 generateWALs(-1); 721 useDifferentDFSClient(); 722 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 723 FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); 724 assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); 725 } 726 727 @Test 728 public void testSplit() throws IOException { 729 generateWALs(-1); 730 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 731 } 732 733 @Test 734 public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() 735 throws IOException { 736 generateWALs(-1); 737 useDifferentDFSClient(); 738 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 739 FileStatus [] statuses = null; 740 try { 741 statuses = fs.listStatus(WALDIR); 742 if (statuses != null) { 743 fail("Files left in log dir: " + 744 Joiner.on(",").join(FileUtil.stat2Paths(statuses))); 745 } 746 } catch (FileNotFoundException e) { 747 // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null 748 } 749 } 750 751 @Test(expected = IOException.class) 752 public void testSplitWillFailIfWritingToRegionFails() throws Exception { 753 //leave 5th log open so we could append the "trap" 754 Writer writer = generateWALs(4); 755 useDifferentDFSClient(); 756 757 String region = "break"; 758 Path regiondir = new Path(TABLEDIR, region); 759 fs.mkdirs(regiondir); 760 761 InstrumentedLogWriter.activateFailure = false; 762 appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), 763 Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0); 764 writer.close(); 765 766 try { 767 InstrumentedLogWriter.activateFailure = true; 768 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 769 } catch (IOException e) { 770 assertTrue(e.getMessage(). 771 contains("This exception is instrumented and should only be thrown for testing")); 772 throw e; 773 } finally { 774 InstrumentedLogWriter.activateFailure = false; 775 } 776 } 777 778 @Test 779 public void testSplitDeletedRegion() throws IOException { 780 REGIONS.clear(); 781 String region = "region_that_splits"; 782 REGIONS.add(region); 783 784 generateWALs(1); 785 useDifferentDFSClient(); 786 787 Path regiondir = new Path(TABLEDIR, region); 788 fs.delete(regiondir, true); 789 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 790 assertFalse(fs.exists(regiondir)); 791 } 792 793 @Test 794 public void testIOEOnOutputThread() throws Exception { 795 conf.setBoolean(HBASE_SKIP_ERRORS, false); 796 797 generateWALs(-1); 798 useDifferentDFSClient(); 799 FileStatus[] logfiles = fs.listStatus(WALDIR); 800 assertTrue("There should be some log file", 801 logfiles != null && logfiles.length > 0); 802 // wals with no entries (like the one we don't use in the factory) 803 // won't cause a failure since nothing will ever be written. 804 // pick the largest one since it's most likely to have entries. 805 int largestLogFile = 0; 806 long largestSize = 0; 807 for (int i = 0; i < logfiles.length; i++) { 808 if (logfiles[i].getLen() > largestSize) { 809 largestLogFile = i; 810 largestSize = logfiles[i].getLen(); 811 } 812 } 813 assertTrue("There should be some log greater than size 0.", 0 < largestSize); 814 // Set up a splitter that will throw an IOE on the output side 815 WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { 816 @Override 817 protected Writer createWriter(Path logfile) throws IOException { 818 Writer mockWriter = Mockito.mock(Writer.class); 819 Mockito.doThrow(new IOException("Injected")).when( 820 mockWriter).append(Mockito.<Entry>any()); 821 return mockWriter; 822 } 823 }; 824 // Set up a background thread dumper. Needs a thread to depend on and then we need to run 825 // the thread dumping in a background thread so it does not hold up the test. 826 final AtomicBoolean stop = new AtomicBoolean(false); 827 final Thread someOldThread = new Thread("Some-old-thread") { 828 @Override 829 public void run() { 830 while(!stop.get()) Threads.sleep(10); 831 } 832 }; 833 someOldThread.setDaemon(true); 834 someOldThread.start(); 835 final Thread t = new Thread("Background-thread-dumper") { 836 @Override 837 public void run() { 838 try { 839 Threads.threadDumpingIsAlive(someOldThread); 840 } catch (InterruptedException e) { 841 e.printStackTrace(); 842 } 843 } 844 }; 845 t.setDaemon(true); 846 t.start(); 847 try { 848 logSplitter.splitLogFile(logfiles[largestLogFile], null); 849 fail("Didn't throw!"); 850 } catch (IOException ioe) { 851 assertTrue(ioe.toString().contains("Injected")); 852 } finally { 853 // Setting this to true will turn off the background thread dumper. 854 stop.set(true); 855 } 856 } 857 858 /** 859 * @param spiedFs should be instrumented for failure. 860 */ 861 private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception { 862 generateWALs(-1); 863 useDifferentDFSClient(); 864 865 try { 866 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); 867 assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); 868 assertFalse(fs.exists(WALDIR)); 869 } catch (IOException e) { 870 fail("There shouldn't be any exception but: " + e.toString()); 871 } 872 } 873 874 // Test for HBASE-3412 875 @Test 876 public void testMovedWALDuringRecovery() throws Exception { 877 // This partial mock will throw LEE for every file simulating 878 // files that were moved 879 FileSystem spiedFs = Mockito.spy(fs); 880 // The "File does not exist" part is very important, 881 // that's how it comes out of HDFS 882 Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")). 883 when(spiedFs).append(Mockito.<Path>any()); 884 retryOverHdfsProblem(spiedFs); 885 } 886 887 @Test 888 public void testRetryOpenDuringRecovery() throws Exception { 889 FileSystem spiedFs = Mockito.spy(fs); 890 // The "Cannot obtain block length", "Could not obtain the last block", 891 // and "Blocklist for [^ ]* has changed.*" part is very important, 892 // that's how it comes out of HDFS. If HDFS changes the exception 893 // message, this test needs to be adjusted accordingly. 894 // 895 // When DFSClient tries to open a file, HDFS needs to locate 896 // the last block of the file and get its length. However, if the 897 // last block is under recovery, HDFS may have problem to obtain 898 // the block length, in which case, retry may help. 899 Mockito.doAnswer(new Answer<FSDataInputStream>() { 900 private final String[] errors = new String[] { 901 "Cannot obtain block length", "Could not obtain the last block", 902 "Blocklist for " + OLDLOGDIR + " has changed"}; 903 private int count = 0; 904 905 @Override 906 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 907 if (count < 3) { 908 throw new IOException(errors[count++]); 909 } 910 return (FSDataInputStream)invocation.callRealMethod(); 911 } 912 }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt()); 913 retryOverHdfsProblem(spiedFs); 914 } 915 916 @Test 917 public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException { 918 generateWALs(1, 10, -1); 919 FileStatus logfile = fs.listStatus(WALDIR)[0]; 920 useDifferentDFSClient(); 921 922 final AtomicInteger count = new AtomicInteger(); 923 924 CancelableProgressable localReporter 925 = new CancelableProgressable() { 926 @Override 927 public boolean progress() { 928 count.getAndIncrement(); 929 return false; 930 } 931 }; 932 933 FileSystem spiedFs = Mockito.spy(fs); 934 Mockito.doAnswer(new Answer<FSDataInputStream>() { 935 @Override 936 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 937 Thread.sleep(1500); // Sleep a while and wait report status invoked 938 return (FSDataInputStream)invocation.callRealMethod(); 939 } 940 }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt()); 941 942 try { 943 conf.setInt("hbase.splitlog.report.period", 1000); 944 boolean ret = WALSplitter.splitLogFile( 945 HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals); 946 assertFalse("Log splitting should failed", ret); 947 assertTrue(count.get() > 0); 948 } catch (IOException e) { 949 fail("There shouldn't be any exception but: " + e.toString()); 950 } finally { 951 // reset it back to its default value 952 conf.setInt("hbase.splitlog.report.period", 59000); 953 } 954 } 955 956 /** 957 * Test log split process with fake data and lots of edits to trigger threading 958 * issues. 959 */ 960 @Test 961 public void testThreading() throws Exception { 962 doTestThreading(20000, 128*1024*1024, 0); 963 } 964 965 /** 966 * Test blocking behavior of the log split process if writers are writing slower 967 * than the reader is reading. 968 */ 969 @Test 970 public void testThreadingSlowWriterSmallBuffer() throws Exception { 971 doTestThreading(200, 1024, 50); 972 } 973 974 /** 975 * Sets up a log splitter with a mock reader and writer. The mock reader generates 976 * a specified number of edits spread across 5 regions. The mock writer optionally 977 * sleeps for each edit it is fed. 978 * * 979 * After the split is complete, verifies that the statistics show the correct number 980 * of edits output into each region. 981 * 982 * @param numFakeEdits number of fake edits to push through pipeline 983 * @param bufferSize size of in-memory buffer 984 * @param writerSlowness writer threads will sleep this many ms per edit 985 */ 986 private void doTestThreading(final int numFakeEdits, 987 final int bufferSize, 988 final int writerSlowness) throws Exception { 989 990 Configuration localConf = new Configuration(conf); 991 localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize); 992 993 // Create a fake log file (we'll override the reader to produce a stream of edits) 994 Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake"); 995 FSDataOutputStream out = fs.create(logPath); 996 out.close(); 997 998 // Make region dirs for our destination regions so the output doesn't get skipped 999 final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); 1000 makeRegionDirs(regions); 1001 1002 // Create a splitter that reads and writes the data without touching disk 1003 WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) { 1004 1005 /* Produce a mock writer that doesn't write anywhere */ 1006 @Override 1007 protected Writer createWriter(Path logfile) throws IOException { 1008 Writer mockWriter = Mockito.mock(Writer.class); 1009 Mockito.doAnswer(new Answer<Void>() { 1010 int expectedIndex = 0; 1011 1012 @Override 1013 public Void answer(InvocationOnMock invocation) { 1014 if (writerSlowness > 0) { 1015 try { 1016 Thread.sleep(writerSlowness); 1017 } catch (InterruptedException ie) { 1018 Thread.currentThread().interrupt(); 1019 } 1020 } 1021 Entry entry = (Entry) invocation.getArgument(0); 1022 WALEdit edit = entry.getEdit(); 1023 List<Cell> cells = edit.getCells(); 1024 assertEquals(1, cells.size()); 1025 Cell cell = cells.get(0); 1026 1027 // Check that the edits come in the right order. 1028 assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), 1029 cell.getRowLength())); 1030 expectedIndex++; 1031 return null; 1032 } 1033 }).when(mockWriter).append(Mockito.<Entry>any()); 1034 return mockWriter; 1035 } 1036 1037 /* Produce a mock reader that generates fake entries */ 1038 @Override 1039 protected Reader getReader(Path curLogFile, CancelableProgressable reporter) 1040 throws IOException { 1041 Reader mockReader = Mockito.mock(Reader.class); 1042 Mockito.doAnswer(new Answer<Entry>() { 1043 int index = 0; 1044 1045 @Override 1046 public Entry answer(InvocationOnMock invocation) throws Throwable { 1047 if (index >= numFakeEdits) return null; 1048 1049 // Generate r0 through r4 in round robin fashion 1050 int regionIdx = index % regions.size(); 1051 byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)}; 1052 1053 Entry ret = createTestEntry(TABLE_NAME, region, 1054 Bytes.toBytes(index / regions.size()), 1055 FAMILY, QUALIFIER, VALUE, index); 1056 index++; 1057 return ret; 1058 } 1059 }).when(mockReader).next(); 1060 return mockReader; 1061 } 1062 }; 1063 1064 logSplitter.splitLogFile(fs.getFileStatus(logPath), null); 1065 1066 // Verify number of written edits per region 1067 Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts(); 1068 for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) { 1069 LOG.info("Got " + entry.getValue() + " output edits for region " + 1070 Bytes.toString(entry.getKey())); 1071 assertEquals((long)entry.getValue(), numFakeEdits / regions.size()); 1072 } 1073 assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size()); 1074 } 1075 1076 // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests? 1077 @Test 1078 public void testSplitLogFileDeletedRegionDir() throws IOException { 1079 LOG.info("testSplitLogFileDeletedRegionDir"); 1080 final String REGION = "region__1"; 1081 REGIONS.clear(); 1082 REGIONS.add(REGION); 1083 1084 generateWALs(1, 10, -1); 1085 useDifferentDFSClient(); 1086 1087 Path regiondir = new Path(TABLEDIR, REGION); 1088 LOG.info("Region directory is" + regiondir); 1089 fs.delete(regiondir, true); 1090 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1091 assertFalse(fs.exists(regiondir)); 1092 } 1093 1094 @Test 1095 public void testSplitLogFileEmpty() throws IOException { 1096 LOG.info("testSplitLogFileEmpty"); 1097 // we won't create the hlog dir until getWAL got called, so 1098 // make dir here when testing empty log file 1099 fs.mkdirs(WALDIR); 1100 injectEmptyFile(".empty", true); 1101 useDifferentDFSClient(); 1102 1103 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1104 Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME); 1105 assertFalse(fs.exists(tdir)); 1106 1107 assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath())); 1108 } 1109 1110 @Test 1111 public void testSplitLogFileMultipleRegions() throws IOException { 1112 LOG.info("testSplitLogFileMultipleRegions"); 1113 generateWALs(1, 10, -1); 1114 splitAndCount(1, 10); 1115 } 1116 1117 @Test 1118 public void testSplitLogFileFirstLineCorruptionLog() 1119 throws IOException { 1120 conf.setBoolean(HBASE_SKIP_ERRORS, true); 1121 generateWALs(1, 10, -1); 1122 FileStatus logfile = fs.listStatus(WALDIR)[0]; 1123 1124 corruptWAL(logfile.getPath(), 1125 Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); 1126 1127 useDifferentDFSClient(); 1128 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1129 1130 final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 1131 assertEquals(1, fs.listStatus(corruptDir).length); 1132 } 1133 1134 /** 1135 * @see "https://issues.apache.org/jira/browse/HBASE-4862" 1136 */ 1137 @Test 1138 public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException { 1139 LOG.info("testConcurrentSplitLogAndReplayRecoverEdit"); 1140 // Generate wals for our destination region 1141 String regionName = "r0"; 1142 final Path regiondir = new Path(TABLEDIR, regionName); 1143 REGIONS.clear(); 1144 REGIONS.add(regionName); 1145 generateWALs(-1); 1146 1147 wals.getWAL(null); 1148 FileStatus[] logfiles = fs.listStatus(WALDIR); 1149 assertTrue("There should be some log file", 1150 logfiles != null && logfiles.length > 0); 1151 1152 WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { 1153 @Override 1154 protected Writer createWriter(Path logfile) 1155 throws IOException { 1156 Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); 1157 // After creating writer, simulate region's 1158 // replayRecoveredEditsIfAny() which gets SplitEditFiles of this 1159 // region and delete them, excluding files with '.temp' suffix. 1160 NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir); 1161 if (files != null && !files.isEmpty()) { 1162 for (Path file : files) { 1163 if (!this.walFS.delete(file, false)) { 1164 LOG.error("Failed delete of " + file); 1165 } else { 1166 LOG.debug("Deleted recovered.edits file=" + file); 1167 } 1168 } 1169 } 1170 return writer; 1171 } 1172 }; 1173 try{ 1174 logSplitter.splitLogFile(logfiles[0], null); 1175 } catch (IOException e) { 1176 LOG.info(e.toString(), e); 1177 fail("Throws IOException when spliting " 1178 + "log, it is most likely because writing file does not " 1179 + "exist which is caused by concurrent replayRecoveredEditsIfAny()"); 1180 } 1181 if (fs.exists(CORRUPTDIR)) { 1182 if (fs.listStatus(CORRUPTDIR).length > 0) { 1183 fail("There are some corrupt logs, " 1184 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()"); 1185 } 1186 } 1187 } 1188 1189 private Writer generateWALs(int leaveOpen) throws IOException { 1190 return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0); 1191 } 1192 1193 private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException { 1194 return generateWALs(writers, entries, leaveOpen, 7); 1195 } 1196 1197 private void makeRegionDirs(List<String> regions) throws IOException { 1198 for (String region : regions) { 1199 LOG.debug("Creating dir for region " + region); 1200 fs.mkdirs(new Path(TABLEDIR, region)); 1201 } 1202 } 1203 1204 /** 1205 * @param leaveOpen index to leave un-closed. -1 to close all. 1206 * @return the writer that's still open, or null if all were closed. 1207 */ 1208 private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) throws IOException { 1209 makeRegionDirs(REGIONS); 1210 fs.mkdirs(WALDIR); 1211 Writer [] ws = new Writer[writers]; 1212 int seq = 0; 1213 int numRegionEventsAdded = 0; 1214 for (int i = 0; i < writers; i++) { 1215 ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i)); 1216 for (int j = 0; j < entries; j++) { 1217 int prefix = 0; 1218 for (String region : REGIONS) { 1219 String row_key = region + prefix++ + i + j; 1220 appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY, 1221 QUALIFIER, VALUE, seq++); 1222 1223 if (numRegionEventsAdded < regionEvents) { 1224 numRegionEventsAdded ++; 1225 appendRegionEvent(ws[i], region); 1226 } 1227 } 1228 } 1229 if (i != leaveOpen) { 1230 ws[i].close(); 1231 LOG.info("Closing writer " + i); 1232 } 1233 } 1234 if (leaveOpen < 0 || leaveOpen >= writers) { 1235 return null; 1236 } 1237 return ws[leaveOpen]; 1238 } 1239 1240 1241 1242 private Path[] getLogForRegion(TableName table, String region) 1243 throws IOException { 1244 Path tdir = FSUtils.getWALTableDir(conf, table); 1245 @SuppressWarnings("deprecation") 1246 Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, 1247 Bytes.toString(Bytes.toBytes(region)))); 1248 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { 1249 @Override 1250 public boolean accept(Path p) { 1251 if (WALSplitUtil.isSequenceIdFile(p)) { 1252 return false; 1253 } 1254 return true; 1255 } 1256 }); 1257 Path[] paths = new Path[files.length]; 1258 for (int i = 0; i < files.length; i++) { 1259 paths[i] = files[i].getPath(); 1260 } 1261 return paths; 1262 } 1263 1264 private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException { 1265 FSDataOutputStream out; 1266 int fileSize = (int) fs.listStatus(path)[0].getLen(); 1267 1268 FSDataInputStream in = fs.open(path); 1269 byte[] corrupted_bytes = new byte[fileSize]; 1270 in.readFully(0, corrupted_bytes, 0, fileSize); 1271 in.close(); 1272 1273 switch (corruption) { 1274 case APPEND_GARBAGE: 1275 fs.delete(path, false); 1276 out = fs.create(path); 1277 out.write(corrupted_bytes); 1278 out.write(Bytes.toBytes("-----")); 1279 closeOrFlush(close, out); 1280 break; 1281 1282 case INSERT_GARBAGE_ON_FIRST_LINE: 1283 fs.delete(path, false); 1284 out = fs.create(path); 1285 out.write(0); 1286 out.write(corrupted_bytes); 1287 closeOrFlush(close, out); 1288 break; 1289 1290 case INSERT_GARBAGE_IN_THE_MIDDLE: 1291 fs.delete(path, false); 1292 out = fs.create(path); 1293 int middle = (int) Math.floor(corrupted_bytes.length / 2); 1294 out.write(corrupted_bytes, 0, middle); 1295 out.write(0); 1296 out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); 1297 closeOrFlush(close, out); 1298 break; 1299 1300 case TRUNCATE: 1301 fs.delete(path, false); 1302 out = fs.create(path); 1303 out.write(corrupted_bytes, 0, fileSize 1304 - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT)); 1305 closeOrFlush(close, out); 1306 break; 1307 1308 case TRUNCATE_TRAILER: 1309 fs.delete(path, false); 1310 out = fs.create(path); 1311 out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated. 1312 closeOrFlush(close, out); 1313 break; 1314 } 1315 } 1316 1317 private void closeOrFlush(boolean close, FSDataOutputStream out) 1318 throws IOException { 1319 if (close) { 1320 out.close(); 1321 } else { 1322 Method syncMethod = null; 1323 try { 1324 syncMethod = out.getClass().getMethod("hflush", new Class<?> []{}); 1325 } catch (NoSuchMethodException e) { 1326 try { 1327 syncMethod = out.getClass().getMethod("sync", new Class<?> []{}); 1328 } catch (NoSuchMethodException ex) { 1329 throw new IOException("This version of Hadoop supports " + 1330 "neither Syncable.sync() nor Syncable.hflush()."); 1331 } 1332 } 1333 try { 1334 syncMethod.invoke(out, new Object[]{}); 1335 } catch (Exception e) { 1336 throw new IOException(e); 1337 } 1338 // Not in 0out.hflush(); 1339 } 1340 } 1341 1342 private int countWAL(Path log) throws IOException { 1343 int count = 0; 1344 Reader in = wals.createReader(fs, log); 1345 while (in.next() != null) { 1346 count++; 1347 } 1348 in.close(); 1349 return count; 1350 } 1351 1352 private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs, 1353 String output) throws IOException { 1354 WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder(); 1355 desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes())) 1356 .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes())) 1357 .setRegionName(ByteString.copyFrom(hri.getRegionName())) 1358 .setFamilyName(ByteString.copyFrom(FAMILY)) 1359 .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY)) 1360 .addAllCompactionInput(Arrays.asList(inputs)) 1361 .addCompactionOutput(output); 1362 1363 WALEdit edit = WALEdit.createCompaction(hri, desc.build()); 1364 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, 1365 EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); 1366 w.append(new Entry(key, edit)); 1367 w.sync(false); 1368 } 1369 1370 private static void appendRegionEvent(Writer w, String region) throws IOException { 1371 WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( 1372 WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, 1373 TABLE_NAME.toBytes(), 1374 Bytes.toBytes(region), 1375 Bytes.toBytes(String.valueOf(region.hashCode())), 1376 1, 1377 ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of()); 1378 final long time = EnvironmentEdgeManager.currentTime(); 1379 final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, 1380 HConstants.DEFAULT_CLUSTER_ID); 1381 WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc); 1382 w.append(new Entry(walKey, we)); 1383 w.sync(false); 1384 } 1385 1386 public static long appendEntry(Writer writer, TableName table, byte[] region, 1387 byte[] row, byte[] family, byte[] qualifier, 1388 byte[] value, long seq) 1389 throws IOException { 1390 LOG.info(Thread.currentThread().getName() + " append"); 1391 writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); 1392 LOG.info(Thread.currentThread().getName() + " sync"); 1393 writer.sync(false); 1394 return seq; 1395 } 1396 1397 private static Entry createTestEntry( 1398 TableName table, byte[] region, 1399 byte[] row, byte[] family, byte[] qualifier, 1400 byte[] value, long seq) { 1401 long time = System.nanoTime(); 1402 1403 seq++; 1404 final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value); 1405 WALEdit edit = new WALEdit(); 1406 edit.add(cell); 1407 return new Entry(new WALKeyImpl(region, table, seq, time, 1408 HConstants.DEFAULT_CLUSTER_ID), edit); 1409 } 1410 1411 private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { 1412 Writer writer = 1413 WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); 1414 if (closeFile) { 1415 writer.close(); 1416 } 1417 } 1418 1419 private boolean logsAreEqual(Path p1, Path p2) throws IOException { 1420 Reader in1, in2; 1421 in1 = wals.createReader(fs, p1); 1422 in2 = wals.createReader(fs, p2); 1423 Entry entry1; 1424 Entry entry2; 1425 while ((entry1 = in1.next()) != null) { 1426 entry2 = in2.next(); 1427 if ((entry1.getKey().compareTo(entry2.getKey()) != 0) || 1428 (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) { 1429 return false; 1430 } 1431 } 1432 in1.close(); 1433 in2.close(); 1434 return true; 1435 } 1436}