001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; 021import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey; 022import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; 023import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028import static org.junit.Assume.assumeFalse; 029 030import java.io.FileNotFoundException; 031import java.io.IOException; 032import java.lang.reflect.Method; 033import java.security.PrivilegedExceptionAction; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collections; 037import java.util.HashMap; 038import java.util.HashSet; 039import java.util.List; 040import java.util.Map; 041import java.util.NavigableSet; 042import java.util.Objects; 043import java.util.Set; 044import java.util.concurrent.atomic.AtomicBoolean; 
045import java.util.concurrent.atomic.AtomicInteger; 046import java.util.concurrent.atomic.AtomicLong; 047import java.util.stream.Collectors; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.fs.FSDataInputStream; 050import org.apache.hadoop.fs.FSDataOutputStream; 051import org.apache.hadoop.fs.FileStatus; 052import org.apache.hadoop.fs.FileSystem; 053import org.apache.hadoop.fs.FileUtil; 054import org.apache.hadoop.fs.Path; 055import org.apache.hadoop.fs.PathFilter; 056import org.apache.hadoop.hbase.Cell; 057import org.apache.hadoop.hbase.HBaseClassTestRule; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseTestingUtil; 060import org.apache.hadoop.hbase.HConstants; 061import org.apache.hadoop.hbase.KeyValue; 062import org.apache.hadoop.hbase.ServerName; 063import org.apache.hadoop.hbase.TableName; 064import org.apache.hadoop.hbase.client.RegionInfo; 065import org.apache.hadoop.hbase.client.RegionInfoBuilder; 066import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 067import org.apache.hadoop.hbase.master.SplitLogManager; 068import org.apache.hadoop.hbase.regionserver.HRegion; 069import org.apache.hadoop.hbase.regionserver.LastSequenceId; 070import org.apache.hadoop.hbase.regionserver.RegionServerServices; 071import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader; 072import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader; 073import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; 074import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; 075import org.apache.hadoop.hbase.security.User; 076import org.apache.hadoop.hbase.testclassification.LargeTests; 077import org.apache.hadoop.hbase.testclassification.RegionServerTests; 078import org.apache.hadoop.hbase.util.Bytes; 079import org.apache.hadoop.hbase.util.CancelableProgressable; 080import org.apache.hadoop.hbase.util.CommonFSUtils; 081import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 082import org.apache.hadoop.hbase.util.Threads; 083import org.apache.hadoop.hbase.wal.WAL.Entry; 084import org.apache.hadoop.hbase.wal.WALProvider.Writer; 085import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException; 086import org.apache.hadoop.hdfs.DFSTestUtil; 087import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; 088import org.apache.hadoop.ipc.RemoteException; 089import org.junit.After; 090import org.junit.AfterClass; 091import org.junit.Before; 092import org.junit.BeforeClass; 093import org.junit.ClassRule; 094import org.junit.Rule; 095import org.junit.Test; 096import org.junit.experimental.categories.Category; 097import org.junit.rules.TestName; 098import org.mockito.Mockito; 099import org.mockito.invocation.InvocationOnMock; 100import org.mockito.stubbing.Answer; 101import org.slf4j.Logger; 102import org.slf4j.LoggerFactory; 103 104import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 105import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 106import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 107import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 108 109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 112 113/** 114 * Testing {@link WAL} splitting code. 
115 */ 116@Category({ RegionServerTests.class, LargeTests.class }) 117public class TestWALSplit { 118 @ClassRule 119 public static final HBaseClassTestRule CLASS_RULE = 120 HBaseClassTestRule.forClass(TestWALSplit.class); 121 private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class); 122 123 private static Configuration conf; 124 private FileSystem fs; 125 126 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 127 128 private Path HBASEDIR; 129 private Path HBASELOGDIR; 130 private Path WALDIR; 131 private Path OLDLOGDIR; 132 private Path CORRUPTDIR; 133 private Path TABLEDIR; 134 private String TMPDIRNAME; 135 136 private static final int NUM_WRITERS = 10; 137 private static final int ENTRIES = 10; // entries per writer per region 138 139 private static final String FILENAME_BEING_SPLIT = "testfile"; 140 private static final TableName TABLE_NAME = TableName.valueOf("t1"); 141 private static final byte[] FAMILY = Bytes.toBytes("f1"); 142 private static final byte[] QUALIFIER = Bytes.toBytes("q1"); 143 private static final byte[] VALUE = Bytes.toBytes("v1"); 144 private static final String WAL_FILE_PREFIX = "wal.dat."; 145 private static List<String> REGIONS = new ArrayList<>(); 146 private static String ROBBER; 147 private static String ZOMBIE; 148 private static String[] GROUP = new String[] { "supergroup" }; 149 150 static enum Corruptions { 151 INSERT_GARBAGE_ON_FIRST_LINE, 152 INSERT_GARBAGE_IN_THE_MIDDLE, 153 APPEND_GARBAGE, 154 TRUNCATE, 155 TRUNCATE_TRAILER 156 } 157 158 @BeforeClass 159 public static void setUpBeforeClass() throws Exception { 160 conf = TEST_UTIL.getConfiguration(); 161 conf.set(WALFactory.WAL_PROVIDER, "filesystem"); 162 conf.setClass(FSHLogProvider.WRITER_IMPL, InstrumentedLogWriter.class, Writer.class); 163 // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config. 
164 System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); 165 // Create fake maping user to group and set it to the conf. 166 Map<String, String[]> u2g_map = new HashMap<>(2); 167 ROBBER = User.getCurrent().getName() + "-robber"; 168 ZOMBIE = User.getCurrent().getName() + "-zombie"; 169 u2g_map.put(ROBBER, GROUP); 170 u2g_map.put(ZOMBIE, GROUP); 171 DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); 172 conf.setInt("dfs.heartbeat.interval", 1); 173 TEST_UTIL.startMiniDFSCluster(2); 174 } 175 176 @AfterClass 177 public static void tearDownAfterClass() throws Exception { 178 TEST_UTIL.shutdownMiniDFSCluster(); 179 } 180 181 @Rule 182 public TestName name = new TestName(); 183 private WALFactory wals = null; 184 185 @Before 186 public void setUp() throws Exception { 187 LOG.info("Cleaning up cluster for new test."); 188 fs = TEST_UTIL.getDFSCluster().getFileSystem(); 189 HBASEDIR = TEST_UTIL.createRootDir(); 190 HBASELOGDIR = TEST_UTIL.createWALRootDir(); 191 OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME); 192 CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME); 193 TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME); 194 TMPDIRNAME = 195 conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); 196 REGIONS.clear(); 197 Collections.addAll(REGIONS, "bbb", "ccc"); 198 InstrumentedLogWriter.activateFailure = false; 199 wals = new WALFactory(conf, name.getMethodName()); 200 WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(ServerName 201 .valueOf(name.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()).toString())); 202 // fs.mkdirs(WALDIR); 203 } 204 205 @After 206 public void tearDown() throws Exception { 207 try { 208 wals.close(); 209 } catch (IOException exception) { 210 // Some tests will move WALs out from under us. In those cases, we'll get an error on close. 211 LOG.info("Ignoring an error while closing down our WALFactory. 
Fine for some tests, but if" 212 + " you see a failure look here."); 213 LOG.debug("exception details", exception); 214 } finally { 215 wals = null; 216 fs.delete(HBASEDIR, true); 217 fs.delete(HBASELOGDIR, true); 218 } 219 } 220 221 /** 222 * Simulates splitting a WAL out from under a regionserver that is still trying to write it. 223 * Ensures we do not lose edits. 224 */ 225 @Test 226 public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException { 227 final AtomicLong counter = new AtomicLong(0); 228 AtomicBoolean stop = new AtomicBoolean(false); 229 // Region we'll write edits too and then later examine to make sure they all made it in. 230 final String region = REGIONS.get(0); 231 final int numWriters = 3; 232 Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters); 233 try { 234 long startCount = counter.get(); 235 zombie.start(); 236 // Wait till writer starts going. 237 while (startCount == counter.get()) 238 Threads.sleep(1); 239 // Give it a second to write a few appends. 240 Threads.sleep(1000); 241 final Configuration conf2 = HBaseConfiguration.create(conf); 242 final User robber = User.createUserForTesting(conf2, ROBBER, GROUP); 243 int count = robber.runAs(new PrivilegedExceptionAction<Integer>() { 244 @Override 245 public Integer run() throws Exception { 246 StringBuilder ls = 247 new StringBuilder("Contents of WALDIR (").append(WALDIR).append("):\n"); 248 for (FileStatus status : fs.listStatus(WALDIR)) { 249 ls.append("\t").append(status.toString()).append("\n"); 250 } 251 LOG.debug(Objects.toString(ls)); 252 LOG.info("Splitting WALs out from under zombie. 
Expecting " + numWriters + " files."); 253 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals); 254 LOG.info("Finished splitting out from under zombie."); 255 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 256 assertEquals("wrong number of split files for region", numWriters, logfiles.length); 257 int count = 0; 258 for (Path logfile : logfiles) { 259 count += countWAL(logfile); 260 } 261 return count; 262 } 263 }); 264 LOG.info("zombie=" + counter.get() + ", robber=" + count); 265 assertTrue( 266 "The log file could have at most 1 extra log entry, but can't have less. " 267 + "Zombie could write " + counter.get() + " and logfile had only " + count, 268 counter.get() == count || counter.get() + 1 == count); 269 } finally { 270 stop.set(true); 271 zombie.interrupt(); 272 Threads.threadDumpingIsAlive(zombie); 273 } 274 } 275 276 /** 277 * This thread will keep writing to a 'wal' file even after the split process has started. It 278 * simulates a region server that was considered dead but woke up and wrote some more to the last 279 * log entry. Does its writing as an alternate user in another filesystem instance to simulate 280 * better it being a regionserver. 281 */ 282 private class ZombieLastLogWriterRegionServer extends Thread { 283 final AtomicLong editsCount; 284 final AtomicBoolean stop; 285 final int numOfWriters; 286 /** 287 * Region to write edits for. 
288 */ 289 final String region; 290 final User user; 291 292 public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop, 293 final String region, final int writers) throws IOException, InterruptedException { 294 super("ZombieLastLogWriterRegionServer"); 295 setDaemon(true); 296 this.stop = stop; 297 this.editsCount = counter; 298 this.region = region; 299 this.user = User.createUserForTesting(conf, ZOMBIE, GROUP); 300 numOfWriters = writers; 301 } 302 303 @Override 304 public void run() { 305 try { 306 doWriting(); 307 } catch (IOException e) { 308 LOG.warn(getName() + " Writer exiting " + e); 309 } catch (InterruptedException e) { 310 LOG.warn(getName() + " Writer exiting " + e); 311 } 312 } 313 314 private void doWriting() throws IOException, InterruptedException { 315 this.user.runAs(new PrivilegedExceptionAction<Object>() { 316 @Override 317 public Object run() throws Exception { 318 // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose 319 // index we supply here. 320 int walToKeepOpen = numOfWriters - 1; 321 // The below method writes numOfWriters files each with ENTRIES entries for a total of 322 // numOfWriters * ENTRIES added per column family in the region. 323 Writer writer = null; 324 try { 325 writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen); 326 } catch (IOException e1) { 327 throw new RuntimeException("Failed", e1); 328 } 329 // Update counter so has all edits written so far. 330 editsCount.addAndGet(numOfWriters * ENTRIES); 331 loop(writer); 332 // If we've been interruped, then things should have shifted out from under us. 
333 // closing should error 334 try { 335 writer.close(); 336 fail("Writing closing after parsing should give an error."); 337 } catch (IOException exception) { 338 LOG.debug("ignoring error when closing final writer.", exception); 339 } 340 return null; 341 } 342 }); 343 } 344 345 private void loop(final Writer writer) { 346 byte[] regionBytes = Bytes.toBytes(this.region); 347 while (!stop.get()) { 348 try { 349 long seq = appendEntry(writer, TABLE_NAME, regionBytes, 350 Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0); 351 long count = editsCount.incrementAndGet(); 352 LOG.info(getName() + " sync count=" + count + ", seq=" + seq); 353 try { 354 Thread.sleep(1); 355 } catch (InterruptedException e) { 356 // 357 } 358 } catch (IOException ex) { 359 LOG.error(getName() + " ex " + ex.toString()); 360 if (ex instanceof RemoteException) { 361 LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing " 362 + (editsCount.get() + 1)); 363 } else { 364 LOG.error(getName() + " failed to write....at " + editsCount.get()); 365 fail("Failed to write " + editsCount.get()); 366 } 367 break; 368 } catch (Throwable t) { 369 LOG.error(getName() + " HOW? 
" + t); 370 LOG.debug("exception details", t); 371 break; 372 } 373 } 374 LOG.info(getName() + " Writer exiting"); 375 } 376 } 377 378 // If another worker is assigned to split a WAl and last worker is still running, both should not 379 // impact each other's progress 380 @Test 381 public void testTwoWorkerSplittingSameWAL() throws IOException, InterruptedException { 382 int numWriter = 1, entries = 10; 383 generateWALs(numWriter, entries, -1, 0); 384 FileStatus logfile = fs.listStatus(WALDIR)[0]; 385 FileSystem spiedFs = Mockito.spy(fs); 386 RegionServerServices zombieRSServices = Mockito.mock(RegionServerServices.class); 387 RegionServerServices newWorkerRSServices = Mockito.mock(RegionServerServices.class); 388 Mockito.when(zombieRSServices.getServerName()) 389 .thenReturn(ServerName.valueOf("zombie-rs.abc.com,1234,1234567890")); 390 Mockito.when(newWorkerRSServices.getServerName()) 391 .thenReturn(ServerName.valueOf("worker-rs.abc.com,1234,1234569870")); 392 Thread zombieWorker = new SplitWALWorker(logfile, spiedFs, zombieRSServices); 393 Thread newWorker = new SplitWALWorker(logfile, spiedFs, newWorkerRSServices); 394 zombieWorker.start(); 395 newWorker.start(); 396 newWorker.join(); 397 zombieWorker.join(); 398 399 for (String region : REGIONS) { 400 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 401 assertEquals("wrong number of split files for region", numWriter, logfiles.length); 402 403 int count = 0; 404 for (Path lf : logfiles) { 405 count += countWAL(lf); 406 } 407 assertEquals("wrong number of edits for region " + region, entries, count); 408 } 409 } 410 411 private class SplitWALWorker extends Thread implements LastSequenceId { 412 final FileStatus logfile; 413 final FileSystem fs; 414 final RegionServerServices rsServices; 415 416 public SplitWALWorker(FileStatus logfile, FileSystem fs, RegionServerServices rsServices) { 417 super(rsServices.getServerName().toShortString()); 418 setDaemon(true); 419 this.fs = fs; 420 this.logfile = logfile; 
421 this.rsServices = rsServices; 422 } 423 424 @Override 425 public void run() { 426 try { 427 boolean ret = 428 WALSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, null, this, null, wals, rsServices); 429 assertTrue("Both splitting should pass", ret); 430 } catch (IOException e) { 431 LOG.warn(getName() + " Worker exiting " + e); 432 } 433 } 434 435 @Override 436 public ClusterStatusProtos.RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 437 return ClusterStatusProtos.RegionStoreSequenceIds.newBuilder() 438 .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build(); 439 } 440 } 441 442 /** 443 * @see "https://issues.apache.org/jira/browse/HBASE-3020" 444 */ 445 @Test 446 public void testRecoveredEditsPathForMeta() throws IOException { 447 Path p = createRecoveredEditsPathForRegion(); 448 String parentOfParent = p.getParent().getParent().getName(); 449 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 450 } 451 452 /** 453 * Test old recovered edits file doesn't break WALSplitter. This is useful in upgrading old 454 * instances. 
455 */ 456 @Test 457 public void testOldRecoveredEditsFileSidelined() throws IOException { 458 Path p = createRecoveredEditsPathForRegion(); 459 Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME); 460 Path regiondir = new Path(tdir, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 461 fs.mkdirs(regiondir); 462 Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); 463 assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName()); 464 fs.createNewFile(parent); // create a recovered.edits file 465 String parentOfParent = p.getParent().getParent().getName(); 466 assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 467 WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); 468 } 469 470 private Path createRecoveredEditsPathForRegion() throws IOException { 471 byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); 472 Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, 473 FILENAME_BEING_SPLIT, TMPDIRNAME, conf, ""); 474 return p; 475 } 476 477 @Test 478 public void testHasRecoveredEdits() throws IOException { 479 Path p = createRecoveredEditsPathForRegion(); 480 assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 481 String renamedEdit = p.getName().split("-")[0]; 482 fs.createNewFile(new Path(p.getParent(), renamedEdit)); 483 assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); 484 } 485 486 private void useDifferentDFSClient() throws IOException { 487 // make fs act as a different client now 488 // initialize will create a new DFSClient with a new client ID 489 fs.initialize(fs.getUri(), conf); 490 } 491 492 @Test 493 public void testSplitPreservesEdits() throws IOException { 494 final String REGION = "region__1"; 495 REGIONS.clear(); 496 REGIONS.add(REGION); 497 498 generateWALs(1, 10, -1, 0); 499 useDifferentDFSClient(); 500 
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 501 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 502 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 503 assertEquals(1, splitLog.length); 504 505 assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 506 } 507 508 @Test 509 public void testSplitRemovesRegionEventsEdits() throws IOException { 510 final String REGION = "region__1"; 511 REGIONS.clear(); 512 REGIONS.add(REGION); 513 514 generateWALs(1, 10, -1, 100); 515 useDifferentDFSClient(); 516 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 517 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 518 Path[] splitLog = getLogForRegion(TABLE_NAME, REGION); 519 assertEquals(1, splitLog.length); 520 521 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 522 // split log should only have the test edits 523 assertEquals(10, countWAL(splitLog[0])); 524 } 525 526 @Test 527 public void testSplitLeavesCompactionEventsEdits() throws IOException { 528 RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); 529 REGIONS.clear(); 530 REGIONS.add(hri.getEncodedName()); 531 Path regionDir = 532 new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName()); 533 LOG.info("Creating region directory: " + regionDir); 534 assertTrue(fs.mkdirs(regionDir)); 535 536 Writer writer = generateWALs(1, 10, 0, 10); 537 String[] compactInputs = new String[] { "file1", "file2", "file3" }; 538 String compactOutput = "file4"; 539 appendCompactionEvent(writer, hri, compactInputs, compactOutput); 540 writer.close(); 541 542 useDifferentDFSClient(); 543 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 544 545 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 546 // original log should have 10 test edits, 10 region markers, 1 compaction marker 547 assertEquals(21, countWAL(originalLog)); 548 549 Path[] splitLog = 
getLogForRegion(TABLE_NAME, hri.getEncodedName()); 550 assertEquals(1, splitLog.length); 551 552 assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); 553 // split log should have 10 test edits plus 1 compaction marker 554 assertEquals(11, countWAL(splitLog[0])); 555 } 556 557 /** 558 * Tests that WalSplitter ignores replication marker edits. 559 */ 560 @Test 561 public void testSplitRemovesReplicationMarkerEdits() throws IOException { 562 RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO; 563 Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1"); 564 generateReplicationMarkerEdits(path, regionInfo); 565 useDifferentDFSClient(); 566 List<FileStatus> logFiles = 567 SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null); 568 assertEquals(1, logFiles.size()); 569 assertEquals(path, logFiles.get(0).getPath()); 570 List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 571 // Make sure that WALSplitter doesn't fail. 
572 assertEquals(0, splitPaths.size()); 573 } 574 575 private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException { 576 long timestamp = EnvironmentEdgeManager.currentTime(); 577 fs.mkdirs(WALDIR); 578 try (Writer writer = wals.createWALWriter(fs, path)) { 579 WALProtos.ReplicationMarkerDescriptor.Builder builder = 580 WALProtos.ReplicationMarkerDescriptor.newBuilder(); 581 builder.setWalName("wal-name"); 582 builder.setRegionServerName("rs-name"); 583 builder.setOffset(0L); 584 WALProtos.ReplicationMarkerDescriptor desc = builder.build(); 585 appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(), 586 getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1); 587 } 588 } 589 590 /** 591 * @param expectedEntries -1 to not assert 592 * @return the count across all regions 593 */ 594 private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException { 595 useDifferentDFSClient(); 596 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 597 int result = 0; 598 for (String region : REGIONS) { 599 Path[] logfiles = getLogForRegion(TABLE_NAME, region); 600 assertEquals(expectedFiles, logfiles.length); 601 int count = 0; 602 for (Path logfile : logfiles) { 603 count += countWAL(logfile); 604 } 605 if (-1 != expectedEntries) { 606 assertEquals(expectedEntries, count); 607 } 608 result += count; 609 } 610 return result; 611 } 612 613 @Test 614 public void testEmptyLogFiles() throws IOException { 615 testEmptyLogFiles(true); 616 } 617 618 @Test 619 public void testEmptyOpenLogFiles() throws IOException { 620 testEmptyLogFiles(false); 621 } 622 623 private void testEmptyLogFiles(final boolean close) throws IOException { 624 // we won't create the hlog dir until getWAL got called, so 625 // make dir here when testing empty log file 626 fs.mkdirs(WALDIR); 627 injectEmptyFile(".empty", close); 628 generateWALs(Integer.MAX_VALUE); 
629 injectEmptyFile("empty", close); 630 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty 631 } 632 633 @Test 634 public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { 635 // generate logs but leave wal.dat.5 open. 636 generateWALs(5); 637 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 638 } 639 640 @Test 641 public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { 642 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 643 generateWALs(Integer.MAX_VALUE); 644 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.APPEND_GARBAGE, true); 645 splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); 646 } 647 648 @Test 649 public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { 650 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 651 generateWALs(Integer.MAX_VALUE); 652 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, 653 true); 654 splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt 655 } 656 657 @Test 658 public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { 659 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 660 generateWALs(Integer.MAX_VALUE); 661 corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, 662 false); 663 // the entries in the original logs are alternating regions 664 // considering the sequence file header, the middle corruption should 665 // affect at least half of the entries 666 int goodEntries = (NUM_WRITERS - 1) * ENTRIES; 667 int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; 668 int allRegionsCount = splitAndCount(NUM_WRITERS, -1); 669 assertTrue("The file up to the corrupted area hasn't been parsed", 670 REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount); 671 } 672 673 @Test 674 public void testCorruptedFileGetsArchivedIfSkipErrors() throws 
IOException { 675 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 676 List<FaultyProtobufWALStreamReader.FailureType> failureTypes = 677 Arrays.asList(FaultyProtobufWALStreamReader.FailureType.values()).stream() 678 .filter(x -> x != FaultyProtobufWALStreamReader.FailureType.NONE) 679 .collect(Collectors.toList()); 680 for (FaultyProtobufWALStreamReader.FailureType failureType : failureTypes) { 681 final Set<String> walDirContents = splitCorruptWALs(failureType); 682 final Set<String> archivedLogs = new HashSet<>(); 683 final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:"); 684 for (FileStatus log : fs.listStatus(CORRUPTDIR)) { 685 archived.append("\n\t").append(log.toString()); 686 archivedLogs.add(log.getPath().getName()); 687 } 688 LOG.debug(archived.toString()); 689 assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs, 690 walDirContents); 691 } 692 } 693 694 /** 695 * @return set of wal names present prior to split attempt. 696 * @throws IOException if the split process fails 697 */ 698 private Set<String> splitCorruptWALs(final FaultyProtobufWALStreamReader.FailureType failureType) 699 throws IOException { 700 String backupClass = conf.get(WALFactory.WAL_STREAM_READER_CLASS_IMPL); 701 InstrumentedLogWriter.activateFailure = false; 702 703 try { 704 conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, FaultyProtobufWALStreamReader.class, 705 WALStreamReader.class); 706 conf.set("faultyprotobuflogreader.failuretype", failureType.name()); 707 // Clean up from previous tests or previous loop 708 try { 709 wals.shutdown(); 710 } catch (IOException exception) { 711 // since we're splitting out from under the factory, we should expect some closing failures. 
LOG.debug("Ignoring problem closing WALFactory.", exception);
    }
    wals.close();
    try {
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        fs.delete(log.getPath(), true);
      }
    } catch (FileNotFoundException exception) {
      LOG.debug("no previous CORRUPTDIR to clean.");
    }
    // change to the faulty reader
    wals = new WALFactory(conf, name.getMethodName());
    generateWALs(-1);
    // Our reader will render all of these files corrupt.
    final Set<String> walDirContents = new HashSet<>();
    for (FileStatus status : fs.listStatus(WALDIR)) {
      walDirContents.add(status.getPath().getName());
    }
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    return walDirContents;
  } finally {
    if (backupClass != null) {
      conf.set(WALFactory.WAL_STREAM_READER_CLASS_IMPL, backupClass);
    } else {
      conf.unset(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
    }
  }
}

