/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);

  {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  }
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String [] GROUP = new String [] {"supergroup"};

  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake mapping of users to groups and set it in the conf.
    Map<String, String []> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;

  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
            16010, System.currentTimeMillis()).toString()));
    //fs.mkdirs(WALDIR);
  }

  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }

  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till the writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile: logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The log file could have at most 1 extra log entry, but cannot have fewer. " +
          "Zombie could write " + counter.get() + " and logfile had only " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }

  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate it being a regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update the counter so it has all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // Closing should error out.
          try {
            writer.close();
            fail("Closing the writer after parsing should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    private void loop(final Writer writer) {
      byte [] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            //
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  /**
   * Test that an old recovered edits file doesn't break WALSplitter.
   * This is useful when upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    long now = System.currentTimeMillis();
    Entry entry =
        new Entry(new WALKeyImpl(encoded,
            TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
            new WALEdit());
    Path p = WALSplitter.getRegionSplitEditsPath(entry,
        FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
    return p;
  }

  /**
   * Test that hasRecoveredEdits correctly identifies a proper recovered edits file in the
   * related dir.
   * @throws IOException on any issues found while creating test required files/directories.
   */
  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }

  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }

  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }


  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[]{"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles =
          getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL is called, so
    // make the dir here when testing empty log files
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }

  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
        .asList(FaultyProtobufLogReader.FailureType.values()).stream()
        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
          walDirContents);
    }
  }

  /**
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
          Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }

  @Test (expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS, fs.listStatus(WALDIR).length);
  }

  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs =
        fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);

  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus [] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave the 5th log open so we could append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
          contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(
            mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }
    };
    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }

  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have a problem obtaining
    // the block length, in which case retry may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter
        = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while to let the report status get invoked
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(
          HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals);
      assertFalse("Log splitting should have failed", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }

  /**
   * Test log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128*1024*1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }

  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) {

      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
          throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte[] region = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes(index / regions.size()),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " +
          Bytes.toString(entry.getKey()));
      assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }

  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // we won't create the hlog dir until getWAL is called, so
    // make the dir here when testing an empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile)
          throws IOException {
        Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
        // After creating the writer, simulate a region's
        // replayRecoveredEditsIfAny(), which gets the SplitEditFiles of this
        // region and deletes them, excluding files with a '.temp' suffix.
        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.walFS.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Throws IOException when splitting "
          + "log, it is most likely because the file being written does not "
          + "exist, which is caused by a concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "most likely caused by a concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }

  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }

  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }

  /**
   * @param leaveOpen index to leave un-closed. -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
      throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer [] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
              QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded ++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }

  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = FSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        if (WALSplitter.isSequenceIdFile(p)) {
          return false;
        }
        return true;
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }

  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream
        in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = (int) Math.floor(corrupted_bytes.length / 2);
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }

  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
      // Calling out.hflush() directly is not possible on all supported Hadoop versions,
      // hence the reflection above.
    }
  }

  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }

  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
        .setFamilyName(ByteString.copyFrom(FAMILY))
        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
        .addAllCompactionInput(Arrays.asList(inputs))
        .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync(false);
  }

  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
        TABLE_NAME.toBytes(),
        Bytes.toBytes(region),
        Bytes.toBytes(String.valueOf(region.hashCode())),
        1,
        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
        HConstants.DEFAULT_CLUSTER_ID);
    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
    w.append(new Entry(walKey, we));
    w.sync(false);
  }

  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync(false);
    return seq;
  }

  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();

    seq++;
    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }

  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }

  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}