001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
021import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey;
022import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
023import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028import static org.junit.Assume.assumeFalse;
029
030import java.io.FileNotFoundException;
031import java.io.IOException;
032import java.lang.reflect.Method;
033import java.security.PrivilegedExceptionAction;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collections;
037import java.util.HashMap;
038import java.util.HashSet;
039import java.util.List;
040import java.util.Map;
041import java.util.NavigableSet;
042import java.util.Objects;
043import java.util.Set;
044import java.util.concurrent.atomic.AtomicBoolean;
045import java.util.concurrent.atomic.AtomicInteger;
046import java.util.concurrent.atomic.AtomicLong;
047import java.util.stream.Collectors;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.FSDataInputStream;
050import org.apache.hadoop.fs.FSDataOutputStream;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.FileUtil;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.PathFilter;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.HBaseClassTestRule;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseTestingUtil;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.KeyValue;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableName;
064import org.apache.hadoop.hbase.client.RegionInfo;
065import org.apache.hadoop.hbase.client.RegionInfoBuilder;
066import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
067import org.apache.hadoop.hbase.master.SplitLogManager;
068import org.apache.hadoop.hbase.regionserver.HRegion;
069import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
070import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader;
071import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
072import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
073import org.apache.hadoop.hbase.security.User;
074import org.apache.hadoop.hbase.testclassification.LargeTests;
075import org.apache.hadoop.hbase.testclassification.RegionServerTests;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.CancelableProgressable;
078import org.apache.hadoop.hbase.util.CommonFSUtils;
079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
080import org.apache.hadoop.hbase.util.Threads;
081import org.apache.hadoop.hbase.wal.WAL.Entry;
082import org.apache.hadoop.hbase.wal.WALProvider.Writer;
083import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
084import org.apache.hadoop.hdfs.DFSTestUtil;
085import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
086import org.apache.hadoop.ipc.RemoteException;
087import org.junit.After;
088import org.junit.AfterClass;
089import org.junit.Before;
090import org.junit.BeforeClass;
091import org.junit.ClassRule;
092import org.junit.Rule;
093import org.junit.Test;
094import org.junit.experimental.categories.Category;
095import org.junit.rules.TestName;
096import org.mockito.Mockito;
097import org.mockito.invocation.InvocationOnMock;
098import org.mockito.stubbing.Answer;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
103import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
104import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
105import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
106
107import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
109
110/**
111 * Testing {@link WAL} splitting code.
112 */
113@Category({ RegionServerTests.class, LargeTests.class })
114public class TestWALSplit {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALSplit.class);
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  // Shared configuration of TEST_UTIL; assigned and tuned in setUpBeforeClass().
  private static Configuration conf;
  // File system of the mini DFS cluster; re-fetched in setUp() before every test.
  private FileSystem fs;

  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Per-test directory layout, recomputed in setUp().
  private Path HBASEDIR; // root dir (TEST_UTIL.createRootDir())
  private Path HBASELOGDIR; // WAL root dir (TEST_UTIL.createWALRootDir())
  private Path WALDIR; // WAL dir of a server named after the running test method
  private Path OLDLOGDIR; // archive dir under HBASELOGDIR (HREGION_OLDLOGDIR_NAME)
  private Path CORRUPTDIR; // corrupt-WAL dir under HBASELOGDIR (CORRUPT_DIR_NAME)
  private Path TABLEDIR; // table dir of TABLE_NAME under HBASEDIR
  private String TMPDIRNAME; // HConstants.TEMPORARY_FS_DIRECTORY_KEY or its default

  // Default number of WAL files generated by a test.
  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  // Name passed as the "file being split" when building recovered-edits paths.
  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME = TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  // WAL files are named WAL_FILE_PREFIX + index, e.g. "wal.dat.5".
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  // Regions edits are written for; reset to {"bbb", "ccc"} in setUp(), overridden by some tests.
  private static List<String> REGIONS = new ArrayList<>();
  // Fake users ("<current user>-robber" / "-zombie"), both mapped to GROUP in setUpBeforeClass().
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] { "supergroup" };
146
147  static enum Corruptions {
148    INSERT_GARBAGE_ON_FIRST_LINE,
149    INSERT_GARBAGE_IN_THE_MIDDLE,
150    APPEND_GARBAGE,
151    TRUNCATE,
152    TRUNCATE_TRAILER
153  }
154
155  @BeforeClass
156  public static void setUpBeforeClass() throws Exception {
157    conf = TEST_UTIL.getConfiguration();
158    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
159    conf.setClass(FSHLogProvider.WRITER_IMPL, InstrumentedLogWriter.class, Writer.class);
160    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
161    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
162    // Create fake maping user to group and set it to the conf.
163    Map<String, String[]> u2g_map = new HashMap<>(2);
164    ROBBER = User.getCurrent().getName() + "-robber";
165    ZOMBIE = User.getCurrent().getName() + "-zombie";
166    u2g_map.put(ROBBER, GROUP);
167    u2g_map.put(ZOMBIE, GROUP);
168    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
169    conf.setInt("dfs.heartbeat.interval", 1);
170    TEST_UTIL.startMiniDFSCluster(2);
171  }
172
173  @AfterClass
174  public static void tearDownAfterClass() throws Exception {
175    TEST_UTIL.shutdownMiniDFSCluster();
176  }
177
  // Exposes the running test method's name; used to name the per-test WALFactory and WAL dir.
  @Rule
  public TestName name = new TestName();
  // Per-test WALFactory: created in setUp(), closed and nulled in tearDown().
  private WALFactory wals = null;