/**
 * Verifies that, with {@link WALSplitter#SPLIT_SKIP_ERRORS_KEY} disabled, splitting WALs that the
 * faulty reader fails on (failure type BEGINNING) propagates an {@link IOException}.
 * <p>
 * NOTE(review): the name mentions "trailing garbage" but the injected failure type is
 * BEGINNING; confirm which scenario is intended.
 */
@Test(expected = IOException.class)
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
  conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
  splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
}

/**
 * Verifies that a split attempted with skip-errors disabled leaves all {@code NUM_WRITERS} WAL
 * files in the WAL directory. The expected IOException is caught and only logged.
 * <p>
 * NOTE(review): this test also passes if the split does not throw at all; it asserts only the
 * remaining file count.
 */
@Test
public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
  conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
  try {
    splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
  } catch (IOException e) {
    LOG.debug("split with 'skip errors' set to 'false' correctly threw");
  }
  assertEquals("if skip.errors is false all files should remain in place", NUM_WRITERS,
    fs.listStatus(WALDIR).length);
}

/**
 * Writes a single WAL for one region, damages it, splits it with skip-errors disabled and checks
 * that the damage was tolerated. The checks are: exactly one recovered-edits file exists for the
 * region, it holds {@code expectedCount} entries, and nothing was moved to CORRUPTDIR.
 * @param corruption    how the WAL is damaged before splitting (applied by corruptWAL)
 * @param entryCount    entries written for the single region into the WAL
 * @param expectedCount entries expected to survive into the recovered-edits file
 */
