/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);

  {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  }
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] {"supergroup"};

  enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently.  TODO: Fix.  Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake user-to-group mapping and set it in the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;

  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
            16010, System.currentTimeMillis()).toString()));
    //fs.mkdirs(WALDIR);
  }

  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }

  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile: logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The split log could have at most 1 extra entry, but never fewer. " +
              "Zombie could write " + counter.get() + " and logfile had only " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }

  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate it being a regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException | InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open.  generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update the counter so it reflects all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // Closing should error.
          try {
            writer.close();
            fail("Closing the writer after the split has parsed the log should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    private void loop(final Writer writer) {
      byte[] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            //
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
  }

  /**
   * Test that an old recovered edits file doesn't break WALSplitter.
   * This is useful when upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
      RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    long now = System.currentTimeMillis();
    Entry entry =
      new Entry(new WALKeyImpl(encoded,
        TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
        new WALEdit());
    Path p = WALSplitUtil.getRegionSplitEditsPath(entry,
      FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
    return p;
  }

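  /**
   * hasRecoveredEdits should only report true once a finished recovered.edits
   * file is in place; the portion of the in-progress split file's name before
   * the first '-' stands in for the renamed, completed edits file here.
   */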
  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }

  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }

  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

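  /**
   * Region event markers are metadata-only edits; the split should drop them so
   * that only the data edits land in recovered.edits.
   */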
  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("region event edits should have been dropped by the split",
        logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }

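  /**
   * Compaction markers, unlike region event markers, must survive the split so
   * they can be replayed on region open to clean up after an interrupted
   * compaction.
   */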
  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[]{"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("region event edits should have been dropped by the split",
        logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL is called, so
    // make the dir here when testing empty log files
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }

  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes =
        Arrays.stream(FaultyProtobufLogReader.FailureType.values())
            .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE)
            .collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
          walDirContents);
    }
  }

  /**
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
        Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }

  @Test (expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS, fs.listStatus(WALDIR).length);
  }

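  /**
   * Corrupts the single generated WAL with the given corruption, splits with
   * skip-errors off, and verifies the surviving entry count; EOF-style
   * truncations should be tolerated rather than treated as corruption.
   * @param corruption how to damage the WAL
   * @param entryCount number of edits written to the WAL
   * @param expectedCount number of edits expected to survive the split
   */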
  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

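  /**
   * Appends a "trap" edit to an open WAL, then runs the split with
   * InstrumentedLogWriter.activateFailure set so the output writer throws;
   * the split must surface that IOException rather than swallow it.
   */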
  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave the 5th log open so we can append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
          contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

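  /**
   * Feeds the largest generated WAL through a splitter whose output writers
   * always throw; the injected IOException must propagate out of splitLogFile.
   */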
  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(
            mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }
    };
    // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }

  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file, simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have problems obtaining
    // the block length, in which case retrying may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }

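  /**
   * If the progress reporter asks for cancellation, splitLogFile should give up
   * and return false; the slowed-down spied open() gives the 1s report period a
   * chance to fire at least once before the reader comes up.
   */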
  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter
        = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while to wait for the report status to be invoked
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(
          HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals);
      assertFalse("Log splitting should have failed", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }

  /**
   * Test log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128 * 1024 * 1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }

  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) {

      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
          throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte[] region = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes(index / regions.size()),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " +
          Bytes.toString(entry.getKey()));
      assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }

  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // we won't create the hlog dir until getWAL is called, so
    // make the dir here when testing an empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile)
          throws IOException {
        Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
        // After creating the writer, simulate a region's
        // replayRecoveredEditsIfAny() which gets the SplitEditFiles of this
        // region and deletes them, excluding files with a '.temp' suffix.
        NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.walFS.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Should not throw IOException when splitting the log; a failure here most likely "
          + "means the file being written was removed by a concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

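  /**
   * Writes NUM_WRITERS WALs with ENTRIES edits per region apiece and no region
   * event markers; the three-arg overload below adds 7 region event markers.
   * @param leaveOpen index of the writer to leave un-closed, -1 to close all.
   */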
  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }

  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }

  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }

  /**
   * @param leaveOpen index to leave un-closed. -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
      throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer[] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String rowKey = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(rowKey), FAMILY,
              QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }

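  /**
   * @return the recovered.edits files produced for the given region, excluding
   *         sequence id files.
   */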
  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = FSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        return !WALSplitUtil.isSequenceIdFile(p);
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }

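  /**
   * Rewrites the WAL at {@code path} with the requested damage. The file is read
   * fully into memory, deleted, and re-created: garbage is appended or spliced in,
   * or the tail is truncated (TRUNCATE chops into the last entry, TRUNCATE_TRAILER
   * only removes the trailer's 4-byte length). If {@code close} is false, the new
   * file is only flushed, mimicking a writer that is still open.
   */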
  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corruptedBytes = new byte[fileSize];
    in.readFully(0, corruptedBytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corruptedBytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corruptedBytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = corruptedBytes.length / 2;
        out.write(corruptedBytes, 0, middle);
        out.write(0);
        out.write(corruptedBytes, middle, corruptedBytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corruptedBytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corruptedBytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }

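  /**
   * Either closes the stream or, when simulating a still-open writer, forces the
   * bytes out via hflush (falling back, through reflection, to the older sync()
   * so this keeps working against old Hadoop versions).
   */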
  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush");
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync");
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out);
      } catch (Exception e) {
        throw new IOException(e);
      }
      // Deliberately leave the file open to simulate a writer that is still alive.
    }
  }

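  /**
   * @return the number of entries that can be read back from the given WAL.
   */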
  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }

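  /**
   * Appends a compaction descriptor marker (inputs compacted into output) for
   * the given region's FAMILY store to the WAL.
   */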
  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
        .setFamilyName(ByteString.copyFrom(FAMILY))
        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
        .addAllCompactionInput(Arrays.asList(inputs))
        .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync(false);
  }

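  /**
   * Appends a REGION_OPEN region event marker for the given region to the WAL.
   */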
  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
        TABLE_NAME.toBytes(),
        Bytes.toBytes(region),
        Bytes.toBytes(String.valueOf(region.hashCode())),
        1,
        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
        HConstants.DEFAULT_CLUSTER_ID);
    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
    w.append(new Entry(walKey, we));
    w.sync(false);
  }

  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync(false);
    return seq;
  }

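  /**
   * Builds a single-cell Put entry; note the sequence id is bumped by one and
   * System.nanoTime() doubles as both the cell timestamp and WAL key write time.
   */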
  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();

    seq++;
    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }

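  /**
   * Drops a zero-entry WAL into WALDIR, optionally leaving it un-closed (so it
   * also lacks a trailer).
   */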
  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }

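  /**
   * Entry-by-entry comparison of two WALs by key and stringified edit. A shorter
   * second log compares unequal; extra trailing entries in the second log are
   * ignored.
   */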
  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      // Treat a shorter second log as unequal rather than NPE'ing on the compare.
      if (entry2 == null || (entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}