/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);

  {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  }
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String [] GROUP = new String [] {"supergroup"};

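  /** The ways in which these tests corrupt a WAL file. */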
  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently.  TODO: Fix.  Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake user-to-group mapping and set it in the conf.
    Map<String, String []> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;

  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
            16010, System.currentTimeMillis()).toString()));
    //fs.mkdirs(WALDIR);
  }

  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch(IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }

  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile: logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
268      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
269              "Zombie could write " + counter.get() + " and logfile had only " + count,
270          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }

  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate it being a separate regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException | InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open.  generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update the counter so it reflects all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, the WAL should have been moved out from under us,
          // so closing should error.
          try {
            writer.close();
            fail("Closing the writer after the WAL has been split should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    private void loop(final Writer writer) {
      byte [] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            //
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
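    // Layout is <encoded region name>/recovered.edits/<edits file>, so the grandparent
    // of the edits file should be the meta region's directory.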
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  /**
   * Test that an old recovered.edits file doesn't break WALSplitter.
   * This is useful when upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
      RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

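  /**
   * Builds the recovered-edits path that splitting a single meta WAL entry would produce.
   */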
  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    long now = System.currentTimeMillis();
    Entry entry =
      new Entry(new WALKeyImpl(encoded,
        TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
        new WALEdit());
    Path p = WALSplitter.getRegionSplitEditsPath(entry,
      FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
    return p;
  }

  /**
   * Test that hasRecoveredEdits correctly identifies a proper recovered-edits file in the
   * region's recovered.edits directory.
   * @throws IOException on any issues found while creating test required files/directories.
   */
  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
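    // The path for an edits file still being written carries a "-<name of WAL being split>"
    // suffix; the leading portion is the name a completed recovered-edits file would have.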
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }

  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }

  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

470    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }

  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[]{"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

501    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * @param expectedFiles expected number of split files per region
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // The WAL dir is not created until getWAL is called, so make the dir here
    // when testing empty log files.
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }

  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
        .asList(FaultyProtobufLogReader.FailureType.values()).stream()
        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
          walDirContents);
    }
  }

  /**
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
        Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }

  @Test (expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS, fs.listStatus(WALDIR).length);
  }

  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
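    // TRUNCATE chops off the trailer plus the tail of the last entry, so the reader hits
    // EOF mid-entry; we expect one fewer edit back, with no error raised.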
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus [] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    //leave 5th log open so we could append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
          contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(
            mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }
    };
    // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while(!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }

  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have problem to obtain
    // the block length, in which case, retry may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while to wait for the report status to be invoked
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(
          HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals);
950      assertFalse("Log splitting should failed", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }

  /**
   * Test log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128*1024*1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }

  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) {

      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
          throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte[] region = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes(index / regions.size()),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " +
          Bytes.toString(entry.getKey()));
      assertEquals(numFakeEdits / regions.size(), (long) entry.getValue());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }

  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
1092    LOG.info("Region directory is" + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // The WAL dir is not created until getWAL is called, so make the dir here
    // when testing an empty log file.
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile)
          throws IOException {
        Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
        // After creating writer, simulate region's
        // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
        // region and delete them, excluding files with '.temp' suffix.
        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.walFS.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
1181      fail("Throws IOException when spliting "
1182          + "log, it is most likely because writing file does not "
1183          + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }

  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }

  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }

  /**
   * @param writers number of WAL files to write
   * @param entries number of entries to append per writer per region
   * @param leaveOpen index of the writer to leave un-closed. -1 to close all.
   * @param regionEvents number of region-open event markers to interleave with the edits
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
      throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer [] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
              QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded ++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }

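  /**
   * Returns the recovered-edits files the split produced for the given region, excluding any
   * sequence-id marker files.
   */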
  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = FSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, region));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        if (WALSplitter.isSequenceIdFile(p)) {
          return false;
        }
        return true;
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }

  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = corrupted_bytes.length / 2;
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
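        // Drop the 4-byte trailer length, the PB trailer magic, and some trailing bytes of the
        // last entry so a reader hits EOF in the middle of an entry.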
        out.write(corrupted_bytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }

  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
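      // Flush via reflection: the Syncable flush API differs across Hadoop versions
      // (hflush() on newer releases, sync() on older ones).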
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

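  /** Returns the number of entries in the given WAL file. */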
  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }

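  /**
   * Appends a compaction-descriptor marker entry for the given region to the WAL.
   */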
  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
        .setFamilyName(ByteString.copyFrom(FAMILY))
        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
        .addAllCompactionInput(Arrays.asList(inputs))
        .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync(false);
  }

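  /**
   * Appends a REGION_OPEN event marker entry for the given region to the WAL.
   */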
  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
        TABLE_NAME.toBytes(),
        Bytes.toBytes(region),
        Bytes.toBytes(String.valueOf(region.hashCode())),
        1,
        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
        HConstants.DEFAULT_CLUSTER_ID);
    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
    w.append(new Entry(walKey, we));
    w.sync(false);
  }

  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync(false);
    return seq;
  }

  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();

    seq++;
    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }

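  /**
   * Creates a WAL file containing no entries. If closeFile is false the writer is left open,
   * simulating a WAL that is still being written to.
   */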
  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }

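  /**
   * Returns true if the two WALs contain the same entries, comparing keys and edits in order.
   */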
  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1 = wals.createReader(fs, p1);
    Reader in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}