001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; 021import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey; 022import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; 023import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertFalse; 026import static org.junit.jupiter.api.Assertions.assertThrows; 027import static org.junit.jupiter.api.Assertions.assertTrue; 028import static org.junit.jupiter.api.Assertions.fail; 029import static org.junit.jupiter.api.Assumptions.assumeFalse; 030 031import java.io.FileNotFoundException; 032import java.io.IOException; 033import java.lang.reflect.Method; 034import java.security.PrivilegedExceptionAction; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Collections; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.List; 041import 
java.util.Map; 042import java.util.NavigableSet; 043import java.util.Objects; 044import java.util.Set; 045import java.util.concurrent.atomic.AtomicBoolean; 046import java.util.concurrent.atomic.AtomicInteger; 047import java.util.concurrent.atomic.AtomicLong; 048import java.util.stream.Collectors; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.fs.FSDataInputStream; 051import org.apache.hadoop.fs.FSDataOutputStream; 052import org.apache.hadoop.fs.FileStatus; 053import org.apache.hadoop.fs.FileSystem; 054import org.apache.hadoop.fs.FileUtil; 055import org.apache.hadoop.fs.Path; 056import org.apache.hadoop.fs.PathFilter; 057import org.apache.hadoop.hbase.Cell; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseTestingUtil; 060import org.apache.hadoop.hbase.HConstants; 061import org.apache.hadoop.hbase.KeyValue; 062import org.apache.hadoop.hbase.ServerName; 063import org.apache.hadoop.hbase.TableName; 064import org.apache.hadoop.hbase.client.RegionInfo; 065import org.apache.hadoop.hbase.client.RegionInfoBuilder; 066import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 067import org.apache.hadoop.hbase.master.SplitLogManager; 068import org.apache.hadoop.hbase.regionserver.HRegion; 069import org.apache.hadoop.hbase.regionserver.LastSequenceId; 070import org.apache.hadoop.hbase.regionserver.RegionServerServices; 071import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader; 072import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader; 073import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; 074import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; 075import org.apache.hadoop.hbase.security.User; 076import org.apache.hadoop.hbase.testclassification.LargeTests; 077import org.apache.hadoop.hbase.testclassification.RegionServerTests; 078import org.apache.hadoop.hbase.util.Bytes; 079import 
org.apache.hadoop.hbase.util.CancelableProgressable; 080import org.apache.hadoop.hbase.util.CommonFSUtils; 081import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 082import org.apache.hadoop.hbase.util.Threads; 083import org.apache.hadoop.hbase.wal.WAL.Entry; 084import org.apache.hadoop.hbase.wal.WALProvider.Writer; 085import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException; 086import org.apache.hadoop.hdfs.DFSTestUtil; 087import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; 088import org.apache.hadoop.ipc.RemoteException; 089import org.junit.jupiter.api.AfterAll; 090import org.junit.jupiter.api.AfterEach; 091import org.junit.jupiter.api.BeforeAll; 092import org.junit.jupiter.api.BeforeEach; 093import org.junit.jupiter.api.Tag; 094import org.junit.jupiter.api.Test; 095import org.junit.jupiter.api.TestInfo; 096import org.mockito.Mockito; 097import org.mockito.invocation.InvocationOnMock; 098import org.mockito.stubbing.Answer; 099import org.slf4j.Logger; 100import org.slf4j.LoggerFactory; 101 102import org.apache.hbase.thirdparty.com.google.common.base.Joiner; 103import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 104import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 105import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 106 107import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 110 111/** 112 * Testing {@link WAL} splitting code. 
 */
