001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.junit.Assume.assumeFalse; 025 026import java.io.FileNotFoundException; 027import java.io.IOException; 028import java.lang.reflect.Method; 029import java.security.PrivilegedExceptionAction; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.List; 036import java.util.Map; 037import java.util.NavigableSet; 038import java.util.Objects; 039import java.util.Set; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.stream.Collectors; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FSDataInputStream; 046import org.apache.hadoop.fs.FSDataOutputStream; 047import org.apache.hadoop.fs.FileStatus; 048import org.apache.hadoop.fs.FileSystem; 049import org.apache.hadoop.fs.FileUtil; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseClassTestRule; 054import org.apache.hadoop.hbase.HBaseConfiguration; 055import org.apache.hadoop.hbase.HBaseTestingUtility; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.KeyValue; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.client.RegionInfo; 061import org.apache.hadoop.hbase.client.RegionInfoBuilder; 062import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 063import org.apache.hadoop.hbase.regionserver.HRegion; 064import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader; 065import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; 066import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 067import org.apache.hadoop.hbase.security.User; 068import org.apache.hadoop.hbase.testclassification.LargeTests; 069import org.apache.hadoop.hbase.testclassification.RegionServerTests; 070import org.apache.hadoop.hbase.util.Bytes; 071import org.apache.hadoop.hbase.util.CancelableProgressable; 072import org.apache.hadoop.hbase.util.CommonFSUtils; 073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 074import org.apache.hadoop.hbase.util.Threads; 075import org.apache.hadoop.hbase.wal.WAL.Entry; 076import org.apache.hadoop.hbase.wal.WAL.Reader; 077import org.apache.hadoop.hbase.wal.WALProvider.Writer; 078import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException; 079import org.apache.hadoop.hdfs.DFSTestUtil; 080import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; 081import org.apache.hadoop.ipc.RemoteException; 082import org.junit.After; 083import org.junit.AfterClass; 084import org.junit.Before; 085import org.junit.BeforeClass; 086import org.junit.ClassRule; 087import org.junit.Rule; 088import org.junit.Test; 089import org.junit.experimental.categories.Category; 090import org.junit.rules.TestName; 091import org.mockito.Mockito; 092import org.mockito.invocation.InvocationOnMock; 093import org.mockito.stubbing.Answer; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 098import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 099import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 100import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 101 102import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 104 105/** 106 * Testing {@link WAL} splitting code. 107 */ 108@Category({ RegionServerTests.class, LargeTests.class }) 109public class TestWALSplit { 110 @ClassRule 111 public static final HBaseClassTestRule CLASS_RULE = 112 HBaseClassTestRule.forClass(TestWALSplit.class); 113 private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class); 114 115 private static Configuration conf; 116 private FileSystem fs; 117 118 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 119 120 private Path HBASEDIR; 121 private Path HBASELOGDIR; 122 private Path WALDIR; 123 private Path OLDLOGDIR; 124 private Path CORRUPTDIR; 125 private Path TABLEDIR; 126 private String TMPDIRNAME; 127 128 private static final int NUM_WRITERS = 10; 129 private static final int ENTRIES = 10; // entries per writer per region 130 131 private static final String FILENAME_BEING_SPLIT = "testfile"; 132 private static final TableName TABLE_NAME = TableName.valueOf("t1"); 133 private static final byte[] FAMILY = Bytes.toBytes("f1"); 134 private static final byte[] QUALIFIER = Bytes.toBytes("q1"); 135 private static final byte[] VALUE = Bytes.toBytes("v1"); 136 private static final String WAL_FILE_PREFIX = "wal.dat."; 137 private static List<String> REGIONS = new ArrayList<>(); 138 private static String ROBBER; 139 private static String ZOMBIE; 140 private static String[] GROUP = new String[] { "supergroup" }; 141 142 static enum Corruptions { 143 INSERT_GARBAGE_ON_FIRST_LINE, 144 INSERT_GARBAGE_IN_THE_MIDDLE, 145 APPEND_GARBAGE, 146 TRUNCATE, 147 TRUNCATE_TRAILER 148 } 149 150 @BeforeClass 151 public static void setUpBeforeClass() throws Exception { 152 conf = TEST_UTIL.getConfiguration(); 153 conf.setClass("hbase.regionserver.hlog.writer.impl", InstrumentedLogWriter.class, Writer.class); 154 // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config. 155 System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); 156 // Create fake maping user to group and set it to the conf. 157 Map<String, String[]> u2g_map = new HashMap<>(2); 158 ROBBER = User.getCurrent().getName() + "-robber"; 159 ZOMBIE = User.getCurrent().getName() + "-zombie"; 160 u2g_map.put(ROBBER, GROUP); 161 u2g_map.put(ZOMBIE, GROUP); 162 DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); 163 conf.setInt("dfs.heartbeat.interval", 1); 164 TEST_UTIL.startMiniDFSCluster(2); 165 } 166 167 @AfterClass 168 public static void tearDownAfterClass() throws Exception { 169 TEST_UTIL.shutdownMiniDFSCluster(); 170 } 171 172 @Rule 173 public TestName name = new TestName(); 174 private WALFactory wals = null; 175 176 @Before 177 public void setUp() throws Exception { 178 LOG.info("Cleaning up cluster for new test."); 179 fs = TEST_UTIL.getDFSCluster().getFileSystem(); 180 HBASEDIR = TEST_UTIL.createRootDir(); 181 HBASELOGDIR = TEST_UTIL.createWALRootDir(); 182 OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME); 183 CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME); 184 TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME); 185 TMPDIRNAME = 186 conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); 187 REGIONS.clear(); 188 Collections.addAll(REGIONS, "bbb", "ccc"); 189 InstrumentedLogWriter.activateFailure = false; 190 wals = new WALFactory(conf, name.getMethodName()); 191 WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(ServerName 192 .valueOf(name.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()).toString())); 193 // fs.mkdirs(WALDIR); 194 } 195 196 @After 197 public void tearDown() throws Exception { 198 try { 199 wals.close(); 200 } catch (IOException exception) { 201 // Some tests will move WALs out from under us. In those cases, we'll get an error on close. 202 LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" 203 + " you see a failure look here."); 204 LOG.debug("exception details", exception); 205 } finally { 206 wals = null; 207 fs.delete(HBASEDIR, true); 208 fs.delete(HBASELOGDIR, true); 209 } 210 } 211 212 /** 213 * Simulates splitting a WAL out from under a regionserver that is still trying to write it. 214 * Ensures we do not lose edits. 215 */ 216 @Test 217 public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException { 218 final AtomicLong counter = new AtomicLong(0); 219 AtomicBoolean stop = new AtomicBoolean(false); 220 // Region we'll write edits too and then later examine to make sure they all made it in. 221 final String region = REGIONS.get(0); 222 final int numWriters = 3; 223 Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters); 224 try { 225 long startCount = counter.get(); 226 zombie.start(); 227 // Wait till writer starts going. 228 while (startCount == counter.get()) 229 Threads.sleep(1); 230 // Give it a second to write a few appends. 231 Threads.sleep(1000); 232 final Configuration conf2 = HBaseConfiguration.create(conf); 233 final User robber = User.createUserForTesting(conf2, ROBBER, GROUP); 234 int count = robber.runAs(new PrivilegedExceptionAction<Integer>() { 235 @Override 236 public Integer run() throws Exception { 237 StringBuilder ls = 238 new StringBuilder("Contents of WALDIR (").append(WALDIR).append("):\n"); 239 for (FileStatus status : fs.listStatus(WALDIR)) { 240 ls.append("\t").append(status.toString()).append("\n"); 241 } 242 LOG.debug(Objects.toString(ls)); 243 LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files."); 244 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); 245 LOG.info("Finished splitting out from under zombie."); 246 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 247 assertEquals("wrong number of split files for region", numWriters, logfiles.length); 248 int count = 0; 249 for (Path logfile : logfiles) { 250 count += countWAL(logfile); 251 } 252 return count; 253 } 254 }); 255 LOG.info("zombie=" + counter.get() + ", robber=" + count); 256 assertTrue( 257 "The log file could have at most 1 extra log entry, but can't have less. " 258 + "Zombie could write " + counter.get() + " and logfile had only " + count, 259 counter.get() == count || counter.get() + 1 == count); 260 } finally { 261 stop.set(true); 262 zombie.interrupt(); 263 Threads.threadDumpingIsAlive(zombie); 264 } 265 } 266 267 /** 268 * This thread will keep writing to a 'wal' file even after the split process has started. It 269 * simulates a region server that was considered dead but woke up and wrote some more to the last 270 * log entry. Does its writing as an alternate user in another filesystem instance to simulate 271 * better it being a regionserver. 272 */ 273 class ZombieLastLogWriterRegionServer extends Thread { 274 final AtomicLong editsCount; 275 final AtomicBoolean stop; 276 final int numOfWriters; 277 /** 278 * Region to write edits for. 279 */ 280 final String region; 281 final User user; 282 283 public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop, 284 final String region, final int writers) throws IOException, InterruptedException { 285 super("ZombieLastLogWriterRegionServer"); 286 setDaemon(true); 287 this.stop = stop; 288 this.editsCount = counter; 289 this.region = region; 290 this.user = User.createUserForTesting(conf, ZOMBIE, GROUP); 291 numOfWriters = writers; 292 } 293 294 @Override 295 public void run() { 296 try { 297 doWriting(); 298 } catch (IOException e) { 299 LOG.warn(getName() + " Writer exiting " + e); 300 } catch (InterruptedException e) { 301 LOG.warn(getName() + " Writer exiting " + e); 302 } 303 } 304 305 private void doWriting() throws IOException, InterruptedException { 306 this.user.runAs(new PrivilegedExceptionAction<Object>() { 307 @Override 308 public Object run() throws Exception { 309 // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose 310 // index we supply here. 311 int walToKeepOpen = numOfWriters - 1; 312 // The below method writes numOfWriters files each with ENTRIES entries for a total of 313 // numOfWriters * ENTRIES added per column family in the region. 314 Writer writer = null; 315 try { 316 writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen); 317 } catch (IOException e1) { 318 throw new RuntimeException("Failed", e1); 319 } 320 // Update counter so has all edits written so far. 321 editsCount.addAndGet(numOfWriters * ENTRIES); 322 loop(writer); 323 // If we've been interruped, then things should have shifted out from under us. 324 // closing should error 325 try { 326 writer.close(); 327 fail("Writing closing after parsing should give an error."); 328 } catch (IOException exception) { 329 LOG.debug("ignoring error when closing final writer.", exception); 330 } 331 return null; 332 } 333 }); 334 } 335 336 private void loop(final Writer writer) { 337 byte[] regionBytes = Bytes.toBytes(this.region); 338 while (!stop.get()) { 339 try { 340 long seq = appendEntry(writer, TABLE_NAME, regionBytes, 341 Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0); 342 long count = editsCount.incrementAndGet(); 343 LOG.info(getName() + " sync count=" + count + ", seq=" + seq); 344 try { 345 Thread.sleep(1); 346 } catch (InterruptedException e) { 347 // 348 } 349 } catch (IOException ex) { 350 LOG.error(getName() + " ex " + ex.toString()); 351 if (ex instanceof RemoteException) { 352 LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing " 353 + (editsCount.get() + 1)); 354 } else { 355 LOG.error(getName() + " failed to write....at " + editsCount.get()); 356 fail("Failed to write " + editsCount.get()); 357 } 358 break; 359 } catch (Throwable t) { 360 LOG.error(getName() + " HOW? " + t); 361 LOG.debug("exception details", t); 362 break; 363 } 364 } 365 LOG.info(getName() + " Writer exiting"); 366 } 367 } 368 369 /** 370 * @see "https://issues.apache.org/jira/browse/HBASE-3020" 371 */ 372 @Test 373 public void testRecoveredEditsPathForMeta() throws IOException { 374 Path p = createRecoveredEditsPathForRegion(); 375 String parentOfParent = p.getParent().getParent().getName(); 376 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 377 } 378 379 /** 380 * Test old recovered edits file doesn't break WALSplitter. This is useful in upgrading old 381 * instances. 382 */ 383 @Test 384 public void testOldRecoveredEditsFileSidelined() throws IOException { 385 Path p = createRecoveredEditsPathForRegion(); 386 Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME); 387 Path regiondir = new Path(tdir, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 388 fs.mkdirs(regiondir); 389 Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 390 assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName()); 391 fs.createNewFile(parent); // create a recovered.edits file 392 String parentOfParent = p.getParent().getParent().getName(); 393 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 394 WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); 395 } 396 397 private Path createRecoveredEditsPathForRegion() throws IOException { 398 byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); 399 long now = EnvironmentEdgeManager.currentTime(); 400 Entry entry = new Entry( 401 new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), 402 new WALEdit()); 403 Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, 404 FILENAME_BEING_SPLIT, TMPDIRNAME, conf); 405 return p; 406 } 407 408 @Test 409 public void testHasRecoveredEdits() throws IOException { 410 Path p = createRecoveredEditsPathForRegion(); 411 assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 412 String renamedEdit = p.getName().split("-")[0]; 413 fs.createNewFile(new Path(p.getParent(), renamedEdit)); 414 assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 415 } 416 417 private void useDifferentDFSClient() throws IOException { 418 // make fs act as a different client now 419 // initialize will create a new DFSClient with a new client ID 420 fs.initialize(fs.getUri(), conf); 421 } 422 423 @Test 424 public void testSplitPreservesEdits() throws IOException { 425 final String REGION = "region__1"; 426 REGIONS.clear(); 427 REGIONS.add(REGION); 428 429 generateWALs(1, 10, -1, 0); 430 useDifferentDFSClient(); 431 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 432 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 433 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 434 assertEquals(1, splitLog.length); 435 436 assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 437 } 438 439 @Test 440 public void testSplitRemovesRegionEventsEdits() throws IOException { 441 final String REGION = "region__1"; 442 REGIONS.clear(); 443 REGIONS.add(REGION); 444 445 generateWALs(1, 10, -1, 100); 446 useDifferentDFSClient(); 447 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 448 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 449 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 450 assertEquals(1, splitLog.length); 451 452 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 453 // split log should only have the test edits 454 assertEquals(10, countWAL(splitLog[0])); 455 } 456 457 @Test 458 public void testSplitLeavesCompactionEventsEdits() throws IOException { 459 RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); 460 REGIONS.clear(); 461 REGIONS.add(hri.getEncodedName()); 462 Path regionDir = 463 new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName()); 464 LOG.info("Creating region directory: " + regionDir); 465 assertTrue(fs.mkdirs(regionDir)); 466 467 Writer writer = generateWALs(1, 10, 0, 10); 468 String[] compactInputs = new String[] { "file1", "file2", "file3" }; 469 String compactOutput = "file4"; 470 appendCompactionEvent(writer, hri, compactInputs, compactOutput); 471 writer.close(); 472 473 useDifferentDFSClient(); 474 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 475 476 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 477 // original log should have 10 test edits, 10 region markers, 1 compaction marker 478 assertEquals(21, countWAL(originalLog)); 479 480 Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName()); 481 assertEquals(1, splitLog.length); 482 483 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 484 // split log should have 10 test edits plus 1 compaction marker 485 assertEquals(11, countWAL(splitLog[0])); 486 } 487 488 /** 489 * @param expectedEntries -1 to not assert 490 * @return the count across all regions 491 */ 492 private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException { 493 useDifferentDFSClient(); 494 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 495 int result = 0; 496 for (String region : REGIONS) { 497 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 498 assertEquals(expectedFiles, logfiles.length); 499 int count = 0; 500 for (Path logfile : logfiles) { 501 count += countWAL(logfile); 502 } 503 if (-1 != expectedEntries) { 504 assertEquals(expectedEntries, count); 505 } 506 result += count; 507 } 508 return result; 509 } 510 511 @Test 512 public void testEmptyLogFiles() throws IOException { 513 testEmptyLogFiles(true); 514 } 515 516 @Test 517 public void testEmptyOpenLogFiles() throws IOException { 518 testEmptyLogFiles(false); 519 } 520 521 private void testEmptyLogFiles(final boolean close) throws IOException { 522 // we won't create the hlog dir until getWAL got called, so 523 // make dir here when testing empty log file 524 fs.mkdirs(WALDIR); 525 injectEmptyFile(".empty", close); 526 generateWALs(Integer.MAX_VALUE); 527 injectEmptyFile("empty", close); 528 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty 529 } 530 531 @Test 532 public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { 533 // generate logs but leave wal.dat.5 open. 534 generateWALs(5); 535 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 536 } 537 538 @Test 539 public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { 540 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 541 generateWALs(Integer.MAX_VALUE); 542 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.APPEND_GARBAGE, true); 543 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 544 } 545 546 @Test 547 public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { 548 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 549 generateWALs(Integer.MAX_VALUE); 550 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, 551 true); 552 splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt 553 } 554 555 @Test 556 public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { 557 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 558 generateWALs(Integer.MAX_VALUE); 559 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, 560 false); 561 // the entries in the original logs are alternating regions 562 // considering the sequence file header, the middle corruption should 563 // affect at least half of the entries 564 int goodEntries = (NUM_WRITERS - 1) * ENTRIES; 565 int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; 566 int allRegionsCount = splitAndCount(NUM_WRITERS, -1); 567 assertTrue("The file up to the corrupted area hasn't been parsed", 568 REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount); 569 } 570 571 @Test 572 public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { 573 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 574 List<FaultyProtobufLogReader.FailureType> failureTypes = 575 Arrays.asList(FaultyProtobufLogReader.FailureType.values()).stream() 576 .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList()); 577 for (FaultyProtobufLogReader.FailureType failureType : failureTypes) { 578 final Set<String> walDirContents = splitCorruptWALs(failureType); 579 final Set<String> archivedLogs = new HashSet<>(); 580 final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:"); 581 for (FileStatus log : fs.listStatus(CORRUPTDIR)) { 582 archived.append("\n\t").append(log.toString()); 583 archivedLogs.add(log.getPath().getName()); 584 } 585 LOG.debug(archived.toString()); 586 assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs, 587 walDirContents); 588 } 589 } 590 591 /** 592 * @return set of wal names present prior to split attempt. 593 * @throws IOException if the split process fails 594 */ 595 private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType) 596 throws IOException { 597 Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl", Reader.class); 598 InstrumentedLogWriter.activateFailure = false; 599 600 try { 601 conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class, 602 Reader.class); 603 conf.set("faultyprotobuflogreader.failuretype", failureType.name()); 604 // Clean up from previous tests or previous loop 605 try { 606 wals.shutdown(); 607 } catch (IOException exception) { 608 // since we're splitting out from under the factory, we should expect some closing failures. 609 LOG.debug("Ignoring problem closing WALFactory.", exception); 610 } 611 wals.close(); 612 try { 613 for (FileStatus log : fs.listStatus(CORRUPTDIR)) { 614 fs.delete(log.getPath(), true); 615 } 616 } catch (FileNotFoundException exception) { 617 LOG.debug("no previous CORRUPTDIR to clean."); 618 } 619 // change to the faulty reader 620 wals = new WALFactory(conf, name.getMethodName()); 621 generateWALs(-1); 622 // Our reader will render all of these files corrupt. 623 final Set<String> walDirContents = new HashSet<>(); 624 for (FileStatus status : fs.listStatus(WALDIR)) { 625 walDirContents.add(status.getPath().getName()); 626 } 627 useDifferentDFSClient(); 628 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 629 return walDirContents; 630 } finally { 631 conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, Reader.class); 632 } 633 } 634 635 @Test(expected = IOException.class) 636 public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException { 637 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 638 splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); 639 } 640 641 @Test 642 public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException { 643 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 644 try { 645 splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING); 646 } catch (IOException e) { 647 LOG.debug("split with 'skip errors' set to 'false' correctly threw"); 648 } 649 assertEquals("if skip.errors is false all files should remain in place", NUM_WRITERS, 650 fs.listStatus(WALDIR).length); 651 } 652 653 private void ignoreCorruption(final Corruptions corruption, final int entryCount, 654 final int expectedCount) throws IOException { 655 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 656 657 final String REGION = "region__1"; 658 REGIONS.clear(); 659 REGIONS.add(REGION); 660 661 Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0"); 662 generateWALs(1, entryCount, -1, 0); 663 corruptWAL(c1, corruption, true); 664 665 useDifferentDFSClient(); 666 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 667 668 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 669 assertEquals(1, splitLog.length); 670 671 int actualCount = 0; 672 Reader in = wals.createReader(fs, splitLog[0]); 673 @SuppressWarnings("unused") 674 Entry entry; 675 while ((entry = in.next()) != null) 676 ++actualCount; 677 assertEquals(expectedCount, actualCount); 678 in.close(); 679 680 // should not have stored the EOF files as corrupt 681 FileStatus[] archivedLogs = 682 fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0]; 683 assertEquals(0, archivedLogs.length); 684 685 } 686 687 @Test 688 public void testEOFisIgnored() throws IOException { 689 int entryCount = 10; 690 ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1); 691 } 692 693 @Test 694 public void testCorruptWALTrailer() throws IOException { 695 int entryCount = 10; 696 ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount); 697 } 698 699 @Test 700 public void testLogsGetArchivedAfterSplit() throws IOException { 701 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 702 generateWALs(-1); 703 useDifferentDFSClient(); 704 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 705 FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR); 706 assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); 707 } 708 709 @Test 710 public void testSplit() throws IOException { 711 generateWALs(-1); 712 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 713 } 714 715 @Test 716 public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException { 717 generateWALs(-1); 718 useDifferentDFSClient(); 719 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 720 FileStatus[] statuses = null; 721 try { 722 statuses = fs.listStatus(WALDIR); 723 if (statuses != null) { 724 fail("Files left in log dir: " + Joiner.on(",").join(FileUtil.stat2Paths(statuses))); 725 } 726 } catch (FileNotFoundException e) { 727 // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null 728 } 729 } 730 731 @Test(expected = IOException.class) 732 public void testSplitWillFailIfWritingToRegionFails() throws Exception { 733 // leave 5th log open so we could append the "trap" 734 Writer writer = generateWALs(4); 735 useDifferentDFSClient(); 736 737 String region = "break"; 738 Path regiondir = new Path(TABLEDIR, region); 739 fs.mkdirs(regiondir); 740 741 InstrumentedLogWriter.activateFailure = false; 742 appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes("r" + 999), FAMILY, 743 QUALIFIER, VALUE, 0); 744 writer.close(); 745 746 try { 747 InstrumentedLogWriter.activateFailure = true; 748 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 749 } catch (IOException e) { 750 assertTrue(e.getMessage() 751 .contains("This exception is instrumented and should only be thrown for testing")); 752 throw e; 753 } finally { 754 InstrumentedLogWriter.activateFailure = false; 755 } 756 } 757 758 @Test 759 public void testSplitDeletedRegion() throws IOException { 760 REGIONS.clear(); 761 String region = "region_that_splits"; 762 REGIONS.add(region); 763 764 generateWALs(1); 765 useDifferentDFSClient(); 766 767 Path regiondir = new Path(TABLEDIR, region); 768 fs.delete(regiondir, true); 769 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 770 assertFalse(fs.exists(regiondir)); 771 } 772 773 @Test 774 public void testIOEOnOutputThread() throws Exception { 775 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false); 776 777 generateWALs(-1); 778 useDifferentDFSClient(); 779 FileStatus[] logfiles = fs.listStatus(WALDIR); 780 assertTrue("There should be some log file", logfiles != null && logfiles.length > 0); 781 // wals with no entries (like the one we don't use in the factory) 782 // won't cause a failure since nothing will ever be written. 783 // pick the largest one since it's most likely to have entries. 784 int largestLogFile = 0; 785 long largestSize = 0; 786 for (int i = 0; i < logfiles.length; i++) { 787 if (logfiles[i].getLen() > largestSize) { 788 largestLogFile = i; 789 largestSize = logfiles[i].getLen(); 790 } 791 } 792 assertTrue("There should be some log greater than size 0.", 0 < largestSize); 793 // Set up a splitter that will throw an IOE on the output side 794 WALSplitter logSplitter = 795 new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { 796 @Override 797 protected Writer createWriter(Path logfile) throws IOException { 798 Writer mockWriter = Mockito.mock(Writer.class); 799 Mockito.doThrow(new IOException("Injected")).when(mockWriter) 800 .append(Mockito.<Entry> any()); 801 return mockWriter; 802 } 803 }; 804 // Set up a background thread dumper. Needs a thread to depend on and then we need to run 805 // the thread dumping in a background thread so it does not hold up the test. 806 final AtomicBoolean stop = new AtomicBoolean(false); 807 final Thread someOldThread = new Thread("Some-old-thread") { 808 @Override 809 public void run() { 810 while (!stop.get()) 811 Threads.sleep(10); 812 } 813 }; 814 someOldThread.setDaemon(true); 815 someOldThread.start(); 816 final Thread t = new Thread("Background-thread-dumper") { 817 @Override 818 public void run() { 819 try { 820 Threads.threadDumpingIsAlive(someOldThread); 821 } catch (InterruptedException e) { 822 e.printStackTrace(); 823 } 824 } 825 }; 826 t.setDaemon(true); 827 t.start(); 828 try { 829 logSplitter.splitWAL(logfiles[largestLogFile], null); 830 fail("Didn't throw!"); 831 } catch (IOException ioe) { 832 assertTrue(ioe.toString().contains("Injected")); 833 } finally { 834 // Setting this to true will turn off the background thread dumper. 835 stop.set(true); 836 } 837 } 838 839 /** 840 * @param spiedFs should be instrumented for failure. 841 */ 842 private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception { 843 generateWALs(-1); 844 useDifferentDFSClient(); 845 846 try { 847 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals); 848 assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length); 849 assertFalse(fs.exists(WALDIR)); 850 } catch (IOException e) { 851 fail("There shouldn't be any exception but: " + e.toString()); 852 } 853 } 854 855 // Test for HBASE-3412 856 @Test 857 public void testMovedWALDuringRecovery() throws Exception { 858 // This partial mock will throw LEE for every file simulating 859 // files that were moved 860 FileSystem spiedFs = Mockito.spy(fs); 861 // The "File does not exist" part is very important, 862 // that's how it comes out of HDFS 863 Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).when(spiedFs) 864 .append(Mockito.<Path> any()); 865 retryOverHdfsProblem(spiedFs); 866 } 867 868 @Test 869 public void testRetryOpenDuringRecovery() throws Exception { 870 FileSystem spiedFs = Mockito.spy(fs); 871 // The "Cannot obtain block length", "Could not obtain the last block", 872 // and "Blocklist for [^ ]* has changed.*" part is very important, 873 // that's how it comes out of HDFS. If HDFS changes the exception 874 // message, this test needs to be adjusted accordingly. 875 // 876 // When DFSClient tries to open a file, HDFS needs to locate 877 // the last block of the file and get its length. However, if the 878 // last block is under recovery, HDFS may have problem to obtain 879 // the block length, in which case, retry may help. 880 Mockito.doAnswer(new Answer<FSDataInputStream>() { 881 private final String[] errors = new String[] { "Cannot obtain block length", 882 "Could not obtain the last block", "Blocklist for " + OLDLOGDIR + " has changed" }; 883 private int count = 0; 884 885 @Override 886 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 887 if (count < 3) { 888 throw new IOException(errors[count++]); 889 } 890 return (FSDataInputStream) invocation.callRealMethod(); 891 } 892 }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt()); 893 retryOverHdfsProblem(spiedFs); 894 } 895 896 @Test 897 public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException { 898 generateWALs(1, 10, -1); 899 FileStatus logfile = fs.listStatus(WALDIR)[0]; 900 useDifferentDFSClient(); 901 902 final AtomicInteger count = new AtomicInteger(); 903 904 CancelableProgressable localReporter = new CancelableProgressable() { 905 @Override 906 public boolean progress() { 907 count.getAndIncrement(); 908 return false; 909 } 910 }; 911 912 FileSystem spiedFs = Mockito.spy(fs); 913 Mockito.doAnswer(new Answer<FSDataInputStream>() { 914 @Override 915 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 916 Thread.sleep(1500); // Sleep a while and wait report status invoked 917 return (FSDataInputStream) invocation.callRealMethod(); 918 } 919 }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt()); 920 921 try { 922 conf.setInt("hbase.splitlog.report.period", 1000); 923 boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null, 924 Mockito.mock(SplitLogWorkerCoordination.class), wals, null); 925 assertFalse("Log splitting should failed", ret); 926 assertTrue(count.get() > 0); 927 } catch (IOException e) { 928 fail("There shouldn't be any exception but: " + e.toString()); 929 } finally { 930 // reset it back to its default value 931 conf.setInt("hbase.splitlog.report.period", 59000); 932 } 933 } 934 935 /** 936 * Test log split process with fake data and lots of edits to trigger threading issues. 937 */ 938 @Test 939 public void testThreading() throws Exception { 940 doTestThreading(20000, 128 * 1024 * 1024, 0); 941 } 942 943 /** 944 * Test blocking behavior of the log split process if writers are writing slower than the reader 945 * is reading. 946 */ 947 @Test 948 public void testThreadingSlowWriterSmallBuffer() throws Exception { 949 // The logic of this test has conflict with the limit writers split logic, skip this test for 950 // TestWALSplitBoundedLogWriterCreation 951 assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation); 952 doTestThreading(200, 1024, 50); 953 } 954 955 /** 956 * Sets up a log splitter with a mock reader and writer. The mock reader generates a specified 957 * number of edits spread across 5 regions. The mock writer optionally sleeps for each edit it is 958 * fed. * After the split is complete, verifies that the statistics show the correct number of 959 * edits output into each region. 960 * @param numFakeEdits number of fake edits to push through pipeline 961 * @param bufferSize size of in-memory buffer 962 * @param writerSlowness writer threads will sleep this many ms per edit 963 */ 964 private void doTestThreading(final int numFakeEdits, final int bufferSize, 965 final int writerSlowness) throws Exception { 966 967 Configuration localConf = new Configuration(conf); 968 localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize); 969 970 // Create a fake log file (we'll override the reader to produce a stream of edits) 971 Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake"); 972 FSDataOutputStream out = fs.create(logPath); 973 out.close(); 974 975 // Make region dirs for our destination regions so the output doesn't get skipped 976 final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); 977 makeRegionDirs(regions); 978 979 // Create a splitter that reads and writes the data without touching disk 980 WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) { 981 /* Produce a mock writer that doesn't write anywhere */ 982 @Override 983 protected Writer createWriter(Path logfile) throws IOException { 984 Writer mockWriter = Mockito.mock(Writer.class); 985 Mockito.doAnswer(new Answer<Void>() { 986 int expectedIndex = 0; 987 988 @Override 989 public Void answer(InvocationOnMock invocation) { 990 if (writerSlowness > 0) { 991 try { 992 Thread.sleep(writerSlowness); 993 } catch (InterruptedException ie) { 994 Thread.currentThread().interrupt(); 995 } 996 } 997 Entry entry = (Entry) invocation.getArgument(0); 998 WALEdit edit = entry.getEdit(); 999 List<Cell> cells = edit.getCells(); 1000 assertEquals(1, cells.size()); 1001 Cell cell = cells.get(0); 1002 1003 // Check that the edits come in the right order. 1004 assertEquals(expectedIndex, 1005 Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 1006 expectedIndex++; 1007 return null; 1008 } 1009 }).when(mockWriter).append(Mockito.<Entry> any()); 1010 return mockWriter; 1011 } 1012 1013 /* Produce a mock reader that generates fake entries */ 1014 @Override 1015 protected Reader getReader(FileStatus file, boolean skipErrors, 1016 CancelableProgressable reporter) throws IOException, CorruptedLogFileException { 1017 Reader mockReader = Mockito.mock(Reader.class); 1018 Mockito.doAnswer(new Answer<Entry>() { 1019 int index = 0; 1020 1021 @Override 1022 public Entry answer(InvocationOnMock invocation) throws Throwable { 1023 if (index >= numFakeEdits) return null; 1024 1025 // Generate r0 through r4 in round robin fashion 1026 int regionIdx = index % regions.size(); 1027 byte region[] = new byte[] { (byte) 'r', (byte) (0x30 + regionIdx) }; 1028 1029 Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes(index / regions.size()), 1030 FAMILY, QUALIFIER, VALUE, index); 1031 index++; 1032 return ret; 1033 } 1034 }).when(mockReader).next(); 1035 return mockReader; 1036 } 1037 }; 1038 1039 logSplitter.splitWAL(fs.getFileStatus(logPath), null); 1040 1041 // Verify number of written edits per region 1042 Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts(); 1043 for (Map.Entry<String, Long> entry : outputCounts.entrySet()) { 1044 LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey()); 1045 assertEquals((long) entry.getValue(), numFakeEdits / regions.size()); 1046 } 1047 assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size()); 1048 } 1049 1050 // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests? 1051 @Test 1052 public void testSplitLogFileDeletedRegionDir() throws IOException { 1053 LOG.info("testSplitLogFileDeletedRegionDir"); 1054 final String REGION = "region__1"; 1055 REGIONS.clear(); 1056 REGIONS.add(REGION); 1057 1058 generateWALs(1, 10, -1); 1059 useDifferentDFSClient(); 1060 1061 Path regiondir = new Path(TABLEDIR, REGION); 1062 LOG.info("Region directory is" + regiondir); 1063 fs.delete(regiondir, true); 1064 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1065 assertFalse(fs.exists(regiondir)); 1066 } 1067 1068 @Test 1069 public void testSplitLogFileEmpty() throws IOException { 1070 LOG.info("testSplitLogFileEmpty"); 1071 // we won't create the hlog dir until getWAL got called, so 1072 // make dir here when testing empty log file 1073 fs.mkdirs(WALDIR); 1074 injectEmptyFile(".empty", true); 1075 useDifferentDFSClient(); 1076 1077 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1078 Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME); 1079 assertFalse(fs.exists(tdir)); 1080 1081 assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath())); 1082 } 1083 1084 @Test 1085 public void testSplitLogFileMultipleRegions() throws IOException { 1086 LOG.info("testSplitLogFileMultipleRegions"); 1087 generateWALs(1, 10, -1); 1088 splitAndCount(1, 10); 1089 } 1090 1091 @Test 1092 public void testSplitLogFileFirstLineCorruptionLog() throws IOException { 1093 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 1094 generateWALs(1, 10, -1); 1095 FileStatus logfile = fs.listStatus(WALDIR)[0]; 1096 1097 corruptWAL(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); 1098 1099 useDifferentDFSClient(); 1100 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1101 1102 final Path corruptDir = 1103 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 1104 assertEquals(1, fs.listStatus(corruptDir).length); 1105 } 1106 1107 /** 1108 * @see "https://issues.apache.org/jira/browse/HBASE-4862" 1109 */ 1110 @Test 1111 public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException { 1112 LOG.info("testConcurrentSplitLogAndReplayRecoverEdit"); 1113 // Generate wals for our destination region 1114 String regionName = "r0"; 1115 final Path regiondir = new Path(TABLEDIR, regionName); 1116 REGIONS.clear(); 1117 REGIONS.add(regionName); 1118 generateWALs(-1); 1119 1120 wals.getWAL(null); 1121 FileStatus[] logfiles = fs.listStatus(WALDIR); 1122 assertTrue("There should be some log file", logfiles != null && logfiles.length > 0); 1123 1124 WALSplitter logSplitter = 1125 new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { 1126 @Override 1127 protected Writer createWriter(Path logfile) throws IOException { 1128 Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); 1129 // After creating writer, simulate region's 1130 // replayRecoveredEditsIfAny() which gets SplitEditFiles of this 1131 // region and delete them, excluding files with '.temp' suffix. 1132 NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir); 1133 if (files != null && !files.isEmpty()) { 1134 for (Path file : files) { 1135 if (!this.walFS.delete(file, false)) { 1136 LOG.error("Failed delete of " + file); 1137 } else { 1138 LOG.debug("Deleted recovered.edits file=" + file); 1139 } 1140 } 1141 } 1142 return writer; 1143 } 1144 }; 1145 try { 1146 logSplitter.splitWAL(logfiles[0], null); 1147 } catch (IOException e) { 1148 LOG.info(e.toString(), e); 1149 fail("Throws IOException when spliting " 1150 + "log, it is most likely because writing file does not " 1151 + "exist which is caused by concurrent replayRecoveredEditsIfAny()"); 1152 } 1153 if (fs.exists(CORRUPTDIR)) { 1154 if (fs.listStatus(CORRUPTDIR).length > 0) { 1155 fail("There are some corrupt logs, " 1156 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()"); 1157 } 1158 } 1159 } 1160 1161 @Test 1162 public void testRecoveredEditsStoragePolicy() throws IOException { 1163 conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD"); 1164 try { 1165 Path path = createRecoveredEditsPathForRegion(); 1166 assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName()); 1167 } finally { 1168 conf.unset(HConstants.WAL_STORAGE_POLICY); 1169 } 1170 1171 } 1172 1173 private Writer generateWALs(int leaveOpen) throws IOException { 1174 return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0); 1175 } 1176 1177 private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException { 1178 return generateWALs(writers, entries, leaveOpen, 7); 1179 } 1180 1181 private void makeRegionDirs(List<String> regions) throws IOException { 1182 for (String region : regions) { 1183 LOG.debug("Creating dir for region " + region); 1184 fs.mkdirs(new Path(TABLEDIR, region)); 1185 } 1186 } 1187 1188 /** 1189 * @param leaveOpen index to leave un-closed. -1 to close all. 1190 * @return the writer that's still open, or null if all were closed. 1191 */ 1192 private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) 1193 throws IOException { 1194 makeRegionDirs(REGIONS); 1195 fs.mkdirs(WALDIR); 1196 Writer[] ws = new Writer[writers]; 1197 int seq = 0; 1198 int numRegionEventsAdded = 0; 1199 for (int i = 0; i < writers; i++) { 1200 ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i)); 1201 for (int j = 0; j < entries; j++) { 1202 int prefix = 0; 1203 for (String region : REGIONS) { 1204 String row_key = region + prefix++ + i + j; 1205 appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY, 1206 QUALIFIER, VALUE, seq++); 1207 1208 if (numRegionEventsAdded < regionEvents) { 1209 numRegionEventsAdded++; 1210 appendRegionEvent(ws[i], region); 1211 } 1212 } 1213 } 1214 if (i != leaveOpen) { 1215 ws[i].close(); 1216 LOG.info("Closing writer " + i); 1217 } 1218 } 1219 if (leaveOpen < 0 || leaveOpen >= writers) { 1220 return null; 1221 } 1222 return ws[leaveOpen]; 1223 } 1224 1225 private Path[] getLogForRegion(TableName table, String region) throws IOException { 1226 Path tdir = CommonFSUtils.getWALTableDir(conf, table); 1227 @SuppressWarnings("deprecation") 1228 Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir( 1229 HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region)))); 1230 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { 1231 @Override 1232 public boolean accept(Path p) { 1233 if (WALSplitUtil.isSequenceIdFile(p)) { 1234 return false; 1235 } 1236 return true; 1237 } 1238 }); 1239 Path[] paths = new Path[files.length]; 1240 for (int i = 0; i < files.length; i++) { 1241 paths[i] = files[i].getPath(); 1242 } 1243 return paths; 1244 } 1245 1246 private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException { 1247 FSDataOutputStream out; 1248 int fileSize = (int) fs.listStatus(path)[0].getLen(); 1249 1250 FSDataInputStream in = fs.open(path); 1251 byte[] corrupted_bytes = new byte[fileSize]; 1252 in.readFully(0, corrupted_bytes, 0, fileSize); 1253 in.close(); 1254 1255 switch (corruption) { 1256 case APPEND_GARBAGE: 1257 fs.delete(path, false); 1258 out = fs.create(path); 1259 out.write(corrupted_bytes); 1260 out.write(Bytes.toBytes("-----")); 1261 closeOrFlush(close, out); 1262 break; 1263 1264 case INSERT_GARBAGE_ON_FIRST_LINE: 1265 fs.delete(path, false); 1266 out = fs.create(path); 1267 out.write(0); 1268 out.write(corrupted_bytes); 1269 closeOrFlush(close, out); 1270 break; 1271 1272 case INSERT_GARBAGE_IN_THE_MIDDLE: 1273 fs.delete(path, false); 1274 out = fs.create(path); 1275 int middle = (int) Math.floor(corrupted_bytes.length / 2); 1276 out.write(corrupted_bytes, 0, middle); 1277 out.write(0); 1278 out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); 1279 closeOrFlush(close, out); 1280 break; 1281 1282 case TRUNCATE: 1283 fs.delete(path, false); 1284 out = fs.create(path); 1285 out.write(corrupted_bytes, 0, 1286 fileSize - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT)); 1287 closeOrFlush(close, out); 1288 break; 1289 1290 case TRUNCATE_TRAILER: 1291 fs.delete(path, false); 1292 out = fs.create(path); 1293 out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated. 1294 closeOrFlush(close, out); 1295 break; 1296 } 1297 } 1298 1299 private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException { 1300 if (close) { 1301 out.close(); 1302 } else { 1303 Method syncMethod = null; 1304 try { 1305 syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {}); 1306 } catch (NoSuchMethodException e) { 1307 try { 1308 syncMethod = out.getClass().getMethod("sync", new Class<?>[] {}); 1309 } catch (NoSuchMethodException ex) { 1310 throw new IOException( 1311 "This version of Hadoop supports " + "neither Syncable.sync() nor Syncable.hflush()."); 1312 } 1313 } 1314 try { 1315 syncMethod.invoke(out, new Object[] {}); 1316 } catch (Exception e) { 1317 throw new IOException(e); 1318 } 1319 // Not in 0out.hflush(); 1320 } 1321 } 1322 1323 private int countWAL(Path log) throws IOException { 1324 int count = 0; 1325 Reader in = wals.createReader(fs, log); 1326 while (in.next() != null) { 1327 count++; 1328 } 1329 in.close(); 1330 return count; 1331 } 1332 1333 private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs, 1334 String output) throws IOException { 1335 WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder(); 1336 desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes())) 1337 .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes())) 1338 .setRegionName(ByteString.copyFrom(hri.getRegionName())) 1339 .setFamilyName(ByteString.copyFrom(FAMILY)) 1340 .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY)) 1341 .addAllCompactionInput(Arrays.asList(inputs)).addCompactionOutput(output); 1342 1343 WALEdit edit = WALEdit.createCompaction(hri, desc.build()); 1344 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, 1345 EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); 1346 w.append(new Entry(key, edit)); 1347 w.sync(false); 1348 } 1349 1350 private static void appendRegionEvent(Writer w, String region) throws IOException { 1351 WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( 1352 WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, TABLE_NAME.toBytes(), 1353 Bytes.toBytes(region), Bytes.toBytes(String.valueOf(region.hashCode())), 1, 1354 ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>> of()); 1355 final long time = EnvironmentEdgeManager.currentTime(); 1356 final WALKeyImpl walKey = 1357 new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, HConstants.DEFAULT_CLUSTER_ID); 1358 WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc); 1359 w.append(new Entry(walKey, we)); 1360 w.sync(false); 1361 } 1362 1363 public static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row, 1364 byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException { 1365 LOG.info(Thread.currentThread().getName() + " append"); 1366 writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); 1367 LOG.info(Thread.currentThread().getName() + " sync"); 1368 writer.sync(false); 1369 return seq; 1370 } 1371 1372 private static Entry createTestEntry(TableName table, byte[] region, byte[] row, byte[] family, 1373 byte[] qualifier, byte[] value, long seq) { 1374 long time = System.nanoTime(); 1375 1376 seq++; 1377 final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value); 1378 WALEdit edit = new WALEdit(); 1379 edit.add(cell); 1380 return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit); 1381 } 1382 1383 private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { 1384 Writer writer = 1385 WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); 1386 if (closeFile) { 1387 writer.close(); 1388 } 1389 } 1390 1391 private boolean logsAreEqual(Path p1, Path p2) throws IOException { 1392 Reader in1, in2; 1393 in1 = wals.createReader(fs, p1); 1394 in2 = wals.createReader(fs, p2); 1395 Entry entry1; 1396 Entry entry2; 1397 while ((entry1 = in1.next()) != null) { 1398 entry2 = in2.next(); 1399 if ( 1400 (entry1.getKey().compareTo(entry2.getKey()) != 0) 1401 || (!entry1.getEdit().toString().equals(entry2.getEdit().toString())) 1402 ) { 1403 return false; 1404 } 1405 } 1406 in1.close(); 1407 in2.close(); 1408 return true; 1409 } 1410}