private void ignoreCorruption(final Corruptions corruption, final int entryCount,
  final int expectedCount) throws IOException {
  conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

  final String REGION = "region__1";
  REGIONS.clear();
  REGIONS.add(REGION);

  // generateWALs with one writer creates exactly this file (WAL_FILE_PREFIX + writer index 0).
  Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
  generateWALs(1, entryCount, -1, 0);
  corruptWAL(c1, corruption, true);

  useDifferentDFSClient();
  WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

  Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
  assertEquals(1, splitLog.length);

  // Count what actually made it into the recovered-edits output.
  int actualCount = 0;
  try (WALStreamReader in = wals.createStreamReader(fs, splitLog[0])) {
    while (in.next() != null) {
      ++actualCount;
    }
  }
  assertEquals(expectedCount, actualCount);

  // should not have stored the EOF files as corrupt
  FileStatus[] archivedLogs =
    fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0];
  assertEquals(0, archivedLogs.length);

}

/**
 * Cutting bytes off the end of a WAL (TRUNCATE) is expected to cost exactly one entry. The split
 * must still succeed and must not treat the file as corrupt.
 */
@Test
public void testEOFisIgnored() throws IOException {
  int entryCount = 10;
  ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
}

/**
 * Truncating only the WAL trailer (TRUNCATE_TRAILER) must not lose any entries.
 */