@Tag(RegionServerTests.TAG)
@Tag(LargeTests.TAG)
public class TestWALSplit {
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Directory layout re-created for every test in setUp().
  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME = TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  // Region names the generated WALs write to; individual tests may replace the contents.
  private static List<String> REGIONS = new ArrayList<>();
  // Test user names (initialized in setUpBeforeClass from the current user).
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] { "supergroup" };

  /** Kinds of damage the corruption tests inject into a WAL file. */
  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  /**
   * Configures the filesystem WAL provider with the instrumented writer and starts a two-node
   * mini DFS cluster shared by all tests.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    conf.setClass(FSHLogProvider.WRITER_IMPL, InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create fake mapping user to group and set it to the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  private String testMethodName;
  private WALFactory wals = null;

  /**
   * Creates fresh root/WAL directories and a WALFactory named after the running test method, so
   * each test gets its own WAL directory.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) throws Exception {
    testMethodName = testInfo.getTestMethod().get().getName();
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, testMethodName);
    WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(
      ServerName.valueOf(testMethodName, 16010, EnvironmentEdgeManager.currentTime()).toString()));
    // fs.mkdirs(WALDIR);
  }

  /** Closes the per-test WALFactory and removes the test directories. */
  @AfterEach
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if"
        + " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }

  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits too and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get())
        Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      // The "robber" splits the WALs while the zombie is still appending to the last one.
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls =
            new StringBuilder("Contents of WALDIR (").append(WALDIR).append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals(numWriters, logfiles.length, "wrong number of split files for region");
          int count = 0;
          for (Path logfile : logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      // The split may legitimately pick up one in-flight edit the zombie has not yet counted.
      assertTrue(counter.get() == count || counter.get() + 1 == count,
        "The log file could have at most 1 extra log entry, but can't have less. "
          + "Zombie could write " + counter.get() + " and logfile had only " + count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }

  /**
   * This thread will keep writing to a 'wal' file even after the split process has started. It
   * simulates a region server that was considered dead but woke up and wrote some more to the last
   * log entry. Does its writing as an alternate user in another filesystem instance to simulate
   * better it being a regionserver.
   */
  private class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
      final String region, final int writers) throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      // Write as the ZOMBIE user through its own filesystem instance.
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    /** Generates the WAL files, then keeps appending to the last one until stopped. */
    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update counter so has all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // closing should error
          try {
            writer.close();
            fail("Writing closing after parsing should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    /** Appends edits one at a time, bumping editsCount, until stop is set or a write fails. */
    private void loop(final Writer writer) {
      byte[] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
            Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            //
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            // A RemoteException is the expected outcome once the split steals the lease.
            LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing "
              + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }

  // If another worker is assigned to split a WAL and last worker is still running, both should not
  // impact each other's progress
  @Test
  public void testTwoWorkerSplittingSameWAL() throws IOException, InterruptedException {
    int numWriter = 1, entries = 10;
    generateWALs(numWriter, entries, -1, 0);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    FileSystem spiedFs = Mockito.spy(fs);
    RegionServerServices zombieRSServices = Mockito.mock(RegionServerServices.class);
    RegionServerServices newWorkerRSServices = Mockito.mock(RegionServerServices.class);
    Mockito.when(zombieRSServices.getServerName())
      .thenReturn(ServerName.valueOf("zombie-rs.abc.com,1234,1234567890"));
    Mockito.when(newWorkerRSServices.getServerName())
      .thenReturn(ServerName.valueOf("worker-rs.abc.com,1234,1234569870"));
    // Two workers split the same file concurrently.
    Thread zombieWorker = new SplitWALWorker(logfile, spiedFs, zombieRSServices);
    Thread newWorker = new SplitWALWorker(logfile, spiedFs, newWorkerRSServices);
    zombieWorker.start();
    newWorker.start();
    newWorker.join();
    zombieWorker.join();

    // Edits must be intact despite the concurrent split attempts.
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(numWriter, logfiles.length, "wrong number of split files for region");

      int count = 0;
      for (Path lf : logfiles) {
        count += countWAL(lf);
      }
      assertEquals(entries, count, "wrong number of edits for region " + region);
    }
  }

  /**
   * Worker thread that splits a single WAL file. Implements {@link LastSequenceId} to report that
   * no sequence id has been flushed, so every edit is replayed.
   */
  private class SplitWALWorker extends Thread implements LastSequenceId {
    final FileStatus logfile;
    final FileSystem fs;
    final RegionServerServices rsServices;

    public SplitWALWorker(FileStatus logfile, FileSystem fs, RegionServerServices rsServices) {
      super(rsServices.getServerName().toShortString());
      setDaemon(true);
      this.fs = fs;
      this.logfile = logfile;
      this.rsServices = rsServices;
    }

    @Override
    public void run() {
      try {
        boolean ret =
          WALSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, null, this, null, wals, rsServices);
        assertTrue(ret, "Both splitting should pass");
      } catch (IOException e) {
        LOG.warn(getName() + " Worker exiting " + e);
      }
    }

    @Override
    public ClusterStatusProtos.RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
      return ClusterStatusProtos.RegionStoreSequenceIds.newBuilder()
        .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build();
    }
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  /**
   * Test old recovered edits file doesn't break WALSplitter. This is useful in upgrading old
   * instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

  /** Builds a recovered-edits path for the first meta region using the shared temp dir. */
  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
      FILENAME_BEING_SPLIT, TMPDIRNAME, conf, "");
    return p;
  }

  /** Verifies hasRecoveredEdits flips to true once a recovered-edits file exists. */
  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }

  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }

  /** Splitting a plain WAL must carry every edit over unchanged. */
  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue(logsAreEqual(originalLog, splitLog[0]), "edits differ after split");
  }

  /** Region-event markers must be dropped by the split; only real edits survive. */
  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse(logsAreEqual(originalLog, splitLog[0]), "edits differ after split");
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }

  /** Compaction markers, unlike region-event markers, must survive the split. */
  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir =
      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[] { "file1", "file2", "file3" };
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse(logsAreEqual(originalLog, splitLog[0]), "edits differ after split");
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * Tests that WalSplitter ignores replication marker edits.
   */
  @Test
  public void testSplitRemovesReplicationMarkerEdits() throws IOException {
    RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
    Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
    generateReplicationMarkerEdits(path, regionInfo);
    useDifferentDFSClient();
    List<FileStatus> logFiles =
      SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null);
    assertEquals(1, logFiles.size());
    assertEquals(path, logFiles.get(0).getPath());
    List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    // Make sure that WALSplitter doesn't fail.
    assertEquals(0, splitPaths.size());
  }

  /** Writes a WAL containing a single replication marker edit to the given path. */
  private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException {
    long timestamp = EnvironmentEdgeManager.currentTime();
    fs.mkdirs(WALDIR);
    try (Writer writer = wals.createWALWriter(fs, path)) {
      WALProtos.ReplicationMarkerDescriptor.Builder builder =
        WALProtos.ReplicationMarkerDescriptor.newBuilder();
      builder.setWalName("wal-name");
      builder.setRegionServerName("rs-name");
      builder.setOffset(0L);
      WALProtos.ReplicationMarkerDescriptor desc = builder.build();
      appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(),
        getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1);
    }
  }

  /**
   * Splits the WAL dir and asserts per-region split-file and edit counts.
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile : logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  /** Split must skip zero-length WAL files whether they are closed or still open. */
  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  /** With skip-errors on, garbage appended after valid entries must not lose any edits. */
  @Test
  public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  /** With skip-errors on, a file corrupted at the very start is dropped entirely. */
  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE,
      true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }

  /** With skip-errors on, entries before a mid-file corruption must still be recovered. */
  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE,
      false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue(REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount,
      "The file up to the corrupted area hasn't been parsed");
  }

  /** Every reader failure type must land the affected WALs in CORRUPTDIR when skip-errors is on. */
  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    List<FaultyProtobufWALStreamReader.FailureType> failureTypes =
      Arrays.asList(FaultyProtobufWALStreamReader.FailureType.values()).stream()
        .filter(x -> x != FaultyProtobufWALStreamReader.FailureType.NONE)
        .collect(Collectors.toList());
    for (FaultyProtobufWALStreamReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(archivedLogs, walDirContents,
        failureType.name() + ": expected to find all of our wals corrupt.");
    }
  }

  /**
   * Generates WALs, swaps in the faulty stream reader, and splits them; the reader class is
   * restored afterwards.
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufWALStreamReader.FailureType failureType)
    throws IOException {
    String backupClass = conf.get(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, FaultyProtobufWALStreamReader.class,
        WALStreamReader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, testMethodName);
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      // Restore the original reader class so later tests are unaffected.
      if (backupClass != null) {
        conf.set(WALFactory.WAL_STREAM_READER_CLASS_IMPL, backupClass);
      } else {
        conf.unset(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
      }
    }
  }

  @Test
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    assertThrows(IOException.class,
      () -> splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING));
  }

  /** A failed split with skip-errors off must leave the original WAL files untouched. */
  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    try {
      splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals(NUM_WRITERS, fs.listStatus(WALDIR).length,
      "if skip.errors is false all files should remain in place");
  }

  /**
   * Corrupts a single-writer WAL, splits it, and checks the split output has the expected entry
   * count while nothing is archived as corrupt (EOF-style damage is tolerated).
   */
  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
    final int expectedCount) throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    try (WALStreamReader in = wals.createStreamReader(fs, splitLog[0])) {
      while (in.next() != null) {
        ++actualCount;
      }
    }
    assertEquals(expectedCount, actualCount);

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs =
      fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0];
    assertEquals(0, archivedLogs.length);

  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }

  /** After a successful split every source WAL must be moved to the old-logs directory. */
  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals(NUM_WRITERS, archivedLogs.length, "wrong number of files in the archive log");
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " + Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

  /**
   * An injected writer failure while splitting must propagate as an IOException carrying the
   * instrumented message, and the failure flag must always be reset afterwards.
   */
  @Test
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave 5th log open so we could append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes("r" + 999), FAMILY,
      QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      IOException e = assertThrows(IOException.class,
        () -> WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals));
      assertTrue(e.getMessage()
        .contains("This exception is instrumented and should only be thrown for testing"));
    } finally {
      // Never leave the global failure switch on for subsequent tests.
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  /** Splitting a WAL whose target region directory was deleted must not recreate it. */
  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  /**
   * An IOException thrown on the output (writer) side of the split pipeline must fail the
   * whole split and surface the injected message.
   */
  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue(logfiles != null && logfiles.length > 0, "There should be some log file");
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue(0 < largestSize, "There should be some log greater than size 0.");
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter =
      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
        @Override
        protected Writer createWriter(Path logfile) throws IOException {
          Writer mockWriter = Mockito.mock(Writer.class);
          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
            .append(Mockito.<Entry> any());
          return mockWriter;
        }
      };
    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get())
          Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitWAL(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }

  /**
   * Runs a full split against an instrumented FileSystem and asserts the split still succeeds
   * (all logs archived, WAL dir removed) despite the injected HDFS-level failures.
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).when(spiedFs)
      .append(Mockito.<Path> any());
    retryOverHdfsProblem(spiedFs);
  }

  /** Transient open() failures (block-recovery style messages) must be retried, not fatal. */
  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have problem to obtain
    // the block length, in which case, retry may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] { "Cannot obtain block length",
        "Could not obtain the last block", "Blocklist for " + OLDLOGDIR + " has changed" };
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        // Fail the first three opens with the recognised retryable messages, then succeed.
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }

  /**
   * When the progress reporter asks for cancellation, the split must abort (return false)
   * after the reporter has been polled at least once.
   */
  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    // Reporter that counts polls and always asks the splitter to stop.
    CancelableProgressable localReporter = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while and wait report status invoked
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
        Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
      assertFalse(ret, "Log splitting should failed");
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }

  /**
   * Test log split process with fake data and lots of edits to trigger threading issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128 * 1024 * 1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower than the reader
   * is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    // The logic of this test has conflict with the limit writers split logic, skip this test for
    // TestWALSplitBoundedLogWriterCreation
    assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
    doTestThreading(200, 1024, 50);
  }

  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates a specified
   * number of edits spread across 5 regions. The mock writer optionally sleeps for each edit it is
   * fed. After the split is complete, verifies that the statistics show the correct number of edits
   * output into each region.
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits, final int bufferSize,
    final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          // Next row index this (per-region) writer expects; rows must arrive in order.
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            // Optionally simulate a slow writer to exercise the buffering/blocking path.
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex,
              Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry> any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected WALStreamReader getReader(FileStatus file, boolean skipErrors,
        CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
        WALStreamReader mockReader = Mockito.mock(WALStreamReader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) {
              return null;
            }

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte region[] = new byte[] { (byte) 'r', (byte) (0x30 + regionIdx) };

            // The row is the per-region sequence number, so each region sees 0,1,2,... in order.
            Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes(index / regions.size()),
              FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitWAL(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals(regions.size(), outputCounts.size(), "Should have as many outputs as regions");
  }

  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  /** Same scenario as testSplitDeletedRegion but driven through the single-file split path. */
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is" + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  /** An empty WAL file splits cleanly: no table dir is created and zero entries are archived. */
  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  /**
   * With skip-errors enabled, a WAL corrupted on its first line is moved aside into the
   * corrupt directory instead of failing the split.
   */
  @Test
  public void testSplitLogFileFirstLineCorruptionLog() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir =
      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue(logfiles != null && logfiles.length > 0, "There should be some log file");

    WALSplitter logSplitter =
      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
        @Override
        protected Writer createWriter(Path logfile) throws IOException {
          Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
          // After creating writer, simulate region's
          // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
          // region and delete them, excluding files with '.temp' suffix.
          NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
          if (files != null && !files.isEmpty()) {
            for (Path file : files) {
              if (!this.walFS.delete(file, false)) {
                LOG.error("Failed delete of " + file);
              } else {
                LOG.debug("Deleted recovered.edits file=" + file);
              }
            }
          }
          return writer;
        }
      };
    try {
      logSplitter.splitWAL(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Throws IOException when spliting "
        + "log, it is most likely because writing file does not "
        + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
          + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

  /** The configured WAL storage policy must be applied to the recovered-edits directory. */
  @Test
  public void testRecoveredEditsStoragePolicy() throws IOException {
    conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD");
    try {
      Path path = createRecoveredEditsPathForRegion();
      assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName());
    } finally {
      conf.unset(HConstants.WAL_STORAGE_POLICY);
    }
  }

  /**
   * See HBASE-27644, typically we should not have empty WALEdit but we should be able to process
   * it, instead of losing data after it.
1278 */ 1279 @Test 1280 public void testEmptyWALEdit() throws IOException { 1281 final String region = "region__5"; 1282 REGIONS.clear(); 1283 REGIONS.add(region); 1284 makeRegionDirs(REGIONS); 1285 fs.mkdirs(WALDIR); 1286 Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5); 1287 generateEmptyEditWAL(path, Bytes.toBytes(region)); 1288 useDifferentDFSClient(); 1289 1290 Path regiondir = new Path(TABLEDIR, region); 1291 fs.mkdirs(regiondir); 1292 List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); 1293 // Make sure that WALSplitter generate the split file 1294 assertEquals(1, splitPaths.size()); 1295 1296 Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); 1297 assertEquals(11, countWAL(originalLog)); 1298 // we will skip the empty WAL when splitting 1299 assertEquals(10, countWAL(splitPaths.get(0))); 1300 } 1301 1302 private void generateEmptyEditWAL(Path path, byte[] region) throws IOException { 1303 fs.mkdirs(WALDIR); 1304 try (Writer writer = wals.createWALWriter(fs, path)) { 1305 long seq = 0; 1306 appendEmptyEntry(writer, TABLE_NAME, region, seq++); 1307 for (int i = 0; i < 10; i++) { 1308 appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++); 1309 } 1310 } 1311 } 1312 1313 private Writer generateWALs(int leaveOpen) throws IOException { 1314 return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0); 1315 } 1316 1317 private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException { 1318 return generateWALs(writers, entries, leaveOpen, 7); 1319 } 1320 1321 private void makeRegionDirs(List<String> regions) throws IOException { 1322 for (String region : regions) { 1323 LOG.debug("Creating dir for region " + region); 1324 fs.mkdirs(new Path(TABLEDIR, region)); 1325 } 1326 } 1327 1328 /** 1329 * @param leaveOpen index to leave un-closed. -1 to close all. 1330 * @return the writer that's still open, or null if all were closed. 
1331 */ 1332 private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) 1333 throws IOException { 1334 makeRegionDirs(REGIONS); 1335 fs.mkdirs(WALDIR); 1336 Writer[] ws = new Writer[writers]; 1337 int seq = 0; 1338 int numRegionEventsAdded = 0; 1339 for (int i = 0; i < writers; i++) { 1340 ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i)); 1341 for (int j = 0; j < entries; j++) { 1342 int prefix = 0; 1343 for (String region : REGIONS) { 1344 String row_key = region + prefix++ + i + j; 1345 appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY, 1346 QUALIFIER, VALUE, seq++); 1347 1348 if (numRegionEventsAdded < regionEvents) { 1349 numRegionEventsAdded++; 1350 appendRegionEvent(ws[i], region); 1351 } 1352 } 1353 } 1354 if (i != leaveOpen) { 1355 ws[i].close(); 1356 LOG.info("Closing writer " + i); 1357 } 1358 } 1359 if (leaveOpen < 0 || leaveOpen >= writers) { 1360 return null; 1361 } 1362 return ws[leaveOpen]; 1363 } 1364 1365 private Path[] getLogForRegion(TableName table, String region) throws IOException { 1366 Path tdir = CommonFSUtils.getWALTableDir(conf, table); 1367 @SuppressWarnings("deprecation") 1368 Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir( 1369 HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region)))); 1370 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { 1371 @Override 1372 public boolean accept(Path p) { 1373 if (WALSplitUtil.isSequenceIdFile(p)) { 1374 return false; 1375 } 1376 return true; 1377 } 1378 }); 1379 Path[] paths = new Path[files.length]; 1380 for (int i = 0; i < files.length; i++) { 1381 paths[i] = files[i].getPath(); 1382 } 1383 return paths; 1384 } 1385 1386 private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException { 1387 FSDataOutputStream out; 1388 int fileSize = (int) fs.listStatus(path)[0].getLen(); 1389 1390 FSDataInputStream in = fs.open(path); 1391 byte[] corrupted_bytes = 
new byte[fileSize]; 1392 in.readFully(0, corrupted_bytes, 0, fileSize); 1393 in.close(); 1394 1395 switch (corruption) { 1396 case APPEND_GARBAGE: 1397 fs.delete(path, false); 1398 out = fs.create(path); 1399 out.write(corrupted_bytes); 1400 out.write(Bytes.toBytes("-----")); 1401 closeOrFlush(close, out); 1402 break; 1403 1404 case INSERT_GARBAGE_ON_FIRST_LINE: 1405 fs.delete(path, false); 1406 out = fs.create(path); 1407 out.write(0); 1408 out.write(corrupted_bytes); 1409 closeOrFlush(close, out); 1410 break; 1411 1412 case INSERT_GARBAGE_IN_THE_MIDDLE: 1413 fs.delete(path, false); 1414 out = fs.create(path); 1415 int middle = (int) Math.floor(corrupted_bytes.length / 2); 1416 out.write(corrupted_bytes, 0, middle); 1417 out.write(0); 1418 out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); 1419 closeOrFlush(close, out); 1420 break; 1421 1422 case TRUNCATE: 1423 fs.delete(path, false); 1424 out = fs.create(path); 1425 out.write(corrupted_bytes, 0, fileSize 1426 - (32 + AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT)); 1427 closeOrFlush(close, out); 1428 break; 1429 1430 case TRUNCATE_TRAILER: 1431 fs.delete(path, false); 1432 out = fs.create(path); 1433 out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated. 
1434 closeOrFlush(close, out); 1435 break; 1436 } 1437 } 1438 1439 private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException { 1440 if (close) { 1441 out.close(); 1442 } else { 1443 Method syncMethod = null; 1444 try { 1445 syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {}); 1446 } catch (NoSuchMethodException e) { 1447 try { 1448 syncMethod = out.getClass().getMethod("sync", new Class<?>[] {}); 1449 } catch (NoSuchMethodException ex) { 1450 throw new IOException( 1451 "This version of Hadoop supports " + "neither Syncable.sync() nor Syncable.hflush()."); 1452 } 1453 } 1454 try { 1455 syncMethod.invoke(out, new Object[] {}); 1456 } catch (Exception e) { 1457 throw new IOException(e); 1458 } 1459 // Not in 0out.hflush(); 1460 } 1461 } 1462 1463 private int countWAL(Path log) throws IOException { 1464 int count = 0; 1465 try (WALStreamReader in = wals.createStreamReader(fs, log)) { 1466 while (in.next() != null) { 1467 count++; 1468 } 1469 } 1470 return count; 1471 } 1472 1473 private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs, 1474 String output) throws IOException { 1475 WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder(); 1476 desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes())) 1477 .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes())) 1478 .setRegionName(ByteString.copyFrom(hri.getRegionName())) 1479 .setFamilyName(ByteString.copyFrom(FAMILY)) 1480 .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY)) 1481 .addAllCompactionInput(Arrays.asList(inputs)).addCompactionOutput(output); 1482 1483 WALEdit edit = WALEdit.createCompaction(hri, desc.build()); 1484 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, 1485 EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); 1486 w.append(new Entry(key, edit)); 1487 w.sync(false); 1488 } 1489 1490 private static void 
appendRegionEvent(Writer w, String region) throws IOException { 1491 WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( 1492 WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, TABLE_NAME.toBytes(), 1493 Bytes.toBytes(region), Bytes.toBytes(String.valueOf(region.hashCode())), 1, 1494 ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>> of()); 1495 final long time = EnvironmentEdgeManager.currentTime(); 1496 final WALKeyImpl walKey = 1497 new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, HConstants.DEFAULT_CLUSTER_ID); 1498 WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc); 1499 w.append(new Entry(walKey, we)); 1500 w.sync(false); 1501 } 1502 1503 private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row, 1504 byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException { 1505 LOG.info(Thread.currentThread().getName() + " append"); 1506 writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); 1507 LOG.info(Thread.currentThread().getName() + " sync"); 1508 writer.sync(false); 1509 return seq; 1510 } 1511 1512 private static Entry createTestEntry(TableName table, byte[] region, byte[] row, byte[] family, 1513 byte[] qualifier, byte[] value, long seq) { 1514 long time = System.nanoTime(); 1515 1516 final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value); 1517 WALEdit edit = new WALEdit(); 1518 edit.add(cell); 1519 return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit); 1520 } 1521 1522 private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq) 1523 throws IOException { 1524 LOG.info(Thread.currentThread().getName() + " append"); 1525 writer.append(createEmptyEntry(table, region, seq)); 1526 LOG.info(Thread.currentThread().getName() + " sync"); 1527 writer.sync(false); 1528 return 
seq; 1529 } 1530 1531 private static Entry createEmptyEntry(TableName table, byte[] region, long seq) { 1532 long time = System.nanoTime(); 1533 return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), 1534 new WALEdit()); 1535 } 1536 1537 private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { 1538 Writer writer = 1539 WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); 1540 if (closeFile) { 1541 writer.close(); 1542 } 1543 } 1544 1545 private boolean logsAreEqual(Path p1, Path p2) throws IOException { 1546 try (WALStreamReader in1 = wals.createStreamReader(fs, p1); 1547 WALStreamReader in2 = wals.createStreamReader(fs, p2)) { 1548 Entry entry1; 1549 Entry entry2; 1550 while ((entry1 = in1.next()) != null) { 1551 entry2 = in2.next(); 1552 if ( 1553 (entry1.getKey().compareTo(entry2.getKey()) != 0) 1554 || (!entry1.getEdit().toString().equals(entry2.getEdit().toString())) 1555 ) { 1556 return false; 1557 } 1558 } 1559 } 1560 return true; 1561 } 1562}