181
182  @Before
183  public void setUp() throws Exception {
184    LOG.info("Cleaning up cluster for new test.");
185    fs = TEST_UTIL.getDFSCluster().getFileSystem();
186    HBASEDIR = TEST_UTIL.createRootDir();
187    HBASELOGDIR = TEST_UTIL.createWALRootDir();
188    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
189    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
190    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
191    TMPDIRNAME =
192      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
193    REGIONS.clear();
194    Collections.addAll(REGIONS, "bbb", "ccc");
195    InstrumentedLogWriter.activateFailure = false;
196    wals = new WALFactory(conf, name.getMethodName());
197    WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(ServerName
198      .valueOf(name.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()).toString()));
199    // fs.mkdirs(WALDIR);
200  }
201
202  @After
203  public void tearDown() throws Exception {
204    try {
205      wals.close();
206    } catch (IOException exception) {
207      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
208      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if"
209        + " you see a failure look here.");
210      LOG.debug("exception details", exception);
211    } finally {
212      wals = null;
213      fs.delete(HBASEDIR, true);
214      fs.delete(HBASELOGDIR, true);
215    }
216  }
217
218  /**
219   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
220   * Ensures we do not lose edits.
221   */
222  @Test
223  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
224    final AtomicLong counter = new AtomicLong(0);
225    AtomicBoolean stop = new AtomicBoolean(false);
226    // Region we'll write edits too and then later examine to make sure they all made it in.
227    final String region = REGIONS.get(0);
228    final int numWriters = 3;
229    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
230    try {
231      long startCount = counter.get();
232      zombie.start();
233      // Wait till writer starts going.
234      while (startCount == counter.get())
235        Threads.sleep(1);
236      // Give it a second to write a few appends.
237      Threads.sleep(1000);
238      final Configuration conf2 = HBaseConfiguration.create(conf);
239      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
240      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
241        @Override
242        public Integer run() throws Exception {
243          StringBuilder ls =
244            new StringBuilder("Contents of WALDIR (").append(WALDIR).append("):\n");
245          for (FileStatus status : fs.listStatus(WALDIR)) {
246            ls.append("\t").append(status.toString()).append("\n");
247          }
248          LOG.debug(Objects.toString(ls));
249          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
250          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
251          LOG.info("Finished splitting out from under zombie.");
252          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
253          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
254          int count = 0;
255          for (Path logfile : logfiles) {
256            count += countWAL(logfile);
257          }
258          return count;
259        }
260      });
261      LOG.info("zombie=" + counter.get() + ", robber=" + count);
262      assertTrue(
263        "The log file could have at most 1 extra log entry, but can't have less. "
264          + "Zombie could write " + counter.get() + " and logfile had only " + count,
265        counter.get() == count || counter.get() + 1 == count);
266    } finally {
267      stop.set(true);
268      zombie.interrupt();
269      Threads.threadDumpingIsAlive(zombie);
270    }
271  }
272
273  /**
274   * This thread will keep writing to a 'wal' file even after the split process has started. It
275   * simulates a region server that was considered dead but woke up and wrote some more to the last
276   * log entry. Does its writing as an alternate user in another filesystem instance to simulate
277   * better it being a regionserver.
278   */
279  private class ZombieLastLogWriterRegionServer extends Thread {
280    final AtomicLong editsCount;
281    final AtomicBoolean stop;
282    final int numOfWriters;
283    /**
284     * Region to write edits for.
285     */
286    final String region;
287    final User user;
288
289    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
290      final String region, final int writers) throws IOException, InterruptedException {
291      super("ZombieLastLogWriterRegionServer");
292      setDaemon(true);
293      this.stop = stop;
294      this.editsCount = counter;
295      this.region = region;
296      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
297      numOfWriters = writers;
298    }
299
300    @Override
301    public void run() {
302      try {
303        doWriting();
304      } catch (IOException e) {
305        LOG.warn(getName() + " Writer exiting " + e);
306      } catch (InterruptedException e) {
307        LOG.warn(getName() + " Writer exiting " + e);
308      }
309    }
310
311    private void doWriting() throws IOException, InterruptedException {
312      this.user.runAs(new PrivilegedExceptionAction<Object>() {
313        @Override
314        public Object run() throws Exception {
315          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
316          // index we supply here.
317          int walToKeepOpen = numOfWriters - 1;
318          // The below method writes numOfWriters files each with ENTRIES entries for a total of
319          // numOfWriters * ENTRIES added per column family in the region.
320          Writer writer = null;
321          try {
322            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
323          } catch (IOException e1) {
324            throw new RuntimeException("Failed", e1);
325          }
326          // Update counter so has all edits written so far.
327          editsCount.addAndGet(numOfWriters * ENTRIES);
328          loop(writer);
329          // If we've been interruped, then things should have shifted out from under us.
330          // closing should error
331          try {
332            writer.close();
333            fail("Writing closing after parsing should give an error.");
334          } catch (IOException exception) {
335            LOG.debug("ignoring error when closing final writer.", exception);
336          }
337          return null;
338        }
339      });
340    }
341
342    private void loop(final Writer writer) {
343      byte[] regionBytes = Bytes.toBytes(this.region);
344      while (!stop.get()) {
345        try {
346          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
347            Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
348          long count = editsCount.incrementAndGet();
349          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
350          try {
351            Thread.sleep(1);
352          } catch (InterruptedException e) {
353            //
354          }
355        } catch (IOException ex) {
356          LOG.error(getName() + " ex " + ex.toString());
357          if (ex instanceof RemoteException) {
358            LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing "
359              + (editsCount.get() + 1));
360          } else {
361            LOG.error(getName() + " failed to write....at " + editsCount.get());
362            fail("Failed to write " + editsCount.get());
363          }
364          break;
365        } catch (Throwable t) {
366          LOG.error(getName() + " HOW? " + t);
367          LOG.debug("exception details", t);
368          break;
369        }
370      }
371      LOG.info(getName() + " Writer exiting");
372    }
373  }
374
375  /**
376   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
377   */
378  @Test
379  public void testRecoveredEditsPathForMeta() throws IOException {
380    Path p = createRecoveredEditsPathForRegion();
381    String parentOfParent = p.getParent().getParent().getName();
382    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
383  }
384
385  /**
386   * Test old recovered edits file doesn't break WALSplitter. This is useful in upgrading old
387   * instances.
388   */
389  @Test
390  public void testOldRecoveredEditsFileSidelined() throws IOException {
391    Path p = createRecoveredEditsPathForRegion();
392    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
393    Path regiondir = new Path(tdir, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
394    fs.mkdirs(regiondir);
395    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
396    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
397    fs.createNewFile(parent); // create a recovered.edits file
398    String parentOfParent = p.getParent().getParent().getName();
399    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
400    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
401  }
402
403  private Path createRecoveredEditsPathForRegion() throws IOException {
404    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
405    Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
406      FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
407    return p;
408  }
409
410  @Test
411  public void testHasRecoveredEdits() throws IOException {
412    Path p = createRecoveredEditsPathForRegion();
413    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
414    String renamedEdit = p.getName().split("-")[0];
415    fs.createNewFile(new Path(p.getParent(), renamedEdit));
416    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
417  }
418
419  private void useDifferentDFSClient() throws IOException {
420    // make fs act as a different client now
421    // initialize will create a new DFSClient with a new client ID
422    fs.initialize(fs.getUri(), conf);
423  }
424
425  @Test
426  public void testSplitPreservesEdits() throws IOException {
427    final String REGION = "region__1";
428    REGIONS.clear();
429    REGIONS.add(REGION);
430
431    generateWALs(1, 10, -1, 0);
432    useDifferentDFSClient();
433    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
434    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
435    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
436    assertEquals(1, splitLog.length);
437
438    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
439  }
440
441  @Test
442  public void testSplitRemovesRegionEventsEdits() throws IOException {
443    final String REGION = "region__1";
444    REGIONS.clear();
445    REGIONS.add(REGION);
446
447    generateWALs(1, 10, -1, 100);
448    useDifferentDFSClient();
449    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
450    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
451    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
452    assertEquals(1, splitLog.length);
453
454    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
455    // split log should only have the test edits
456    assertEquals(10, countWAL(splitLog[0]));
457  }
458
459  @Test
460  public void testSplitLeavesCompactionEventsEdits() throws IOException {
461    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
462    REGIONS.clear();
463    REGIONS.add(hri.getEncodedName());
464    Path regionDir =
465      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
466    LOG.info("Creating region directory: " + regionDir);
467    assertTrue(fs.mkdirs(regionDir));
468
469    Writer writer = generateWALs(1, 10, 0, 10);
470    String[] compactInputs = new String[] { "file1", "file2", "file3" };
471    String compactOutput = "file4";
472    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
473    writer.close();
474
475    useDifferentDFSClient();
476    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
477
478    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
479    // original log should have 10 test edits, 10 region markers, 1 compaction marker
480    assertEquals(21, countWAL(originalLog));
481
482    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
483    assertEquals(1, splitLog.length);
484
485    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
486    // split log should have 10 test edits plus 1 compaction marker
487    assertEquals(11, countWAL(splitLog[0]));
488  }
489
490  /**
491   * Tests that WalSplitter ignores replication marker edits.
492   */
493  @Test
494  public void testSplitRemovesReplicationMarkerEdits() throws IOException {
495    RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
496    Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
497    generateReplicationMarkerEdits(path, regionInfo);
498    useDifferentDFSClient();
499    List<FileStatus> logFiles =
500      SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null);
501    assertEquals(1, logFiles.size());
502    assertEquals(path, logFiles.get(0).getPath());
503    List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
504    // Make sure that WALSplitter doesn't fail.
505    assertEquals(0, splitPaths.size());
506  }
507
508  private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException {
509    long timestamp = EnvironmentEdgeManager.currentTime();
510    fs.mkdirs(WALDIR);
511    try (Writer writer = wals.createWALWriter(fs, path)) {
512      WALProtos.ReplicationMarkerDescriptor.Builder builder =
513        WALProtos.ReplicationMarkerDescriptor.newBuilder();
514      builder.setWalName("wal-name");
515      builder.setRegionServerName("rs-name");
516      builder.setOffset(0L);
517      WALProtos.ReplicationMarkerDescriptor desc = builder.build();
518      appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(),
519        getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1);
520    }
521  }
522
523  /**
524   * @param expectedEntries -1 to not assert
525   * @return the count across all regions
526   */
527  private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException {
528    useDifferentDFSClient();
529    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
530    int result = 0;
531    for (String region : REGIONS) {
532      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
533      assertEquals(expectedFiles, logfiles.length);
534      int count = 0;
535      for (Path logfile : logfiles) {
536        count += countWAL(logfile);
537      }
538      if (-1 != expectedEntries) {
539        assertEquals(expectedEntries, count);
540      }
541      result += count;
542    }
543    return result;
544  }
545
546  @Test
547  public void testEmptyLogFiles() throws IOException {
548    testEmptyLogFiles(true);
549  }
550
551  @Test
552  public void testEmptyOpenLogFiles() throws IOException {
553    testEmptyLogFiles(false);
554  }
555
556  private void testEmptyLogFiles(final boolean close) throws IOException {
557    // we won't create the hlog dir until getWAL got called, so
558    // make dir here when testing empty log file
559    fs.mkdirs(WALDIR);
560    injectEmptyFile(".empty", close);
561    generateWALs(Integer.MAX_VALUE);
562    injectEmptyFile("empty", close);
563    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
564  }
565
566  @Test
567  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
568    // generate logs but leave wal.dat.5 open.
569    generateWALs(5);
570    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
571  }
572
573  @Test
574  public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
575    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
576    generateWALs(Integer.MAX_VALUE);
577    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.APPEND_GARBAGE, true);
578    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
579  }
580
581  @Test
582  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
583    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
584    generateWALs(Integer.MAX_VALUE);
585    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE,
586      true);
587    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
588  }
589
590  @Test
591  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
592    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
593    generateWALs(Integer.MAX_VALUE);
594    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE,
595      false);
596    // the entries in the original logs are alternating regions
597    // considering the sequence file header, the middle corruption should
598    // affect at least half of the entries
599    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
600    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
601    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
602    assertTrue("The file up to the corrupted area hasn't been parsed",
603      REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
604  }
605
606  @Test
607  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
608    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
609    List<FaultyProtobufWALStreamReader.FailureType> failureTypes =
610      Arrays.asList(FaultyProtobufWALStreamReader.FailureType.values()).stream()
611        .filter(x -> x != FaultyProtobufWALStreamReader.FailureType.NONE)
612        .collect(Collectors.toList());
613    for (FaultyProtobufWALStreamReader.FailureType failureType : failureTypes) {
614      final Set<String> walDirContents = splitCorruptWALs(failureType);
615      final Set<String> archivedLogs = new HashSet<>();
616      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
617      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
618        archived.append("\n\t").append(log.toString());
619        archivedLogs.add(log.getPath().getName());
620      }
621      LOG.debug(archived.toString());
622      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
623        walDirContents);
624    }
625  }
626
627  /**
628   * @return set of wal names present prior to split attempt.
629   * @throws IOException if the split process fails
630   */
631  private Set<String> splitCorruptWALs(final FaultyProtobufWALStreamReader.FailureType failureType)
632    throws IOException {
633    String backupClass = conf.get(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
634    InstrumentedLogWriter.activateFailure = false;
635
636    try {
637      conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, FaultyProtobufWALStreamReader.class,
638        WALStreamReader.class);
639      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
640      // Clean up from previous tests or previous loop
641      try {
642        wals.shutdown();
643      } catch (IOException exception) {
644        // since we're splitting out from under the factory, we should expect some closing failures.
645        LOG.debug("Ignoring problem closing WALFactory.", exception);
646      }
647      wals.close();
648      try {
649        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
650          fs.delete(log.getPath(), true);
651        }
652      } catch (FileNotFoundException exception) {
653        LOG.debug("no previous CORRUPTDIR to clean.");
654      }
655      // change to the faulty reader
656      wals = new WALFactory(conf, name.getMethodName());
657      generateWALs(-1);
658      // Our reader will render all of these files corrupt.
659      final Set<String> walDirContents = new HashSet<>();
660      for (FileStatus status : fs.listStatus(WALDIR)) {
661        walDirContents.add(status.getPath().getName());
662      }
663      useDifferentDFSClient();
664      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
665      return walDirContents;
666    } finally {
667      if (backupClass != null) {
668        conf.set(WALFactory.WAL_STREAM_READER_CLASS_IMPL, backupClass);
669      } else {
670        conf.unset(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
671      }
672    }
673  }
674
675  @Test(expected = IOException.class)
676  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
677    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
678    splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
679  }
680
681  @Test
682  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
683    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
684    try {
685      splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
686    } catch (IOException e) {
687      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
688    }
689    assertEquals("if skip.errors is false all files should remain in place", NUM_WRITERS,
690      fs.listStatus(WALDIR).length);
691  }
692
693  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
694    final int expectedCount) throws IOException {
695    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
696
697    final String REGION = "region__1";
698    REGIONS.clear();
699    REGIONS.add(REGION);
700
701    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
702    generateWALs(1, entryCount, -1, 0);
703    corruptWAL(c1, corruption, true);
704
705    useDifferentDFSClient();
706    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
707
708    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
709    assertEquals(1, splitLog.length);
710
711    int actualCount = 0;
712    try (WALStreamReader in = wals.createStreamReader(fs, splitLog[0])) {
713      while (in.next() != null) {
714        ++actualCount;
715      }
716    }
717    assertEquals(expectedCount, actualCount);
718
719    // should not have stored the EOF files as corrupt
720    FileStatus[] archivedLogs =
721      fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0];
722    assertEquals(0, archivedLogs.length);
723
724  }
725
726  @Test
727  public void testEOFisIgnored() throws IOException {
728    int entryCount = 10;
729    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
730  }
731
732  @Test
733  public void testCorruptWALTrailer() throws IOException {
734    int entryCount = 10;
735    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
736  }
737
738  @Test
739  public void testLogsGetArchivedAfterSplit() throws IOException {
740    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
741    generateWALs(-1);
742    useDifferentDFSClient();
743    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
744    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
745    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
746  }
747
748  @Test
749  public void testSplit() throws IOException {
750    generateWALs(-1);
751    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
752  }
753
754  @Test
755  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException {
756    generateWALs(-1);
757    useDifferentDFSClient();
758    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
759    FileStatus[] statuses = null;
760    try {
761      statuses = fs.listStatus(WALDIR);
762      if (statuses != null) {
763        fail("Files left in log dir: " + Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
764      }
765    } catch (FileNotFoundException e) {
766      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
767    }
768  }
769
770  @Test(expected = IOException.class)
771  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
772    // leave 5th log open so we could append the "trap"
773    Writer writer = generateWALs(4);
774    useDifferentDFSClient();
775
776    String region = "break";
777    Path regiondir = new Path(TABLEDIR, region);
778    fs.mkdirs(regiondir);
779
780    InstrumentedLogWriter.activateFailure = false;
781    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes("r" + 999), FAMILY,
782      QUALIFIER, VALUE, 0);
783    writer.close();
784
785    try {
786      InstrumentedLogWriter.activateFailure = true;
787      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
788    } catch (IOException e) {
789      assertTrue(e.getMessage()
790        .contains("This exception is instrumented and should only be thrown for testing"));
791      throw e;
792    } finally {
793      InstrumentedLogWriter.activateFailure = false;
794    }
795  }
796
797  @Test
798  public void testSplitDeletedRegion() throws IOException {
799    REGIONS.clear();
800    String region = "region_that_splits";
801    REGIONS.add(region);
802
803    generateWALs(1);
804    useDifferentDFSClient();
805
806    Path regiondir = new Path(TABLEDIR, region);
807    fs.delete(regiondir, true);
808    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
809    assertFalse(fs.exists(regiondir));
810  }
811
812  @Test
813  public void testIOEOnOutputThread() throws Exception {
814    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
815
816    generateWALs(-1);
817    useDifferentDFSClient();
818    FileStatus[] logfiles = fs.listStatus(WALDIR);
819    assertTrue("There should be some log file", logfiles != null && logfiles.length > 0);
820    // wals with no entries (like the one we don't use in the factory)
821    // won't cause a failure since nothing will ever be written.
822    // pick the largest one since it's most likely to have entries.
823    int largestLogFile = 0;
824    long largestSize = 0;
825    for (int i = 0; i < logfiles.length; i++) {
826      if (logfiles[i].getLen() > largestSize) {
827        largestLogFile = i;
828        largestSize = logfiles[i].getLen();
829      }
830    }
831    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
832    // Set up a splitter that will throw an IOE on the output side
833    WALSplitter logSplitter =
834      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
835        @Override
836        protected Writer createWriter(Path logfile) throws IOException {
837          Writer mockWriter = Mockito.mock(Writer.class);
838          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
839            .append(Mockito.<Entry> any());
840          return mockWriter;
841        }
842      };
843    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
844    // the thread dumping in a background thread so it does not hold up the test.
845    final AtomicBoolean stop = new AtomicBoolean(false);
846    final Thread someOldThread = new Thread("Some-old-thread") {
847      @Override
848      public void run() {
849        while (!stop.get())
850          Threads.sleep(10);
851      }
852    };
853    someOldThread.setDaemon(true);
854    someOldThread.start();
855    final Thread t = new Thread("Background-thread-dumper") {
856      @Override
857      public void run() {
858        try {
859          Threads.threadDumpingIsAlive(someOldThread);
860        } catch (InterruptedException e) {
861          e.printStackTrace();
862        }
863      }
864    };
865    t.setDaemon(true);
866    t.start();
867    try {
868      logSplitter.splitWAL(logfiles[largestLogFile], null);
869      fail("Didn't throw!");
870    } catch (IOException ioe) {
871      assertTrue(ioe.toString().contains("Injected"));
872    } finally {
873      // Setting this to true will turn off the background thread dumper.
874      stop.set(true);
875    }
876  }
877
878  /**
879   * @param spiedFs should be instrumented for failure.
880   */
881  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
882    generateWALs(-1);
883    useDifferentDFSClient();
884
885    try {
886      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
887      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
888      assertFalse(fs.exists(WALDIR));
889    } catch (IOException e) {
890      fail("There shouldn't be any exception but: " + e.toString());
891    }
892  }
893
894  // Test for HBASE-3412
895  @Test
896  public void testMovedWALDuringRecovery() throws Exception {
897    // This partial mock will throw LEE for every file simulating
898    // files that were moved
899    FileSystem spiedFs = Mockito.spy(fs);
900    // The "File does not exist" part is very important,
901    // that's how it comes out of HDFS
902    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).when(spiedFs)
903      .append(Mockito.<Path> any());
904    retryOverHdfsProblem(spiedFs);
905  }
906
907  @Test
908  public void testRetryOpenDuringRecovery() throws Exception {
909    FileSystem spiedFs = Mockito.spy(fs);
910    // The "Cannot obtain block length", "Could not obtain the last block",
911    // and "Blocklist for [^ ]* has changed.*" part is very important,
912    // that's how it comes out of HDFS. If HDFS changes the exception
913    // message, this test needs to be adjusted accordingly.
914    //
915    // When DFSClient tries to open a file, HDFS needs to locate
916    // the last block of the file and get its length. However, if the
917    // last block is under recovery, HDFS may have problem to obtain
918    // the block length, in which case, retry may help.
919    Mockito.doAnswer(new Answer<FSDataInputStream>() {
920      private final String[] errors = new String[] { "Cannot obtain block length",
921        "Could not obtain the last block", "Blocklist for " + OLDLOGDIR + " has changed" };
922      private int count = 0;
923
924      @Override
925      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
926        if (count < 3) {
927          throw new IOException(errors[count++]);
928        }
929        return (FSDataInputStream) invocation.callRealMethod();
930      }
931    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
932    retryOverHdfsProblem(spiedFs);
933  }
934
935  @Test
936  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
937    generateWALs(1, 10, -1);
938    FileStatus logfile = fs.listStatus(WALDIR)[0];
939    useDifferentDFSClient();
940
941    final AtomicInteger count = new AtomicInteger();
942
943    CancelableProgressable localReporter = new CancelableProgressable() {
944      @Override
945      public boolean progress() {
946        count.getAndIncrement();
947        return false;
948      }
949    };
950
951    FileSystem spiedFs = Mockito.spy(fs);
952    Mockito.doAnswer(new Answer<FSDataInputStream>() {
953      @Override
954      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
955        Thread.sleep(1500); // Sleep a while and wait report status invoked
956        return (FSDataInputStream) invocation.callRealMethod();
957      }
958    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
959
960    try {
961      conf.setInt("hbase.splitlog.report.period", 1000);
962      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
963        Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
964      assertFalse("Log splitting should failed", ret);
965      assertTrue(count.get() > 0);
966    } catch (IOException e) {
967      fail("There shouldn't be any exception but: " + e.toString());
968    } finally {
969      // reset it back to its default value
970      conf.setInt("hbase.splitlog.report.period", 59000);
971    }
972  }
973
974  /**
975   * Test log split process with fake data and lots of edits to trigger threading issues.
976   */
977  @Test
978  public void testThreading() throws Exception {
979    doTestThreading(20000, 128 * 1024 * 1024, 0);
980  }
981
982  /**
983   * Test blocking behavior of the log split process if writers are writing slower than the reader
984   * is reading.
985   */
986  @Test
987  public void testThreadingSlowWriterSmallBuffer() throws Exception {
988    // The logic of this test has conflict with the limit writers split logic, skip this test for
989    // TestWALSplitBoundedLogWriterCreation
990    assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
991    doTestThreading(200, 1024, 50);
992  }
993
994  /**
995   * Sets up a log splitter with a mock reader and writer. The mock reader generates a specified
996   * number of edits spread across 5 regions. The mock writer optionally sleeps for each edit it is
997   * fed. After the split is complete, verifies that the statistics show the correct number of edits
998   * output into each region.
999   * @param numFakeEdits   number of fake edits to push through pipeline
1000   * @param bufferSize     size of in-memory buffer
1001   * @param writerSlowness writer threads will sleep this many ms per edit
1002   */
1003  private void doTestThreading(final int numFakeEdits, final int bufferSize,
1004    final int writerSlowness) throws Exception {
1005
1006    Configuration localConf = new Configuration(conf);
1007    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
1008
1009    // Create a fake log file (we'll override the reader to produce a stream of edits)
1010    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
1011    FSDataOutputStream out = fs.create(logPath);
1012    out.close();
1013
1014    // Make region dirs for our destination regions so the output doesn't get skipped
1015    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
1016    makeRegionDirs(regions);
1017
1018    // Create a splitter that reads and writes the data without touching disk
1019    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
1020      /* Produce a mock writer that doesn't write anywhere */
1021      @Override
1022      protected Writer createWriter(Path logfile) throws IOException {
1023        Writer mockWriter = Mockito.mock(Writer.class);
1024        Mockito.doAnswer(new Answer<Void>() {
1025          int expectedIndex = 0;
1026
1027          @Override
1028          public Void answer(InvocationOnMock invocation) {
1029            if (writerSlowness > 0) {
1030              try {
1031                Thread.sleep(writerSlowness);
1032              } catch (InterruptedException ie) {
1033                Thread.currentThread().interrupt();
1034              }
1035            }
1036            Entry entry = (Entry) invocation.getArgument(0);
1037            WALEdit edit = entry.getEdit();
1038            List<Cell> cells = edit.getCells();
1039            assertEquals(1, cells.size());
1040            Cell cell = cells.get(0);
1041
1042            // Check that the edits come in the right order.
1043            assertEquals(expectedIndex,
1044              Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
1045            expectedIndex++;
1046            return null;
1047          }
1048        }).when(mockWriter).append(Mockito.<Entry> any());
1049        return mockWriter;
1050      }
1051
1052      /* Produce a mock reader that generates fake entries */
1053      @Override
1054      protected WALStreamReader getReader(FileStatus file, boolean skipErrors,
1055        CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
1056        WALStreamReader mockReader = Mockito.mock(WALStreamReader.class);
1057        Mockito.doAnswer(new Answer<Entry>() {
1058          int index = 0;
1059
1060          @Override
1061          public Entry answer(InvocationOnMock invocation) throws Throwable {
1062            if (index >= numFakeEdits) {
1063              return null;
1064            }
1065
1066            // Generate r0 through r4 in round robin fashion
1067            int regionIdx = index % regions.size();
1068            byte region[] = new byte[] { (byte) 'r', (byte) (0x30 + regionIdx) };
1069
1070            Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes(index / regions.size()),
1071              FAMILY, QUALIFIER, VALUE, index);
1072            index++;
1073            return ret;
1074          }
1075        }).when(mockReader).next();
1076        return mockReader;
1077      }
1078    };
1079
1080    logSplitter.splitWAL(fs.getFileStatus(logPath), null);
1081
1082    // Verify number of written edits per region
1083    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
1084    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
1085      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
1086      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
1087    }
1088    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
1089  }
1090
1091  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
1092  @Test
1093  public void testSplitLogFileDeletedRegionDir() throws IOException {
1094    LOG.info("testSplitLogFileDeletedRegionDir");
1095    final String REGION = "region__1";
1096    REGIONS.clear();
1097    REGIONS.add(REGION);
1098
1099    generateWALs(1, 10, -1);
1100    useDifferentDFSClient();
1101
1102    Path regiondir = new Path(TABLEDIR, REGION);
1103    LOG.info("Region directory is" + regiondir);
1104    fs.delete(regiondir, true);
1105    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1106    assertFalse(fs.exists(regiondir));
1107  }
1108
1109  @Test
1110  public void testSplitLogFileEmpty() throws IOException {
1111    LOG.info("testSplitLogFileEmpty");
1112    // we won't create the hlog dir until getWAL got called, so
1113    // make dir here when testing empty log file
1114    fs.mkdirs(WALDIR);
1115    injectEmptyFile(".empty", true);
1116    useDifferentDFSClient();
1117
1118    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1119    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1120    assertFalse(fs.exists(tdir));
1121
1122    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
1123  }
1124
1125  @Test
1126  public void testSplitLogFileMultipleRegions() throws IOException {
1127    LOG.info("testSplitLogFileMultipleRegions");
1128    generateWALs(1, 10, -1);
1129    splitAndCount(1, 10);
1130  }
1131
1132  @Test
1133  public void testSplitLogFileFirstLineCorruptionLog() throws IOException {
1134    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
1135    generateWALs(1, 10, -1);
1136    FileStatus logfile = fs.listStatus(WALDIR)[0];
1137
1138    corruptWAL(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
1139
1140    useDifferentDFSClient();
1141    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1142
1143    final Path corruptDir =
1144      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
1145    assertEquals(1, fs.listStatus(corruptDir).length);
1146  }
1147
1148  /**
1149   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
1150   */
1151  @Test
1152  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
1153    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
1154    // Generate wals for our destination region
1155    String regionName = "r0";
1156    final Path regiondir = new Path(TABLEDIR, regionName);
1157    REGIONS.clear();
1158    REGIONS.add(regionName);
1159    generateWALs(-1);
1160
1161    wals.getWAL(null);
1162    FileStatus[] logfiles = fs.listStatus(WALDIR);
1163    assertTrue("There should be some log file", logfiles != null && logfiles.length > 0);
1164
1165    WALSplitter logSplitter =
1166      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
1167        @Override
1168        protected Writer createWriter(Path logfile) throws IOException {
1169          Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
1170          // After creating writer, simulate region's
1171          // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
1172          // region and delete them, excluding files with '.temp' suffix.
1173          NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
1174          if (files != null && !files.isEmpty()) {
1175            for (Path file : files) {
1176              if (!this.walFS.delete(file, false)) {
1177                LOG.error("Failed delete of " + file);
1178              } else {
1179                LOG.debug("Deleted recovered.edits file=" + file);
1180              }
1181            }
1182          }
1183          return writer;
1184        }
1185      };
1186    try {
1187      logSplitter.splitWAL(logfiles[0], null);
1188    } catch (IOException e) {
1189      LOG.info(e.toString(), e);
1190      fail("Throws IOException when spliting "
1191        + "log, it is most likely because writing file does not "
1192        + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
1193    }
1194    if (fs.exists(CORRUPTDIR)) {
1195      if (fs.listStatus(CORRUPTDIR).length > 0) {
1196        fail("There are some corrupt logs, "
1197          + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
1198      }
1199    }
1200  }
1201
1202  @Test
1203  public void testRecoveredEditsStoragePolicy() throws IOException {
1204    conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD");
1205    try {
1206      Path path = createRecoveredEditsPathForRegion();
1207      assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName());
1208    } finally {
1209      conf.unset(HConstants.WAL_STORAGE_POLICY);
1210    }
1211  }
1212
1213  /**
1214   * See HBASE-27644, typically we should not have empty WALEdit but we should be able to process
1215   * it, instead of losing data after it.
1216   */
1217  @Test
1218  public void testEmptyWALEdit() throws IOException {
1219    final String region = "region__5";
1220    REGIONS.clear();
1221    REGIONS.add(region);
1222    makeRegionDirs(REGIONS);
1223    fs.mkdirs(WALDIR);
1224    Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5);
1225    generateEmptyEditWAL(path, Bytes.toBytes(region));
1226    useDifferentDFSClient();
1227
1228    Path regiondir = new Path(TABLEDIR, region);
1229    fs.mkdirs(regiondir);
1230    List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1231    // Make sure that WALSplitter generate the split file
1232    assertEquals(1, splitPaths.size());
1233
1234    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
1235    assertEquals(11, countWAL(originalLog));
1236    // we will skip the empty WAL when splitting
1237    assertEquals(10, countWAL(splitPaths.get(0)));
1238  }
1239
1240  private void generateEmptyEditWAL(Path path, byte[] region) throws IOException {
1241    fs.mkdirs(WALDIR);
1242    try (Writer writer = wals.createWALWriter(fs, path)) {
1243      long seq = 0;
1244      appendEmptyEntry(writer, TABLE_NAME, region, seq++);
1245      for (int i = 0; i < 10; i++) {
1246        appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++);
1247      }
1248    }
1249  }
1250
1251  private Writer generateWALs(int leaveOpen) throws IOException {
1252    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
1253  }
1254
1255  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
1256    return generateWALs(writers, entries, leaveOpen, 7);
1257  }
1258
1259  private void makeRegionDirs(List<String> regions) throws IOException {
1260    for (String region : regions) {
1261      LOG.debug("Creating dir for region " + region);
1262      fs.mkdirs(new Path(TABLEDIR, region));
1263    }
1264  }
1265
1266  /**
1267   * @param leaveOpen index to leave un-closed. -1 to close all.
1268   * @return the writer that's still open, or null if all were closed.
1269   */
1270  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
1271    throws IOException {
1272    makeRegionDirs(REGIONS);
1273    fs.mkdirs(WALDIR);
1274    Writer[] ws = new Writer[writers];
1275    int seq = 0;
1276    int numRegionEventsAdded = 0;
1277    for (int i = 0; i < writers; i++) {
1278      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
1279      for (int j = 0; j < entries; j++) {
1280        int prefix = 0;
1281        for (String region : REGIONS) {
1282          String row_key = region + prefix++ + i + j;
1283          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
1284            QUALIFIER, VALUE, seq++);
1285
1286          if (numRegionEventsAdded < regionEvents) {
1287            numRegionEventsAdded++;
1288            appendRegionEvent(ws[i], region);
1289          }
1290        }
1291      }
1292      if (i != leaveOpen) {
1293        ws[i].close();
1294        LOG.info("Closing writer " + i);
1295      }
1296    }
1297    if (leaveOpen < 0 || leaveOpen >= writers) {
1298      return null;
1299    }
1300    return ws[leaveOpen];
1301  }
1302
1303  private Path[] getLogForRegion(TableName table, String region) throws IOException {
1304    Path tdir = CommonFSUtils.getWALTableDir(conf, table);
1305    @SuppressWarnings("deprecation")
1306    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(
1307      HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region))));
1308    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
1309      @Override
1310      public boolean accept(Path p) {
1311        if (WALSplitUtil.isSequenceIdFile(p)) {
1312          return false;
1313        }
1314        return true;
1315      }
1316    });
1317    Path[] paths = new Path[files.length];
1318    for (int i = 0; i < files.length; i++) {
1319      paths[i] = files[i].getPath();
1320    }
1321    return paths;
1322  }
1323
1324  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
1325    FSDataOutputStream out;
1326    int fileSize = (int) fs.listStatus(path)[0].getLen();
1327
1328    FSDataInputStream in = fs.open(path);
1329    byte[] corrupted_bytes = new byte[fileSize];
1330    in.readFully(0, corrupted_bytes, 0, fileSize);
1331    in.close();
1332
1333    switch (corruption) {
1334      case APPEND_GARBAGE:
1335        fs.delete(path, false);
1336        out = fs.create(path);
1337        out.write(corrupted_bytes);
1338        out.write(Bytes.toBytes("-----"));
1339        closeOrFlush(close, out);
1340        break;
1341
1342      case INSERT_GARBAGE_ON_FIRST_LINE:
1343        fs.delete(path, false);
1344        out = fs.create(path);
1345        out.write(0);
1346        out.write(corrupted_bytes);
1347        closeOrFlush(close, out);
1348        break;
1349
1350      case INSERT_GARBAGE_IN_THE_MIDDLE:
1351        fs.delete(path, false);
1352        out = fs.create(path);
1353        int middle = (int) Math.floor(corrupted_bytes.length / 2);
1354        out.write(corrupted_bytes, 0, middle);
1355        out.write(0);
1356        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
1357        closeOrFlush(close, out);
1358        break;
1359
1360      case TRUNCATE:
1361        fs.delete(path, false);
1362        out = fs.create(path);
1363        out.write(corrupted_bytes, 0, fileSize
1364          - (32 + AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
1365        closeOrFlush(close, out);
1366        break;
1367
1368      case TRUNCATE_TRAILER:
1369        fs.delete(path, false);
1370        out = fs.create(path);
1371        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
1372        closeOrFlush(close, out);
1373        break;
1374    }
1375  }
1376
1377  private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException {
1378    if (close) {
1379      out.close();
1380    } else {
1381      Method syncMethod = null;
1382      try {
1383        syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
1384      } catch (NoSuchMethodException e) {
1385        try {
1386          syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
1387        } catch (NoSuchMethodException ex) {
1388          throw new IOException(
1389            "This version of Hadoop supports " + "neither Syncable.sync() nor Syncable.hflush().");
1390        }
1391      }
1392      try {
1393        syncMethod.invoke(out, new Object[] {});
1394      } catch (Exception e) {
1395        throw new IOException(e);
1396      }
1397      // Not in 0out.hflush();
1398    }
1399  }
1400
1401  private int countWAL(Path log) throws IOException {
1402    int count = 0;
1403    try (WALStreamReader in = wals.createStreamReader(fs, log)) {
1404      while (in.next() != null) {
1405        count++;
1406      }
1407    }
1408    return count;
1409  }
1410
1411  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
1412    String output) throws IOException {
1413    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
1414    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
1415      .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
1416      .setRegionName(ByteString.copyFrom(hri.getRegionName()))
1417      .setFamilyName(ByteString.copyFrom(FAMILY))
1418      .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
1419      .addAllCompactionInput(Arrays.asList(inputs)).addCompactionOutput(output);
1420
1421    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
1422    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
1423      EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
1424    w.append(new Entry(key, edit));
1425    w.sync(false);
1426  }
1427
1428  private static void appendRegionEvent(Writer w, String region) throws IOException {
1429    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
1430      WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, TABLE_NAME.toBytes(),
1431      Bytes.toBytes(region), Bytes.toBytes(String.valueOf(region.hashCode())), 1,
1432      ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>> of());
1433    final long time = EnvironmentEdgeManager.currentTime();
1434    final WALKeyImpl walKey =
1435      new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, HConstants.DEFAULT_CLUSTER_ID);
1436    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
1437    w.append(new Entry(walKey, we));
1438    w.sync(false);
1439  }
1440
1441  private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
1442    byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
1443    LOG.info(Thread.currentThread().getName() + " append");
1444    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1445    LOG.info(Thread.currentThread().getName() + " sync");
1446    writer.sync(false);
1447    return seq;
1448  }
1449
1450  private static Entry createTestEntry(TableName table, byte[] region, byte[] row, byte[] family,
1451    byte[] qualifier, byte[] value, long seq) {
1452    long time = System.nanoTime();
1453
1454    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
1455    WALEdit edit = new WALEdit();
1456    edit.add(cell);
1457    return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
1458  }
1459
1460  private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq)
1461    throws IOException {
1462    LOG.info(Thread.currentThread().getName() + " append");
1463    writer.append(createEmptyEntry(table, region, seq));
1464    LOG.info(Thread.currentThread().getName() + " sync");
1465    writer.sync(false);
1466    return seq;
1467  }
1468
1469  private static Entry createEmptyEntry(TableName table, byte[] region, long seq) {
1470    long time = System.nanoTime();
1471    return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID),
1472      new WALEdit());
1473  }
1474
1475  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
1476    Writer writer =
1477      WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
1478    if (closeFile) {
1479      writer.close();
1480    }
1481  }
1482
1483  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
1484    try (WALStreamReader in1 = wals.createStreamReader(fs, p1);
1485      WALStreamReader in2 = wals.createStreamReader(fs, p2)) {
1486      Entry entry1;
1487      Entry entry2;
1488      while ((entry1 = in1.next()) != null) {
1489        entry2 = in2.next();
1490        if (
1491          (entry1.getKey().compareTo(entry2.getKey()) != 0)
1492            || (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
1493        ) {
1494          return false;
1495        }
1496      }
1497    }
1498    return true;
1499  }
1500}