@Test
public void testCorruptWALTrailer() throws IOException {
  int entryCount = 10;
  ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
}

/**
 * After a successful split every one of the {@code NUM_WRITERS} WAL files is found in OLDLOGDIR.
 */
@Test
public void testLogsGetArchivedAfterSplit() throws IOException {
  conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
  generateWALs(-1);
  useDifferentDFSClient();
  WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
  FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
  assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
}

/**
 * Basic split of {@code NUM_WRITERS} closed WALs. The expected count is handed to splitAndCount,
 * which is defined elsewhere in this class (presumably it splits and checks per-region edit
 * counts; confirm).
 */
@Test
public void testSplit() throws IOException {
  generateWALs(-1);
  splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
}

/**
 * After a successful split the WAL directory must no longer exist. listStatus either throws
 * FileNotFoundException (accepted), or returns an array, and then the test fails even if the
 * array is empty.
 */
@Test
public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException {
  generateWALs(-1);
  useDifferentDFSClient();
  WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
  FileStatus[] statuses = null;
  try {
    statuses = fs.listStatus(WALDIR);
    if (statuses != null) {
      fail("Files left in log dir: " + Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
    }
  } catch (FileNotFoundException e) {
    // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
  }
}

/**
 * Verifies that an IOException raised by the instrumented recovered-edits writer aborts the split
 * and carries the instrumented message. The failure flag is reset in {@code finally} so later
 * tests are unaffected.
 */
@Test(expected = IOException.class)
public void testSplitWillFailIfWritingToRegionFails() throws Exception {
  // leave 5th log open so we could append the "trap"
  Writer writer = generateWALs(4);
  useDifferentDFSClient();

  String region = "break";
  Path regiondir = new Path(TABLEDIR, region);
  fs.mkdirs(regiondir);

  // Append the "trap" entry for region "break" with failure injection still off.
  InstrumentedLogWriter.activateFailure = false;
  appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes("r" + 999), FAMILY,
    QUALIFIER, VALUE, 0);
  writer.close();

  try {
    InstrumentedLogWriter.activateFailure = true;
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
  } catch (IOException e) {
    assertTrue(e.getMessage()
      .contains("This exception is instrumented and should only be thrown for testing"));
    throw e;
  } finally {
    InstrumentedLogWriter.activateFailure = false;
  }
}

