001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.lang.reflect.Method; 027import java.security.PrivilegedExceptionAction; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.List; 034import java.util.Map; 035import java.util.NavigableSet; 036import java.util.Objects; 037import java.util.Set; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.atomic.AtomicLong; 041import java.util.stream.Collectors; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FSDataInputStream; 044import org.apache.hadoop.fs.FSDataOutputStream; 045import org.apache.hadoop.fs.FileStatus; 046import org.apache.hadoop.fs.FileSystem; 047import org.apache.hadoop.fs.FileUtil; 048import org.apache.hadoop.fs.Path; 049import org.apache.hadoop.fs.PathFilter; 050import org.apache.hadoop.hbase.Cell; 051import org.apache.hadoop.hbase.HBaseClassTestRule; 052import org.apache.hadoop.hbase.HBaseConfiguration; 053import org.apache.hadoop.hbase.HBaseTestingUtility; 054import org.apache.hadoop.hbase.HConstants; 055import org.apache.hadoop.hbase.KeyValue; 056import org.apache.hadoop.hbase.ServerName; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.client.RegionInfo; 059import org.apache.hadoop.hbase.client.RegionInfoBuilder; 060import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 061import org.apache.hadoop.hbase.regionserver.HRegion; 062import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader; 063import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; 064import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 065import org.apache.hadoop.hbase.security.User; 066import org.apache.hadoop.hbase.testclassification.LargeTests; 067import org.apache.hadoop.hbase.testclassification.RegionServerTests; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.hbase.util.CancelableProgressable; 070import org.apache.hadoop.hbase.util.CommonFSUtils; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.Threads; 073import org.apache.hadoop.hbase.wal.WAL.Entry; 074import org.apache.hadoop.hbase.wal.WAL.Reader; 075import org.apache.hadoop.hbase.wal.WALProvider.Writer; 076import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException; 077import org.apache.hadoop.hdfs.DFSTestUtil; 078import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; 079import org.apache.hadoop.ipc.RemoteException; 080import org.junit.After; 081import org.junit.AfterClass; 082import org.junit.Before; 083import org.junit.BeforeClass; 084import org.junit.ClassRule; 085import org.junit.Rule; 086import org.junit.Test; 087import org.junit.experimental.categories.Category; 088import org.junit.rules.TestName; 089import org.mockito.Mockito; 090import org.mockito.invocation.InvocationOnMock; 091import org.mockito.stubbing.Answer; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 095import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 096import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 097import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 098import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 100 101/** 102 * Testing {@link WAL} splitting code. 103 */ 104@Category({RegionServerTests.class, LargeTests.class}) 105public class TestWALSplit { 106 @ClassRule 107 public static final HBaseClassTestRule CLASS_RULE = 108 HBaseClassTestRule.forClass(TestWALSplit.class); 109 private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class); 110 111 private static Configuration conf; 112 private FileSystem fs; 113 114 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 115 116 private Path HBASEDIR; 117 private Path HBASELOGDIR; 118 private Path WALDIR; 119 private Path OLDLOGDIR; 120 private Path CORRUPTDIR; 121 private Path TABLEDIR; 122 private String TMPDIRNAME; 123 124 private static final int NUM_WRITERS = 10; 125 private static final int ENTRIES = 10; // entries per writer per region 126 127 private static final String FILENAME_BEING_SPLIT = "testfile"; 128 private static final TableName TABLE_NAME = 129 TableName.valueOf("t1"); 130 private static final byte[] FAMILY = Bytes.toBytes("f1"); 131 private static final byte[] QUALIFIER = Bytes.toBytes("q1"); 132 private static final byte[] VALUE = Bytes.toBytes("v1"); 133 private static final String WAL_FILE_PREFIX = "wal.dat."; 134 private static List<String> REGIONS = new ArrayList<>(); 135 private static String ROBBER; 136 private static String ZOMBIE; 137 private static String [] GROUP = new String [] {"supergroup"}; 138 139 static enum Corruptions { 140 INSERT_GARBAGE_ON_FIRST_LINE, 141 INSERT_GARBAGE_IN_THE_MIDDLE, 142 APPEND_GARBAGE, 143 TRUNCATE, 144 TRUNCATE_TRAILER 145 } 146 147 @BeforeClass 148 public static void setUpBeforeClass() throws Exception { 149 conf = TEST_UTIL.getConfiguration(); 150 conf.setClass("hbase.regionserver.hlog.writer.impl", 151 InstrumentedLogWriter.class, Writer.class); 152 // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config. 153 System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); 154 // Create fake maping user to group and set it to the conf. 155 Map<String, String []> u2g_map = new HashMap<>(2); 156 ROBBER = User.getCurrent().getName() + "-robber"; 157 ZOMBIE = User.getCurrent().getName() + "-zombie"; 158 u2g_map.put(ROBBER, GROUP); 159 u2g_map.put(ZOMBIE, GROUP); 160 DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); 161 conf.setInt("dfs.heartbeat.interval", 1); 162 TEST_UTIL.startMiniDFSCluster(2); 163 } 164 165 @AfterClass 166 public static void tearDownAfterClass() throws Exception { 167 TEST_UTIL.shutdownMiniDFSCluster(); 168 } 169 170 @Rule 171 public TestName name = new TestName(); 172 private WALFactory wals = null; 173 174 @Before 175 public void setUp() throws Exception { 176 LOG.info("Cleaning up cluster for new test."); 177 fs = TEST_UTIL.getDFSCluster().getFileSystem(); 178 HBASEDIR = TEST_UTIL.createRootDir(); 179 HBASELOGDIR = TEST_UTIL.createWALRootDir(); 180 OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME); 181 CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME); 182 TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME); 183 TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 184 HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); 185 REGIONS.clear(); 186 Collections.addAll(REGIONS, "bbb", "ccc"); 187 InstrumentedLogWriter.activateFailure = false; 188 wals = new WALFactory(conf, name.getMethodName()); 189 WALDIR = new Path(HBASELOGDIR, 190 AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(), 191 16010, System.currentTimeMillis()).toString())); 192 //fs.mkdirs(WALDIR); 193 } 194 195 @After 196 public void tearDown() throws Exception { 197 try { 198 wals.close(); 199 } catch(IOException exception) { 200 // Some tests will move WALs out from under us. In those cases, we'll get an error on close. 201 LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" + 202 " you see a failure look here."); 203 LOG.debug("exception details", exception); 204 } finally { 205 wals = null; 206 fs.delete(HBASEDIR, true); 207 fs.delete(HBASELOGDIR, true); 208 } 209 } 210 211 /** 212 * Simulates splitting a WAL out from under a regionserver that is still trying to write it. 213 * Ensures we do not lose edits. 214 */ 215 @Test 216 public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException { 217 final AtomicLong counter = new AtomicLong(0); 218 AtomicBoolean stop = new AtomicBoolean(false); 219 // Region we'll write edits too and then later examine to make sure they all made it in. 220 final String region = REGIONS.get(0); 221 final int numWriters = 3; 222 Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters); 223 try { 224 long startCount = counter.get(); 225 zombie.start(); 226 // Wait till writer starts going. 227 while (startCount == counter.get()) Threads.sleep(1); 228 // Give it a second to write a few appends. 229 Threads.sleep(1000); 230 final Configuration conf2 = HBaseConfiguration.create(conf); 231 final User robber = User.createUserForTesting(conf2, ROBBER, GROUP); 232 int count = robber.runAs(new PrivilegedExceptionAction<Integer>() { 233 @Override 234 public Integer run() throws Exception { 235 StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR) 236 .append("):\n"); 237 for (FileStatus status : fs.listStatus(WALDIR)) { 238 ls.append("\t").append(status.toString()).append("\n"); 239 } 240 LOG.debug(Objects.toString(ls)); 241 LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files."); 242 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); 243 LOG.info("Finished splitting out from under zombie."); 244 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 245 assertEquals("wrong number of split files for region", numWriters, logfiles.length); 246 int count = 0; 247 for (Path logfile: logfiles) { 248 count += countWAL(logfile); 249 } 250 return count; 251 } 252 }); 253 LOG.info("zombie=" + counter.get() + ", robber=" + count); 254 assertTrue("The log file could have at most 1 extra log entry, but can't have less. " + 255 "Zombie could write " + counter.get() + " and logfile had only " + count, 256 counter.get() == count || counter.get() + 1 == count); 257 } finally { 258 stop.set(true); 259 zombie.interrupt(); 260 Threads.threadDumpingIsAlive(zombie); 261 } 262 } 263 264 /** 265 * This thread will keep writing to a 'wal' file even after the split process has started. 266 * It simulates a region server that was considered dead but woke up and wrote some more to the 267 * last log entry. Does its writing as an alternate user in another filesystem instance to 268 * simulate better it being a regionserver. 269 */ 270 class ZombieLastLogWriterRegionServer extends Thread { 271 final AtomicLong editsCount; 272 final AtomicBoolean stop; 273 final int numOfWriters; 274 /** 275 * Region to write edits for. 276 */ 277 final String region; 278 final User user; 279 280 public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop, 281 final String region, final int writers) 282 throws IOException, InterruptedException { 283 super("ZombieLastLogWriterRegionServer"); 284 setDaemon(true); 285 this.stop = stop; 286 this.editsCount = counter; 287 this.region = region; 288 this.user = User.createUserForTesting(conf, ZOMBIE, GROUP); 289 numOfWriters = writers; 290 } 291 292 @Override 293 public void run() { 294 try { 295 doWriting(); 296 } catch (IOException e) { 297 LOG.warn(getName() + " Writer exiting " + e); 298 } catch (InterruptedException e) { 299 LOG.warn(getName() + " Writer exiting " + e); 300 } 301 } 302 303 private void doWriting() throws IOException, InterruptedException { 304 this.user.runAs(new PrivilegedExceptionAction<Object>() { 305 @Override 306 public Object run() throws Exception { 307 // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose 308 // index we supply here. 309 int walToKeepOpen = numOfWriters - 1; 310 // The below method writes numOfWriters files each with ENTRIES entries for a total of 311 // numOfWriters * ENTRIES added per column family in the region. 312 Writer writer = null; 313 try { 314 writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen); 315 } catch (IOException e1) { 316 throw new RuntimeException("Failed", e1); 317 } 318 // Update counter so has all edits written so far. 319 editsCount.addAndGet(numOfWriters * ENTRIES); 320 loop(writer); 321 // If we've been interruped, then things should have shifted out from under us. 322 // closing should error 323 try { 324 writer.close(); 325 fail("Writing closing after parsing should give an error."); 326 } catch (IOException exception) { 327 LOG.debug("ignoring error when closing final writer.", exception); 328 } 329 return null; 330 } 331 }); 332 } 333 334 private void loop(final Writer writer) { 335 byte [] regionBytes = Bytes.toBytes(this.region); 336 while (!stop.get()) { 337 try { 338 long seq = appendEntry(writer, TABLE_NAME, regionBytes, 339 Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0); 340 long count = editsCount.incrementAndGet(); 341 LOG.info(getName() + " sync count=" + count + ", seq=" + seq); 342 try { 343 Thread.sleep(1); 344 } catch (InterruptedException e) { 345 // 346 } 347 } catch (IOException ex) { 348 LOG.error(getName() + " ex " + ex.toString()); 349 if (ex instanceof RemoteException) { 350 LOG.error("Juliet: got RemoteException " + ex.getMessage() + 351 " while writing " + (editsCount.get() + 1)); 352 } else { 353 LOG.error(getName() + " failed to write....at " + editsCount.get()); 354 fail("Failed to write " + editsCount.get()); 355 } 356 break; 357 } catch (Throwable t) { 358 LOG.error(getName() + " HOW? " + t); 359 LOG.debug("exception details", t); 360 break; 361 } 362 } 363 LOG.info(getName() + " Writer exiting"); 364 } 365 } 366 367 /** 368 * @see "https://issues.apache.org/jira/browse/HBASE-3020" 369 */ 370 @Test 371 public void testRecoveredEditsPathForMeta() throws IOException { 372 Path p = createRecoveredEditsPathForRegion(); 373 String parentOfParent = p.getParent().getParent().getName(); 374 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 375 } 376 377 /** 378 * Test old recovered edits file doesn't break WALSplitter. 379 * This is useful in upgrading old instances. 380 */ 381 @Test 382 public void testOldRecoveredEditsFileSidelined() throws IOException { 383 Path p = createRecoveredEditsPathForRegion(); 384 Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME); 385 Path regiondir = new Path(tdir, 386 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 387 fs.mkdirs(regiondir); 388 Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 389 assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName()); 390 fs.createNewFile(parent); // create a recovered.edits file 391 String parentOfParent = p.getParent().getParent().getName(); 392 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 393 WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); 394 } 395 396 private Path createRecoveredEditsPathForRegion() throws IOException { 397 byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); 398 long now = System.currentTimeMillis(); 399 Entry entry = new Entry( 400 new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), 401 new WALEdit()); 402 Path p = WALSplitUtil 403 .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT, 404 TMPDIRNAME, conf); 405 return p; 406 } 407 408 @Test 409 public void testHasRecoveredEdits() throws IOException { 410 Path p = createRecoveredEditsPathForRegion(); 411 assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 412 String renamedEdit = p.getName().split("-")[0]; 413 fs.createNewFile(new Path(p.getParent(), renamedEdit)); 414 assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 415 } 416 417 private void useDifferentDFSClient() throws IOException { 418 // make fs act as a different client now 419 // initialize will create a new DFSClient with a new client ID 420 fs.initialize(fs.getUri(), conf); 421 } 422 423 @Test 424 public void testSplitPreservesEdits() throws IOException{ 425 final String REGION = "region__1"; 426 REGIONS.clear(); 427 REGIONS.add(REGION); 428 429 generateWALs(1, 10, -1, 0); 430 useDifferentDFSClient(); 431 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 432 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 433 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 434 assertEquals(1, splitLog.length); 435 436 assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 437 } 438 439 @Test 440 public void testSplitRemovesRegionEventsEdits() throws IOException{ 441 final String REGION = "region__1"; 442 REGIONS.clear(); 443 REGIONS.add(REGION); 444 445 generateWALs(1, 10, -1, 100); 446 useDifferentDFSClient(); 447 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 448 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 449 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 450 assertEquals(1, splitLog.length); 451 452 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 453 // split log should only have the test edits 454 assertEquals(10, countWAL(splitLog[0])); 455 } 456 457 458 @Test 459 public void testSplitLeavesCompactionEventsEdits() throws IOException{ 460 RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); 461 REGIONS.clear(); 462 REGIONS.add(hri.getEncodedName()); 463 Path regionDir = 464 new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName()); 465 LOG.info("Creating region directory: " + regionDir); 466 assertTrue(fs.mkdirs(regionDir)); 467 468 Writer writer = generateWALs(1, 10, 0, 10); 469 String[] compactInputs = new String[]{"file1", "file2", "file3"}; 470 String compactOutput = "file4"; 471 appendCompactionEvent(writer, hri, compactInputs, compactOutput); 472 writer.close(); 473 474 useDifferentDFSClient(); 475 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 476 477 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 478 // original log should have 10 test edits, 10 region markers, 1 compaction marker 479 assertEquals(21, countWAL(originalLog)); 480 481 Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName()); 482 assertEquals(1, splitLog.length); 483 484 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 485 // split log should have 10 test edits plus 1 compaction marker 486 assertEquals(11, countWAL(splitLog[0])); 487 } 488 489 /** 490 * @param expectedEntries -1 to not assert 491 * @return the count across all regions 492 */ 493 private int splitAndCount(final int expectedFiles, final int expectedEntries) 494 throws IOException { 495 useDifferentDFSClient(); 496 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 497 int result = 0; 498 for (String region : REGIONS) { 499 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 500 assertEquals(expectedFiles, logfiles.length); 501 int count = 0; 502 for (Path logfile: logfiles) { 503 count += countWAL(logfile); 504 } 505 if (-1 != expectedEntries) { 506 assertEquals(expectedEntries, count); 507 } 508 result += count; 509 } 510 return result; 511 } 512 513 @Test 514 public void testEmptyLogFiles() throws IOException { 515 testEmptyLogFiles(true); 516 } 517 518 @Test 519 public void testEmptyOpenLogFiles() throws IOException { 520 testEmptyLogFiles(false); 521 } 522 523 private void testEmptyLogFiles(final boolean close) throws IOException { 524 // we won't create the hlog dir until getWAL got called, so 525 // make dir here when testing empty log file 526 fs.mkdirs(WALDIR); 527 injectEmptyFile(".empty", close); 528 generateWALs(Integer.MAX_VALUE); 529 injectEmptyFile("empty", close); 530 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty 531 } 532 533 @Test 534 public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { 535 // generate logs but leave wal.dat.5 open. 536 generateWALs(5); 537 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 538 } 539 540 @Test 541 public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { 542 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 543 generateWALs(Integer.MAX_VALUE); 544 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), 545 Corruptions.APPEND_GARBAGE, true); 546 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 547 } 548 549 @Test 550 public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { 551 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 552 generateWALs(Integer.MAX_VALUE); 553 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), 554 Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); 555 splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt 556 } 557 558 @Test 559 public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { 560 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 561 generateWALs(Integer.MAX_VALUE); 562 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), 563 Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false); 564 // the entries in the original logs are alternating regions 565 // considering the sequence file header, the middle corruption should 566 // affect at least half of the entries 567 int goodEntries = (NUM_WRITERS - 1) * ENTRIES; 568 int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; 569 int allRegionsCount = splitAndCount(NUM_WRITERS, -1); 570 assertTrue("The file up to the corrupted area hasn't been parsed", 571 REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount); 572 } 573 574 @Test 575 public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { 576 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 577 List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays 578 .asList(FaultyProtobufLogReader.FailureType.values()).stream() 579 .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList()); 580 for (FaultyProtobufLogReader.FailureType failureType : failureTypes) { 581 final Set<String> walDirContents = splitCorruptWALs(failureType); 582 final Set<String> archivedLogs = new HashSet<>(); 583 final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:"); 584 for (FileStatus log : fs.listStatus(CORRUPTDIR)) { 585 archived.append("\n\t").append(log.toString()); 586 archivedLogs.add(log.getPath().getName()); 587 } 588 LOG.debug(archived.toString()); 589 assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs, 590 walDirContents); 591 } 592 } 593 594 /** 595 * @return set of wal names present prior to split attempt. 596 * @throws IOException if the split process fails 597 */ 598 private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType) 599 throws IOException { 600 Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl", 601 Reader.class); 602 InstrumentedLogWriter.activateFailure = false; 603 604 try { 605 conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class, 606 Reader.class); 607 conf.set("faultyprotobuflogreader.failuretype", failureType.name()); 608 // Clean up from previous tests or previous loop 609 try { 610 wals.shutdown(); 611 } catch (IOException exception) { 612 // since we're splitting out from under the factory, we should expect some closing failures. 613 LOG.debug("Ignoring problem closing WALFactory.", exception); 614 } 615 wals.close(); 616 try { 617 for (FileStatus log : fs.listStatus(CORRUPTDIR)) { 618 fs.delete(log.getPath(), true); 619 } 620 } catch (FileNotFoundException exception) { 621 LOG.debug("no previous CORRUPTDIR to clean."); 622 } 623 // change to the faulty reader 624 wals = new WALFactory(conf, name.getMethodName()); 625 generateWALs(-1); 626 // Our reader will render all of these files corrupt. 627 final Set<String> walDirContents = new HashSet<>(); 628 for (FileStatus status : fs.listStatus(WALDIR)) { 629 walDirContents.add(status.getPath().getName()); 630 } 631 useDifferentDFSClient(); 632 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 633 return walDirContents; 634 } finally { 635 conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, 636 Reader.class); 637 } 638 } 639 640 @Test (expected = IOException.class) 641 public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() 642 throws IOException { 643 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 644 splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); 645 } 646 647 @Test 648 public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() 649 throws IOException { 650 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 651 try { 652 splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); 653 } catch (IOException e) { 654 LOG.debug("split with 'skip errors' set to 'false' correctly threw"); 655 } 656 assertEquals("if skip.errors is false all files should remain in place", 657 NUM_WRITERS, fs.listStatus(WALDIR).length); 658 } 659 660 private void ignoreCorruption(final Corruptions corruption, final int entryCount, 661 final int expectedCount) throws IOException { 662 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 663 664 final String REGION = "region__1"; 665 REGIONS.clear(); 666 REGIONS.add(REGION); 667 668 Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0"); 669 generateWALs(1, entryCount, -1, 0); 670 corruptWAL(c1, corruption, true); 671 672 useDifferentDFSClient(); 673 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 674 675 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 676 assertEquals(1, splitLog.length); 677 678 int actualCount = 0; 679 Reader in = wals.createReader(fs, splitLog[0]); 680 @SuppressWarnings("unused") 681 Entry entry; 682 while ((entry = in.next()) != null) ++actualCount; 683 assertEquals(expectedCount, actualCount); 684 in.close(); 685 686 // should not have stored the EOF files as corrupt 687 FileStatus[] archivedLogs = fs.exists(CORRUPTDIR)? fs.listStatus(CORRUPTDIR): new FileStatus[0]; 688 assertEquals(0, archivedLogs.length); 689 690 } 691 692 @Test 693 public void testEOFisIgnored() throws IOException { 694 int entryCount = 10; 695 ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount-1); 696 } 697 698 @Test 699 public void testCorruptWALTrailer() throws IOException { 700 int entryCount = 10; 701 ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount); 702 } 703 704 @Test 705 public void testLogsGetArchivedAfterSplit() throws IOException { 706 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 707 generateWALs(-1); 708 useDifferentDFSClient(); 709 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 710 FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); 711 assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); 712 } 713 714 @Test 715 public void testSplit() throws IOException { 716 generateWALs(-1); 717 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 718 } 719 720 @Test 721 public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() 722 throws IOException { 723 generateWALs(-1); 724 useDifferentDFSClient(); 725 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 726 FileStatus [] statuses = null; 727 try { 728 statuses = fs.listStatus(WALDIR); 729 if (statuses != null) { 730 fail("Files left in log dir: " + 731 Joiner.on(",").join(FileUtil.stat2Paths(statuses))); 732 } 733 } catch (FileNotFoundException e) { 734 // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null 735 } 736 } 737 738 @Test(expected = IOException.class) 739 public void testSplitWillFailIfWritingToRegionFails() throws Exception { 740 //leave 5th log open so we could append the "trap" 741 Writer writer = generateWALs(4); 742 useDifferentDFSClient(); 743 744 String region = "break"; 745 Path regiondir = new Path(TABLEDIR, region); 746 fs.mkdirs(regiondir); 747 748 InstrumentedLogWriter.activateFailure = false; 749 appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), 750 Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0); 751 writer.close(); 752 753 try { 754 InstrumentedLogWriter.activateFailure = true; 755 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 756 } catch (IOException e) { 757 assertTrue(e.getMessage(). 758 contains("This exception is instrumented and should only be thrown for testing")); 759 throw e; 760 } finally { 761 InstrumentedLogWriter.activateFailure = false; 762 } 763 } 764 765 @Test 766 public void testSplitDeletedRegion() throws IOException { 767 REGIONS.clear(); 768 String region = "region_that_splits"; 769 REGIONS.add(region); 770 771 generateWALs(1); 772 useDifferentDFSClient(); 773 774 Path regiondir = new Path(TABLEDIR, region); 775 fs.delete(regiondir, true); 776 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 777 assertFalse(fs.exists(regiondir)); 778 } 779 780 @Test 781 public void testIOEOnOutputThread() throws Exception { 782 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 783 784 generateWALs(-1); 785 useDifferentDFSClient(); 786 FileStatus[] logfiles = fs.listStatus(WALDIR); 787 assertTrue("There should be some log file", 788 logfiles != null && logfiles.length > 0); 789 // wals with no entries (like the one we don't use in the factory) 790 // won't cause a failure since nothing will ever be written. 791 // pick the largest one since it's most likely to have entries. 792 int largestLogFile = 0; 793 long largestSize = 0; 794 for (int i = 0; i < logfiles.length; i++) { 795 if (logfiles[i].getLen() > largestSize) { 796 largestLogFile = i; 797 largestSize = logfiles[i].getLen(); 798 } 799 } 800 assertTrue("There should be some log greater than size 0.", 0 < largestSize); 801 // Set up a splitter that will throw an IOE on the output side 802 WALSplitter logSplitter = 803 new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { 804 @Override 805 protected Writer createWriter(Path logfile) throws IOException { 806 Writer mockWriter = Mockito.mock(Writer.class); 807 Mockito.doThrow(new IOException("Injected")).when(mockWriter) 808 .append(Mockito.<Entry> any()); 809 return mockWriter; 810 } 811 }; 812 // Set up a background thread dumper. Needs a thread to depend on and then we need to run 813 // the thread dumping in a background thread so it does not hold up the test. 814 final AtomicBoolean stop = new AtomicBoolean(false); 815 final Thread someOldThread = new Thread("Some-old-thread") { 816 @Override 817 public void run() { 818 while(!stop.get()) Threads.sleep(10); 819 } 820 }; 821 someOldThread.setDaemon(true); 822 someOldThread.start(); 823 final Thread t = new Thread("Background-thread-dumper") { 824 @Override 825 public void run() { 826 try { 827 Threads.threadDumpingIsAlive(someOldThread); 828 } catch (InterruptedException e) { 829 e.printStackTrace(); 830 } 831 } 832 }; 833 t.setDaemon(true); 834 t.start(); 835 try { 836 logSplitter.splitWAL(logfiles[largestLogFile], null); 837 fail("Didn't throw!"); 838 } catch (IOException ioe) { 839 assertTrue(ioe.toString().contains("Injected")); 840 } finally { 841 // Setting this to true will turn off the background thread dumper. 842 stop.set(true); 843 } 844 } 845 846 /** 847 * @param spiedFs should be instrumented for failure. 848 */ 849 private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception { 850 generateWALs(-1); 851 useDifferentDFSClient(); 852 853 try { 854 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); 855 assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); 856 assertFalse(fs.exists(WALDIR)); 857 } catch (IOException e) { 858 fail("There shouldn't be any exception but: " + e.toString()); 859 } 860 } 861 862 // Test for HBASE-3412 863 @Test 864 public void testMovedWALDuringRecovery() throws Exception { 865 // This partial mock will throw LEE for every file simulating 866 // files that were moved 867 FileSystem spiedFs = Mockito.spy(fs); 868 // The "File does not exist" part is very important, 869 // that's how it comes out of HDFS 870 Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")). 871 when(spiedFs).append(Mockito.<Path>any()); 872 retryOverHdfsProblem(spiedFs); 873 } 874 875 @Test 876 public void testRetryOpenDuringRecovery() throws Exception { 877 FileSystem spiedFs = Mockito.spy(fs); 878 // The "Cannot obtain block length", "Could not obtain the last block", 879 // and "Blocklist for [^ ]* has changed.*" part is very important, 880 // that's how it comes out of HDFS. If HDFS changes the exception 881 // message, this test needs to be adjusted accordingly. 882 // 883 // When DFSClient tries to open a file, HDFS needs to locate 884 // the last block of the file and get its length. However, if the 885 // last block is under recovery, HDFS may have problem to obtain 886 // the block length, in which case, retry may help. 887 Mockito.doAnswer(new Answer<FSDataInputStream>() { 888 private final String[] errors = new String[] { 889 "Cannot obtain block length", "Could not obtain the last block", 890 "Blocklist for " + OLDLOGDIR + " has changed"}; 891 private int count = 0; 892 893 @Override 894 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 895 if (count < 3) { 896 throw new IOException(errors[count++]); 897 } 898 return (FSDataInputStream)invocation.callRealMethod(); 899 } 900 }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt()); 901 retryOverHdfsProblem(spiedFs); 902 } 903 904 @Test 905 public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException { 906 generateWALs(1, 10, -1); 907 FileStatus logfile = fs.listStatus(WALDIR)[0]; 908 useDifferentDFSClient(); 909 910 final AtomicInteger count = new AtomicInteger(); 911 912 CancelableProgressable localReporter 913 = new CancelableProgressable() { 914 @Override 915 public boolean progress() { 916 count.getAndIncrement(); 917 return false; 918 } 919 }; 920 921 FileSystem spiedFs = Mockito.spy(fs); 922 Mockito.doAnswer(new Answer<FSDataInputStream>() { 923 @Override 924 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 925 Thread.sleep(1500); // Sleep a while and wait report status invoked 926 return (FSDataInputStream)invocation.callRealMethod(); 927 } 928 }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt()); 929 930 try { 931 conf.setInt("hbase.splitlog.report.period", 1000); 932 boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null, 933 Mockito.mock(SplitLogWorkerCoordination.class), wals, null); 934 assertFalse("Log splitting should failed", ret); 935 assertTrue(count.get() > 0); 936 } catch (IOException e) { 937 fail("There shouldn't be any exception but: " + e.toString()); 938 } finally { 939 // reset it back to its default value 940 conf.setInt("hbase.splitlog.report.period", 59000); 941 } 942 } 943 944 /** 945 * Test log split process with fake data and lots of edits to trigger threading 946 * issues. 947 */ 948 @Test 949 public void testThreading() throws Exception { 950 doTestThreading(20000, 128*1024*1024, 0); 951 } 952 953 /** 954 * Test blocking behavior of the log split process if writers are writing slower 955 * than the reader is reading. 956 */ 957 @Test 958 public void testThreadingSlowWriterSmallBuffer() throws Exception { 959 doTestThreading(200, 1024, 50); 960 } 961 962 /** 963 * Sets up a log splitter with a mock reader and writer. The mock reader generates 964 * a specified number of edits spread across 5 regions. The mock writer optionally 965 * sleeps for each edit it is fed. 966 * * 967 * After the split is complete, verifies that the statistics show the correct number 968 * of edits output into each region. 969 * 970 * @param numFakeEdits number of fake edits to push through pipeline 971 * @param bufferSize size of in-memory buffer 972 * @param writerSlowness writer threads will sleep this many ms per edit 973 */ 974 private void doTestThreading(final int numFakeEdits, 975 final int bufferSize, 976 final int writerSlowness) throws Exception { 977 978 Configuration localConf = new Configuration(conf); 979 localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize); 980 981 // Create a fake log file (we'll override the reader to produce a stream of edits) 982 Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake"); 983 FSDataOutputStream out = fs.create(logPath); 984 out.close(); 985 986 // Make region dirs for our destination regions so the output doesn't get skipped 987 final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); 988 makeRegionDirs(regions); 989 990 // Create a splitter that reads and writes the data without touching disk 991 WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) { 992 /* Produce a mock writer that doesn't write anywhere */ 993 @Override 994 protected Writer createWriter(Path logfile) throws IOException { 995 Writer mockWriter = Mockito.mock(Writer.class); 996 Mockito.doAnswer(new Answer<Void>() { 997 int expectedIndex = 0; 998 999 @Override 1000 public Void answer(InvocationOnMock invocation) { 1001 if (writerSlowness > 0) { 1002 try { 1003 Thread.sleep(writerSlowness); 1004 } catch (InterruptedException ie) { 1005 Thread.currentThread().interrupt(); 1006 } 1007 } 1008 Entry entry = (Entry) invocation.getArgument(0); 1009 WALEdit edit = entry.getEdit(); 1010 List<Cell> cells = edit.getCells(); 1011 assertEquals(1, cells.size()); 1012 Cell cell = cells.get(0); 1013 1014 // Check that the edits come in the right order. 1015 assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), 1016 cell.getRowLength())); 1017 expectedIndex++; 1018 return null; 1019 } 1020 }).when(mockWriter).append(Mockito.<Entry>any()); 1021 return mockWriter; 1022 } 1023 1024 /* Produce a mock reader that generates fake entries */ 1025 @Override 1026 protected Reader getReader(FileStatus file, boolean skipErrors, 1027 CancelableProgressable reporter) throws IOException, CorruptedLogFileException { 1028 Reader mockReader = Mockito.mock(Reader.class); 1029 Mockito.doAnswer(new Answer<Entry>() { 1030 int index = 0; 1031 1032 @Override 1033 public Entry answer(InvocationOnMock invocation) throws Throwable { 1034 if (index >= numFakeEdits) return null; 1035 1036 // Generate r0 through r4 in round robin fashion 1037 int regionIdx = index % regions.size(); 1038 byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)}; 1039 1040 Entry ret = createTestEntry(TABLE_NAME, region, 1041 Bytes.toBytes(index / regions.size()), 1042 FAMILY, QUALIFIER, VALUE, index); 1043 index++; 1044 return ret; 1045 } 1046 }).when(mockReader).next(); 1047 return mockReader; 1048 } 1049 }; 1050 1051 logSplitter.splitWAL(fs.getFileStatus(logPath), null); 1052 1053 // Verify number of written edits per region 1054 Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts(); 1055 for (Map.Entry<String, Long> entry : outputCounts.entrySet()) { 1056 LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey()); 1057 assertEquals((long) entry.getValue(), numFakeEdits / regions.size()); 1058 } 1059 assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size()); 1060 } 1061 1062 // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests? 1063 @Test 1064 public void testSplitLogFileDeletedRegionDir() throws IOException { 1065 LOG.info("testSplitLogFileDeletedRegionDir"); 1066 final String REGION = "region__1"; 1067 REGIONS.clear(); 1068 REGIONS.add(REGION); 1069 1070 generateWALs(1, 10, -1); 1071 useDifferentDFSClient(); 1072 1073 Path regiondir = new Path(TABLEDIR, REGION); 1074 LOG.info("Region directory is" + regiondir); 1075 fs.delete(regiondir, true); 1076 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1077 assertFalse(fs.exists(regiondir)); 1078 } 1079 1080 @Test 1081 public void testSplitLogFileEmpty() throws IOException { 1082 LOG.info("testSplitLogFileEmpty"); 1083 // we won't create the hlog dir until getWAL got called, so 1084 // make dir here when testing empty log file 1085 fs.mkdirs(WALDIR); 1086 injectEmptyFile(".empty", true); 1087 useDifferentDFSClient(); 1088 1089 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1090 Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME); 1091 assertFalse(fs.exists(tdir)); 1092 1093 assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath())); 1094 } 1095 1096 @Test 1097 public void testSplitLogFileMultipleRegions() throws IOException { 1098 LOG.info("testSplitLogFileMultipleRegions"); 1099 generateWALs(1, 10, -1); 1100 splitAndCount(1, 10); 1101 } 1102 1103 @Test 1104 public void testSplitLogFileFirstLineCorruptionLog() 1105 throws IOException { 1106 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 1107 generateWALs(1, 10, -1); 1108 FileStatus logfile = fs.listStatus(WALDIR)[0]; 1109 1110 corruptWAL(logfile.getPath(), 1111 Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); 1112 1113 useDifferentDFSClient(); 1114 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1115 1116 final Path corruptDir = 1117 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 1118 assertEquals(1, fs.listStatus(corruptDir).length); 1119 } 1120 1121 /** 1122 * @see "https://issues.apache.org/jira/browse/HBASE-4862" 1123 */ 1124 @Test 1125 public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException { 1126 LOG.info("testConcurrentSplitLogAndReplayRecoverEdit"); 1127 // Generate wals for our destination region 1128 String regionName = "r0"; 1129 final Path regiondir = new Path(TABLEDIR, regionName); 1130 REGIONS.clear(); 1131 REGIONS.add(regionName); 1132 generateWALs(-1); 1133 1134 wals.getWAL(null); 1135 FileStatus[] logfiles = fs.listStatus(WALDIR); 1136 assertTrue("There should be some log file", 1137 logfiles != null && logfiles.length > 0); 1138 1139 WALSplitter logSplitter = 1140 new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { 1141 @Override 1142 protected Writer createWriter(Path logfile) 1143 throws IOException { 1144 Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); 1145 // After creating writer, simulate region's 1146 // replayRecoveredEditsIfAny() which gets SplitEditFiles of this 1147 // region and delete them, excluding files with '.temp' suffix. 1148 NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir); 1149 if (files != null && !files.isEmpty()) { 1150 for (Path file : files) { 1151 if (!this.walFS.delete(file, false)) { 1152 LOG.error("Failed delete of " + file); 1153 } else { 1154 LOG.debug("Deleted recovered.edits file=" + file); 1155 } 1156 } 1157 } 1158 return writer; 1159 } 1160 }; 1161 try{ 1162 logSplitter.splitWAL(logfiles[0], null); 1163 } catch (IOException e) { 1164 LOG.info(e.toString(), e); 1165 fail("Throws IOException when spliting " 1166 + "log, it is most likely because writing file does not " 1167 + "exist which is caused by concurrent replayRecoveredEditsIfAny()"); 1168 } 1169 if (fs.exists(CORRUPTDIR)) { 1170 if (fs.listStatus(CORRUPTDIR).length > 0) { 1171 fail("There are some corrupt logs, " 1172 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()"); 1173 } 1174 } 1175 } 1176 1177 private Writer generateWALs(int leaveOpen) throws IOException { 1178 return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0); 1179 } 1180 1181 private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException { 1182 return generateWALs(writers, entries, leaveOpen, 7); 1183 } 1184 1185 private void makeRegionDirs(List<String> regions) throws IOException { 1186 for (String region : regions) { 1187 LOG.debug("Creating dir for region " + region); 1188 fs.mkdirs(new Path(TABLEDIR, region)); 1189 } 1190 } 1191 1192 /** 1193 * @param leaveOpen index to leave un-closed. -1 to close all. 1194 * @return the writer that's still open, or null if all were closed. 1195 */ 1196 private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) throws IOException { 1197 makeRegionDirs(REGIONS); 1198 fs.mkdirs(WALDIR); 1199 Writer [] ws = new Writer[writers]; 1200 int seq = 0; 1201 int numRegionEventsAdded = 0; 1202 for (int i = 0; i < writers; i++) { 1203 ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i)); 1204 for (int j = 0; j < entries; j++) { 1205 int prefix = 0; 1206 for (String region : REGIONS) { 1207 String row_key = region + prefix++ + i + j; 1208 appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY, 1209 QUALIFIER, VALUE, seq++); 1210 1211 if (numRegionEventsAdded < regionEvents) { 1212 numRegionEventsAdded ++; 1213 appendRegionEvent(ws[i], region); 1214 } 1215 } 1216 } 1217 if (i != leaveOpen) { 1218 ws[i].close(); 1219 LOG.info("Closing writer " + i); 1220 } 1221 } 1222 if (leaveOpen < 0 || leaveOpen >= writers) { 1223 return null; 1224 } 1225 return ws[leaveOpen]; 1226 } 1227 1228 1229 1230 private Path[] getLogForRegion(TableName table, String region) 1231 throws IOException { 1232 Path tdir = CommonFSUtils.getWALTableDir(conf, table); 1233 @SuppressWarnings("deprecation") 1234 Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, 1235 Bytes.toString(Bytes.toBytes(region)))); 1236 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { 1237 @Override 1238 public boolean accept(Path p) { 1239 if (WALSplitUtil.isSequenceIdFile(p)) { 1240 return false; 1241 } 1242 return true; 1243 } 1244 }); 1245 Path[] paths = new Path[files.length]; 1246 for (int i = 0; i < files.length; i++) { 1247 paths[i] = files[i].getPath(); 1248 } 1249 return paths; 1250 } 1251 1252 private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException { 1253 FSDataOutputStream out; 1254 int fileSize = (int) fs.listStatus(path)[0].getLen(); 1255 1256 FSDataInputStream in = fs.open(path); 1257 byte[] corrupted_bytes = new byte[fileSize]; 1258 in.readFully(0, corrupted_bytes, 0, fileSize); 1259 in.close(); 1260 1261 switch (corruption) { 1262 case APPEND_GARBAGE: 1263 fs.delete(path, false); 1264 out = fs.create(path); 1265 out.write(corrupted_bytes); 1266 out.write(Bytes.toBytes("-----")); 1267 closeOrFlush(close, out); 1268 break; 1269 1270 case INSERT_GARBAGE_ON_FIRST_LINE: 1271 fs.delete(path, false); 1272 out = fs.create(path); 1273 out.write(0); 1274 out.write(corrupted_bytes); 1275 closeOrFlush(close, out); 1276 break; 1277 1278 case INSERT_GARBAGE_IN_THE_MIDDLE: 1279 fs.delete(path, false); 1280 out = fs.create(path); 1281 int middle = (int) Math.floor(corrupted_bytes.length / 2); 1282 out.write(corrupted_bytes, 0, middle); 1283 out.write(0); 1284 out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); 1285 closeOrFlush(close, out); 1286 break; 1287 1288 case TRUNCATE: 1289 fs.delete(path, false); 1290 out = fs.create(path); 1291 out.write(corrupted_bytes, 0, fileSize 1292 - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT)); 1293 closeOrFlush(close, out); 1294 break; 1295 1296 case TRUNCATE_TRAILER: 1297 fs.delete(path, false); 1298 out = fs.create(path); 1299 out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated. 1300 closeOrFlush(close, out); 1301 break; 1302 } 1303 } 1304 1305 private void closeOrFlush(boolean close, FSDataOutputStream out) 1306 throws IOException { 1307 if (close) { 1308 out.close(); 1309 } else { 1310 Method syncMethod = null; 1311 try { 1312 syncMethod = out.getClass().getMethod("hflush", new Class<?> []{}); 1313 } catch (NoSuchMethodException e) { 1314 try { 1315 syncMethod = out.getClass().getMethod("sync", new Class<?> []{}); 1316 } catch (NoSuchMethodException ex) { 1317 throw new IOException("This version of Hadoop supports " + 1318 "neither Syncable.sync() nor Syncable.hflush()."); 1319 } 1320 } 1321 try { 1322 syncMethod.invoke(out, new Object[]{}); 1323 } catch (Exception e) { 1324 throw new IOException(e); 1325 } 1326 // Not in 0out.hflush(); 1327 } 1328 } 1329 1330 private int countWAL(Path log) throws IOException { 1331 int count = 0; 1332 Reader in = wals.createReader(fs, log); 1333 while (in.next() != null) { 1334 count++; 1335 } 1336 in.close(); 1337 return count; 1338 } 1339 1340 private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs, 1341 String output) throws IOException { 1342 WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder(); 1343 desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes())) 1344 .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes())) 1345 .setRegionName(ByteString.copyFrom(hri.getRegionName())) 1346 .setFamilyName(ByteString.copyFrom(FAMILY)) 1347 .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY)) 1348 .addAllCompactionInput(Arrays.asList(inputs)) 1349 .addCompactionOutput(output); 1350 1351 WALEdit edit = WALEdit.createCompaction(hri, desc.build()); 1352 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, 1353 EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); 1354 w.append(new Entry(key, edit)); 1355 w.sync(false); 1356 } 1357 1358 private static void appendRegionEvent(Writer w, String region) throws IOException { 1359 WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( 1360 WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, 1361 TABLE_NAME.toBytes(), 1362 Bytes.toBytes(region), 1363 Bytes.toBytes(String.valueOf(region.hashCode())), 1364 1, 1365 ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of()); 1366 final long time = EnvironmentEdgeManager.currentTime(); 1367 final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, 1368 HConstants.DEFAULT_CLUSTER_ID); 1369 WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc); 1370 w.append(new Entry(walKey, we)); 1371 w.sync(false); 1372 } 1373 1374 public static long appendEntry(Writer writer, TableName table, byte[] region, 1375 byte[] row, byte[] family, byte[] qualifier, 1376 byte[] value, long seq) 1377 throws IOException { 1378 LOG.info(Thread.currentThread().getName() + " append"); 1379 writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); 1380 LOG.info(Thread.currentThread().getName() + " sync"); 1381 writer.sync(false); 1382 return seq; 1383 } 1384 1385 private static Entry createTestEntry( 1386 TableName table, byte[] region, 1387 byte[] row, byte[] family, byte[] qualifier, 1388 byte[] value, long seq) { 1389 long time = System.nanoTime(); 1390 1391 seq++; 1392 final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value); 1393 WALEdit edit = new WALEdit(); 1394 edit.add(cell); 1395 return new Entry(new WALKeyImpl(region, table, seq, time, 1396 HConstants.DEFAULT_CLUSTER_ID), edit); 1397 } 1398 1399 private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { 1400 Writer writer = 1401 WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); 1402 if (closeFile) { 1403 writer.close(); 1404 } 1405 } 1406 1407 private boolean logsAreEqual(Path p1, Path p2) throws IOException { 1408 Reader in1, in2; 1409 in1 = wals.createReader(fs, p1); 1410 in2 = wals.createReader(fs, p2); 1411 Entry entry1; 1412 Entry entry2; 1413 while ((entry1 = in1.next()) != null) { 1414 entry2 = in2.next(); 1415 if ((entry1.getKey().compareTo(entry2.getKey()) != 0) || 1416 (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) { 1417 return false; 1418 } 1419 } 1420 in1.close(); 1421 in2.close(); 1422 return true; 1423 } 1424}