/**
 * Deletes the only destination region directory before splitting and checks that the split does
 * not recreate it.
 * <p>
 * NOTE(review): generateWALs(1) leaves the writer with index 1 open and its return value is
 * discarded here.
 */
@Test
public void testSplitDeletedRegion() throws IOException {
  REGIONS.clear();
  String region = "region_that_splits";
  REGIONS.add(region);

  generateWALs(1);
  useDifferentDFSClient();

  Path regiondir = new Path(TABLEDIR, region);
  fs.delete(regiondir, true);
  WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
  assertFalse(fs.exists(regiondir));
}

/**
 * Verifies that an IOException thrown by a (mocked) output writer surfaces from
 * {@link WALSplitter#splitWAL}. The largest WAL is chosen so that at least one append is
 * attempted. A background thread dumper runs while the split executes and is stopped in
 * {@code finally}.
 */
@Test
public void testIOEOnOutputThread() throws Exception {
  conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

  generateWALs(-1);
  useDifferentDFSClient();
  FileStatus[] logfiles = fs.listStatus(WALDIR);
  assertTrue("There should be some log file", logfiles != null && logfiles.length > 0);
  // wals with no entries (like the one we don't use in the factory)
  // won't cause a failure since nothing will ever be written.
  // pick the largest one since it's most likely to have entries.
  int largestLogFile = 0;
  long largestSize = 0;
  for (int i = 0; i < logfiles.length; i++) {
    if (logfiles[i].getLen() > largestSize) {
      largestLogFile = i;
      largestSize = logfiles[i].getLen();
    }
  }
  assertTrue("There should be some log greater than size 0.", 0 < largestSize);
  // Set up a splitter that will throw an IOE on the output side
  WALSplitter logSplitter =
    new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(mockWriter)
          .append(Mockito.<Entry> any());
        return mockWriter;
      }
    };
  // Set up a background thread dumper. Needs a thread to depend on and then we need to run
  // the thread dumping in a background thread so it does not hold up the test.
  final AtomicBoolean stop = new AtomicBoolean(false);
  final Thread someOldThread = new Thread("Some-old-thread") {
    @Override
    public void run() {
      // Spin until the test sets 'stop'; this keeps the dumper's target thread alive.
      while (!stop.get())
        Threads.sleep(10);
    }
  };
  someOldThread.setDaemon(true);
  someOldThread.start();
  final Thread t = new Thread("Background-thread-dumper") {
    @Override
    public void run() {
      try {
        Threads.threadDumpingIsAlive(someOldThread);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  };
  t.setDaemon(true);
  t.start();
  try {
    logSplitter.splitWAL(logfiles[largestLogFile], null);
    fail("Didn't throw!");
  } catch (IOException ioe) {
    assertTrue(ioe.toString().contains("Injected"));
  } finally {
    // Setting this to true will turn off the background thread dumper.
    stop.set(true);
  }
}

/**
 * Generates {@code NUM_WRITERS} closed WALs and splits them through {@code spiedFs}. The split is
 * expected to recover from the injected problems without throwing: all WALs are archived to
 * OLDLOGDIR and WALDIR is removed. Any IOException fails the test.
 * @param spiedFs should be instrumented for failure.
 */
private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
  generateWALs(-1);
  useDifferentDFSClient();

  try {
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
    assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
    assertFalse(fs.exists(WALDIR));
  } catch (IOException e) {
    fail("There shouldn't be any exception but: " + e.toString());
  }
}

// Test for HBASE-3412
/**
 * Every {@code FileSystem.append(Path)} on the spied filesystem throws a LeaseExpiredException
 * with an HDFS-style "File does not exist" message. The split must still succeed.
 */
@Test
public void testMovedWALDuringRecovery() throws Exception {
  // This partial mock will throw LEE for every file simulating
  // files that were moved
  FileSystem spiedFs = Mockito.spy(fs);
  // The "File does not exist" part is very important,
  // that's how it comes out of HDFS
  Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).when(spiedFs)
    .append(Mockito.<Path> any());
  retryOverHdfsProblem(spiedFs);
}

/**
 * The first three {@code open(Path, int)} calls on the spied filesystem fail with HDFS-style
 * transient messages; later calls go to the real filesystem. The split is expected to retry past
 * them and succeed.
 */
@Test
public void testRetryOpenDuringRecovery() throws Exception {
  FileSystem spiedFs = Mockito.spy(fs);
  // The "Cannot obtain block length", "Could not obtain the last block",
  // and "Blocklist for [^ ]* has changed.*" part is very important,
  // that's how it comes out of HDFS. If HDFS changes the exception
  // message, this test needs to be adjusted accordingly.
  //
  // When DFSClient tries to open a file, HDFS needs to locate
  // the last block of the file and get its length. However, if the
  // last block is under recovery, HDFS may have problem to obtain
  // the block length, in which case, retry may help.
  Mockito.doAnswer(new Answer<FSDataInputStream>() {
    private final String[] errors = new String[] { "Cannot obtain block length",
      "Could not obtain the last block", "Blocklist for " + OLDLOGDIR + " has changed" };
    // Number of injected failures so far; one per message above.
    private int count = 0;

    @Override
    public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
      if (count < 3) {
        throw new IOException(errors[count++]);
      }
      return (FSDataInputStream) invocation.callRealMethod();
    }
  }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
  retryOverHdfsProblem(spiedFs);
}

/**
 * Uses a reporter whose {@code progress()} always returns false. Presumably WALSplitter treats
 * that as a request to stop; confirm. Each open is delayed by 1500 ms while the report period is
 * set to 1000 ms, so the reporter should be consulted at least once. The test expects
 * {@code splitLogFile} to return false without throwing. The report period is restored in
 * {@code finally}.
 */
@Test
public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
  generateWALs(1, 10, -1);
  FileStatus logfile = fs.listStatus(WALDIR)[0];
  useDifferentDFSClient();

  final AtomicInteger count = new AtomicInteger();

  CancelableProgressable localReporter = new CancelableProgressable() {
    @Override
    public boolean progress() {
      count.getAndIncrement();
      return false;
    }
  };

  FileSystem spiedFs = Mockito.spy(fs);
  Mockito.doAnswer(new Answer<FSDataInputStream>() {
    @Override
    public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
      Thread.sleep(1500); // Sleep a while and wait report status invoked
      return (FSDataInputStream) invocation.callRealMethod();
    }
  }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());

  try {
    conf.setInt("hbase.splitlog.report.period", 1000);
    boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
      Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
    assertFalse("Log splitting should failed", ret);
    assertTrue(count.get() > 0);
  } catch (IOException e) {
    fail("There shouldn't be any exception but: " + e.toString());
  } finally {
    // reset it back to its default value
    conf.setInt("hbase.splitlog.report.period", 59000);
  }
}

/**
 * Test log split process with fake data and lots of edits to trigger threading issues.
 */
@Test
public void testThreading() throws Exception {
  doTestThreading(20000, 128 * 1024 * 1024, 0);
}

/**
 * Test blocking behavior of the log split process if writers are writing slower than the reader
 * is reading.
 */
@Test
public void testThreadingSlowWriterSmallBuffer() throws Exception {
  // The logic of this test has conflict with the limit writers split logic, skip this test for
  // TestWALSplitBoundedLogWriterCreation
  assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
  doTestThreading(200, 1024, 50);
}

/**
 * Sets up a log splitter with a mock reader and writer. The mock reader generates a specified
 * number of edits spread across 5 regions. The mock writer optionally sleeps for each edit it is
 * fed. After the split is complete, verifies that the statistics show the correct number of edits
 * output into each region.
 * <p>
 * The reader emits edit {@code index} for region {@code "r" + (index % 5)} with row
 * {@code index / 5}, so each region sees rows 0, 1, 2, ... in order. Every mock writer checks
 * this ordering with its own counter starting at 0. That presumably relies on one writer being
 * created per region; confirm.
 * @param numFakeEdits number of fake edits to push through pipeline
 * @param bufferSize size of in-memory buffer
 * @param writerSlowness writer threads will sleep this many ms per edit
 */
private void doTestThreading(final int numFakeEdits, final int bufferSize,
  final int writerSlowness) throws Exception {

  Configuration localConf = new Configuration(conf);
  localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

  // Create a fake log file (we'll override the reader to produce a stream of edits)
  Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
  FSDataOutputStream out = fs.create(logPath);
  out.close();

  // Make region dirs for our destination regions so the output doesn't get skipped
  final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
  makeRegionDirs(regions);

  // Create a splitter that reads and writes the data without touching disk
  WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
    /* Produce a mock writer that doesn't write anywhere */
    @Override
    protected Writer createWriter(Path logfile) throws IOException {
      Writer mockWriter = Mockito.mock(Writer.class);
      Mockito.doAnswer(new Answer<Void>() {
        // Next row index this writer expects to receive.
        int expectedIndex = 0;

        @Override
        public Void answer(InvocationOnMock invocation) {
          if (writerSlowness > 0) {
            try {
              Thread.sleep(writerSlowness);
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
            }
          }
          Entry entry = (Entry) invocation.getArgument(0);
          WALEdit edit = entry.getEdit();
          List<Cell> cells = edit.getCells();
          assertEquals(1, cells.size());
          Cell cell = cells.get(0);

          // Check that the edits come in the right order.
          assertEquals(expectedIndex,
            Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
          expectedIndex++;
          return null;
        }
      }).when(mockWriter).append(Mockito.<Entry> any());
      return mockWriter;
    }

    /* Produce a mock reader that generates fake entries */
    @Override
    protected WALStreamReader getReader(FileStatus file, boolean skipErrors,
      CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
      WALStreamReader mockReader = Mockito.mock(WALStreamReader.class);
      Mockito.doAnswer(new Answer<Entry>() {
        int index = 0;

        @Override
        public Entry answer(InvocationOnMock invocation) throws Throwable {
          // A null entry signals end-of-stream once all fake edits have been produced.
          if (index >= numFakeEdits) {
            return null;
          }

          // Generate r0 through r4 in round robin fashion
          int regionIdx = index % regions.size();
          // 0x30 is '0', so this builds the bytes of "r0".."r4".
          byte region[] = new byte[] { (byte) 'r', (byte) (0x30 + regionIdx) };

          Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes(index / regions.size()),
            FAMILY, QUALIFIER, VALUE, index);
          index++;
          return ret;
        }
      }).when(mockReader).next();
      return mockReader;
    }
  };

  logSplitter.splitWAL(fs.getFileStatus(logPath), null);

  // Verify number of written edits per region
  Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
  for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
    LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
    // NOTE(review): arguments are passed as (actual, expected); a failure message would read
    // backwards.
    assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
  }
  assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
}

// Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
1159 @Test 1160 public void testSplitLogFileDeletedRegionDir() throws IOException { 1161 LOG.info("testSplitLogFileDeletedRegionDir"); 1162 final String REGION = "region__1"; 1163 REGIONS.clear(); 1164 REGIONS.add(REGION); 1165 1166 generateWALs(1, 10, -1); 1167 useDifferentDFSClient(); 1168 1169 Path regiondir = new Path(TABLEDIR, REGION); 1170 LOG.info("Region directory is" + regiondir); 1171 fs.delete(regiondir, true); 1172 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1173 assertFalse(fs.exists(regiondir)); 1174 } 1175 1176 @Test 1177 public void testSplitLogFileEmpty() throws IOException { 1178 LOG.info("testSplitLogFileEmpty"); 1179 // we won't create the hlog dir until getWAL got called, so 1180 // make dir here when testing empty log file 1181 fs.mkdirs(WALDIR); 1182 injectEmptyFile(".empty", true); 1183 useDifferentDFSClient(); 1184 1185 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1186 Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME); 1187 assertFalse(fs.exists(tdir)); 1188 1189 assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath())); 1190 } 1191 1192 @Test 1193 public void testSplitLogFileMultipleRegions() throws IOException { 1194 LOG.info("testSplitLogFileMultipleRegions"); 1195 generateWALs(1, 10, -1); 1196 splitAndCount(1, 10); 1197 } 1198 1199 @Test 1200 public void testSplitLogFileFirstLineCorruptionLog() throws IOException { 1201 conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true); 1202 generateWALs(1, 10, -1); 1203 FileStatus logfile = fs.listStatus(WALDIR)[0]; 1204 1205 corruptWAL(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true); 1206 1207 useDifferentDFSClient(); 1208 WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1209 1210 final Path corruptDir = 1211 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); 1212 assertEquals(1, fs.listStatus(corruptDir).length); 1213 } 1214 1215 /** 1216 * @see 
"https://issues.apache.org/jira/browse/HBASE-4862" 1217 */ 1218 @Test 1219 public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException { 1220 LOG.info("testConcurrentSplitLogAndReplayRecoverEdit"); 1221 // Generate wals for our destination region 1222 String regionName = "r0"; 1223 final Path regiondir = new Path(TABLEDIR, regionName); 1224 REGIONS.clear(); 1225 REGIONS.add(regionName); 1226 generateWALs(-1); 1227 1228 wals.getWAL(null); 1229 FileStatus[] logfiles = fs.listStatus(WALDIR); 1230 assertTrue("There should be some log file", logfiles != null && logfiles.length > 0); 1231 1232 WALSplitter logSplitter = 1233 new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) { 1234 @Override 1235 protected Writer createWriter(Path logfile) throws IOException { 1236 Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); 1237 // After creating writer, simulate region's 1238 // replayRecoveredEditsIfAny() which gets SplitEditFiles of this 1239 // region and delete them, excluding files with '.temp' suffix. 
1240 NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir); 1241 if (files != null && !files.isEmpty()) { 1242 for (Path file : files) { 1243 if (!this.walFS.delete(file, false)) { 1244 LOG.error("Failed delete of " + file); 1245 } else { 1246 LOG.debug("Deleted recovered.edits file=" + file); 1247 } 1248 } 1249 } 1250 return writer; 1251 } 1252 }; 1253 try { 1254 logSplitter.splitWAL(logfiles[0], null); 1255 } catch (IOException e) { 1256 LOG.info(e.toString(), e); 1257 fail("Throws IOException when spliting " 1258 + "log, it is most likely because writing file does not " 1259 + "exist which is caused by concurrent replayRecoveredEditsIfAny()"); 1260 } 1261 if (fs.exists(CORRUPTDIR)) { 1262 if (fs.listStatus(CORRUPTDIR).length > 0) { 1263 fail("There are some corrupt logs, " 1264 + "it is most likely caused by concurrent replayRecoveredEditsIfAny()"); 1265 } 1266 } 1267 } 1268 1269 @Test 1270 public void testRecoveredEditsStoragePolicy() throws IOException { 1271 conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD"); 1272 try { 1273 Path path = createRecoveredEditsPathForRegion(); 1274 assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName()); 1275 } finally { 1276 conf.unset(HConstants.WAL_STORAGE_POLICY); 1277 } 1278 } 1279 1280 /** 1281 * See HBASE-27644, typically we should not have empty WALEdit but we should be able to process 1282 * it, instead of losing data after it. 
1283 */ 1284 @Test 1285 public void testEmptyWALEdit() throws IOException { 1286 final String region = "region__5"; 1287 REGIONS.clear(); 1288 REGIONS.add(region); 1289 makeRegionDirs(REGIONS); 1290 fs.mkdirs(WALDIR); 1291 Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5); 1292 generateEmptyEditWAL(path, Bytes.toBytes(region)); 1293 useDifferentDFSClient(); 1294 1295 Path regiondir = new Path(TABLEDIR, region); 1296 fs.mkdirs(regiondir); 1297 List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1298 // Make sure that WALSplitter generate the split file 1299 assertEquals(1, splitPaths.size()); 1300 1301 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 1302 assertEquals(11, countWAL(originalLog)); 1303 // we will skip the empty WAL when splitting 1304 assertEquals(10, countWAL(splitPaths.get(0))); 1305 } 1306 1307 private void generateEmptyEditWAL(Path path, byte[] region) throws IOException { 1308 fs.mkdirs(WALDIR); 1309 try (Writer writer = wals.createWALWriter(fs, path)) { 1310 long seq = 0; 1311 appendEmptyEntry(writer, TABLE_NAME, region, seq++); 1312 for (int i = 0; i < 10; i++) { 1313 appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++); 1314 } 1315 } 1316 } 1317 1318 private Writer generateWALs(int leaveOpen) throws IOException { 1319 return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0); 1320 } 1321 1322 private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException { 1323 return generateWALs(writers, entries, leaveOpen, 7); 1324 } 1325 1326 private void makeRegionDirs(List<String> regions) throws IOException { 1327 for (String region : regions) { 1328 LOG.debug("Creating dir for region " + region); 1329 fs.mkdirs(new Path(TABLEDIR, region)); 1330 } 1331 } 1332 1333 /** 1334 * @param leaveOpen index to leave un-closed. -1 to close all. 1335 * @return the writer that's still open, or null if all were closed. 
1336 */ 1337 private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) 1338 throws IOException { 1339 makeRegionDirs(REGIONS); 1340 fs.mkdirs(WALDIR); 1341 Writer[] ws = new Writer[writers]; 1342 int seq = 0; 1343 int numRegionEventsAdded = 0; 1344 for (int i = 0; i < writers; i++) { 1345 ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i)); 1346 for (int j = 0; j < entries; j++) { 1347 int prefix = 0; 1348 for (String region : REGIONS) { 1349 String row_key = region + prefix++ + i + j; 1350 appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY, 1351 QUALIFIER, VALUE, seq++); 1352 1353 if (numRegionEventsAdded < regionEvents) { 1354 numRegionEventsAdded++; 1355 appendRegionEvent(ws[i], region); 1356 } 1357 } 1358 } 1359 if (i != leaveOpen) { 1360 ws[i].close(); 1361 LOG.info("Closing writer " + i); 1362 } 1363 } 1364 if (leaveOpen < 0 || leaveOpen >= writers) { 1365 return null; 1366 } 1367 return ws[leaveOpen]; 1368 } 1369 1370 private Path[] getLogForRegion(TableName table, String region) throws IOException { 1371 Path tdir = CommonFSUtils.getWALTableDir(conf, table); 1372 @SuppressWarnings("deprecation") 1373 Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir( 1374 HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region)))); 1375 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { 1376 @Override 1377 public boolean accept(Path p) { 1378 if (WALSplitUtil.isSequenceIdFile(p)) { 1379 return false; 1380 } 1381 return true; 1382 } 1383 }); 1384 Path[] paths = new Path[files.length]; 1385 for (int i = 0; i < files.length; i++) { 1386 paths[i] = files[i].getPath(); 1387 } 1388 return paths; 1389 } 1390 1391 private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException { 1392 FSDataOutputStream out; 1393 int fileSize = (int) fs.listStatus(path)[0].getLen(); 1394 1395 FSDataInputStream in = fs.open(path); 1396 byte[] corrupted_bytes = 
new byte[fileSize]; 1397 in.readFully(0, corrupted_bytes, 0, fileSize); 1398 in.close(); 1399 1400 switch (corruption) { 1401 case APPEND_GARBAGE: 1402 fs.delete(path, false); 1403 out = fs.create(path); 1404 out.write(corrupted_bytes); 1405 out.write(Bytes.toBytes("-----")); 1406 closeOrFlush(close, out); 1407 break; 1408 1409 case INSERT_GARBAGE_ON_FIRST_LINE: 1410 fs.delete(path, false); 1411 out = fs.create(path); 1412 out.write(0); 1413 out.write(corrupted_bytes); 1414 closeOrFlush(close, out); 1415 break; 1416 1417 case INSERT_GARBAGE_IN_THE_MIDDLE: 1418 fs.delete(path, false); 1419 out = fs.create(path); 1420 int middle = (int) Math.floor(corrupted_bytes.length / 2); 1421 out.write(corrupted_bytes, 0, middle); 1422 out.write(0); 1423 out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); 1424 closeOrFlush(close, out); 1425 break; 1426 1427 case TRUNCATE: 1428 fs.delete(path, false); 1429 out = fs.create(path); 1430 out.write(corrupted_bytes, 0, fileSize 1431 - (32 + AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT)); 1432 closeOrFlush(close, out); 1433 break; 1434 1435 case TRUNCATE_TRAILER: 1436 fs.delete(path, false); 1437 out = fs.create(path); 1438 out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated. 
1439 closeOrFlush(close, out); 1440 break; 1441 } 1442 } 1443 1444 private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException { 1445 if (close) { 1446 out.close(); 1447 } else { 1448 Method syncMethod = null; 1449 try { 1450 syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {}); 1451 } catch (NoSuchMethodException e) { 1452 try { 1453 syncMethod = out.getClass().getMethod("sync", new Class<?>[] {}); 1454 } catch (NoSuchMethodException ex) { 1455 throw new IOException( 1456 "This version of Hadoop supports " + "neither Syncable.sync() nor Syncable.hflush()."); 1457 } 1458 } 1459 try { 1460 syncMethod.invoke(out, new Object[] {}); 1461 } catch (Exception e) { 1462 throw new IOException(e); 1463 } 1464 // Not in 0out.hflush(); 1465 } 1466 } 1467 1468 private int countWAL(Path log) throws IOException { 1469 int count = 0; 1470 try (WALStreamReader in = wals.createStreamReader(fs, log)) { 1471 while (in.next() != null) { 1472 count++; 1473 } 1474 } 1475 return count; 1476 } 1477 1478 private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs, 1479 String output) throws IOException { 1480 WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder(); 1481 desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes())) 1482 .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes())) 1483 .setRegionName(ByteString.copyFrom(hri.getRegionName())) 1484 .setFamilyName(ByteString.copyFrom(FAMILY)) 1485 .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY)) 1486 .addAllCompactionInput(Arrays.asList(inputs)).addCompactionOutput(output); 1487 1488 WALEdit edit = WALEdit.createCompaction(hri, desc.build()); 1489 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, 1490 EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); 1491 w.append(new Entry(key, edit)); 1492 w.sync(false); 1493 } 1494 1495 private static void 
appendRegionEvent(Writer w, String region) throws IOException { 1496 WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( 1497 WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, TABLE_NAME.toBytes(), 1498 Bytes.toBytes(region), Bytes.toBytes(String.valueOf(region.hashCode())), 1, 1499 ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>> of()); 1500 final long time = EnvironmentEdgeManager.currentTime(); 1501 final WALKeyImpl walKey = 1502 new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, HConstants.DEFAULT_CLUSTER_ID); 1503 WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc); 1504 w.append(new Entry(walKey, we)); 1505 w.sync(false); 1506 } 1507 1508 private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row, 1509 byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException { 1510 LOG.info(Thread.currentThread().getName() + " append"); 1511 writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); 1512 LOG.info(Thread.currentThread().getName() + " sync"); 1513 writer.sync(false); 1514 return seq; 1515 } 1516 1517 private static Entry createTestEntry(TableName table, byte[] region, byte[] row, byte[] family, 1518 byte[] qualifier, byte[] value, long seq) { 1519 long time = System.nanoTime(); 1520 1521 final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value); 1522 WALEdit edit = new WALEdit(); 1523 edit.add(cell); 1524 return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit); 1525 } 1526 1527 private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq) 1528 throws IOException { 1529 LOG.info(Thread.currentThread().getName() + " append"); 1530 writer.append(createEmptyEntry(table, region, seq)); 1531 LOG.info(Thread.currentThread().getName() + " sync"); 1532 writer.sync(false); 1533 return 
seq; 1534 } 1535 1536 private static Entry createEmptyEntry(TableName table, byte[] region, long seq) { 1537 long time = System.nanoTime(); 1538 return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), 1539 new WALEdit()); 1540 } 1541 1542 private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { 1543 Writer writer = 1544 WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); 1545 if (closeFile) { 1546 writer.close(); 1547 } 1548 } 1549 1550 private boolean logsAreEqual(Path p1, Path p2) throws IOException { 1551 try (WALStreamReader in1 = wals.createStreamReader(fs, p1); 1552 WALStreamReader in2 = wals.createStreamReader(fs, p2)) { 1553 Entry entry1; 1554 Entry entry2; 1555 while ((entry1 = in1.next()) != null) { 1556 entry2 = in2.next(); 1557 if ( 1558 (entry1.getKey().compareTo(entry2.getKey()) != 0) 1559 || (!entry1.getEdit().toString().equals(entry2.getEdit().toString())) 1560 ) { 1561 return false; 1562 } 1563 } 1564 } 1565 return true; 1566 } 1567}