/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);

  {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  }
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] {"supergroup"};

  enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently.  TODO: Fix.  Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake user-to-group mapping and set it in the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;

  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
            16010, System.currentTimeMillis()).toString()));
    //fs.mkdirs(WALDIR);
  }

  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }
  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write to it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile: logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The split log can have at most one more entry than the zombie wrote, and no " +
              "fewer. Zombie wrote " + counter.get() + " and the split log contained " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }

  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more edits
   * to its last WAL. It does its writing as an alternate user in another filesystem instance to
   * better simulate it being a separate regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

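    /**
     * @param counter shared counter, bumped for every edit this zombie successfully writes
     * @param stop set to true to ask the zombie to stop writing
     * @param region region to write edits for
     * @param writers number of WAL files to generate before looping on the last one
     */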
    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException | InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open.  generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update the counter so it reflects all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, the WAL should have been moved out from under us,
          // so closing should error out.
          try {
            writer.close();
            fail("Closing the writer after the WAL has been split should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

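    /**
     * Keeps appending entries with the supplied writer until asked to stop (or until an append
     * fails because the WAL was moved out from under us), bumping editsCount for each append.
     */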
    private void loop(final Writer writer) {
      byte[] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // Ignore; fall through and keep writing until asked to stop.
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  /**
   * Test that an old recovered edits file doesn't break WALSplitter.
   * This is useful when upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
      RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    long now = System.currentTimeMillis();
    Entry entry =
      new Entry(new WALKeyImpl(encoded,
        TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
        new WALEdit());
    Path p = WALSplitter.getRegionSplitEditsPath(entry,
      FILENAME_BEING_SPLIT, conf);
    return p;
  }

  /**
   * Test that hasRecoveredEdits correctly identifies a recovered-edits file in the region's
   * recovered.edits directory.
   * @throws IOException on any issues found while creating test required files/directories.
   */
  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }

  private void useDifferentDFSClient() throws IOException {
    // Make fs act as a different client now;
    // initialize will create a new DFSClient with a new client ID.
    fs.initialize(fs.getUri(), conf);
  }

  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }

  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[]{"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * @param expectedFiles expected number of split files per region
   * @param expectedEntries expected number of entries per region, or -1 to not assert
   * @return the entry count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // We won't create the hlog dir until getWAL is called, so
    // make the dir here when testing empty log files.
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }

  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
        .stream(FaultyProtobufLogReader.FailureType.values())
        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
          walDirContents);
    }
  }

  /**
   * @param failureType the type of reader failure to inject for every WAL
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
        Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // Since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }

  @Test(expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS, fs.listStatus(WALDIR).length);
  }

  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // Leave the 5th log open so we can append the "trap".
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
          contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(
            mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }
    };
    // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }

  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have a problem obtaining
    // the block length, in which case retrying may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while to wait for the reporter to be invoked.
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(
          HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals);
      assertFalse("Log splitting should have failed", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }

  /**
   * Test log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128*1024*1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }

  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) {

      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
          throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte[] region = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes(index / regions.size()),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " +
          Bytes.toString(entry.getKey()));
      assertEquals(numFakeEdits / regions.size(), (long) entry.getValue());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }

  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // We won't create the hlog dir until getWAL is called, so
    // make the dir here when testing an empty log file.
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile)
          throws IOException {
        Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
        // After creating the writer, simulate a region's
        // replayRecoveredEditsIfAny(), which gets the SplitEditFiles of this
        // region and deletes them, excluding files with a '.temp' suffix.
        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.walFS.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Throws IOException when splitting "
          + "log, it is most likely because the file being written does not "
          + "exist, which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }

  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }

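  /** Creates a region directory under TABLEDIR for each of the given region names. */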
  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }

  /**
   * @param writers number of WAL files to write
   * @param entries number of entries to append per writer per region
   * @param leaveOpen index of the writer to leave un-closed. -1 to close all.
   * @param regionEvents total number of region event markers to interleave with the entries
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
      throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer[] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
              QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }

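  /**
   * Returns the recovered.edits files that the split wrote for the given region,
   * skipping any sequence-id files.
   */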
  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = FSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        return !WALSplitter.isSequenceIdFile(p);
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }

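  /**
   * Rewrites the WAL file at the given path with the requested corruption. If {@code close} is
   * false, the rewritten file is hflushed but left open; otherwise it is closed.
   */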
  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = corrupted_bytes.length / 2;
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }

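  /**
   * Either closes the stream or hflushes it while leaving it open, reflecting over
   * hflush()/sync() so the test works across Hadoop versions.
   */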
  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?>[]{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?>[]{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

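  /** Returns the number of entries in the WAL file at the given path. */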
  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }

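  /** Appends a compaction descriptor marker for the given region to the WAL and syncs it. */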
  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
        .setFamilyName(ByteString.copyFrom(FAMILY))
        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
        .addAllCompactionInput(Arrays.asList(inputs))
        .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync();
  }

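  /** Appends a REGION_OPEN region event marker for the given region to the WAL and syncs it. */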
  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
        TABLE_NAME.toBytes(),
        Bytes.toBytes(region),
        Bytes.toBytes(String.valueOf(region.hashCode())),
        1,
        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    KeyValue kv = new KeyValue(Bytes.toBytes(region), WALEdit.METAFAMILY, WALEdit.REGION_EVENT,
        time, regionOpenDesc.toByteArray());
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
        HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(walKey, new WALEdit().add(kv)));
    w.sync();
  }

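  /**
   * Appends a single test entry to the given writer and syncs it.
   * @return the sequence number that was passed in
   */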
  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync();
    return seq;
  }

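  /** Builds a single-cell Put entry for the given coordinates, using nanoTime as the timestamp. */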
  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();

    seq++;
    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }

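  /** Creates a WAL file with no entries, optionally leaving it open (un-closed). */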
  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }

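  /**
   * Compares the two logs entry by entry, for the length of the first log, and returns true
   * if every key and edit matches.
   */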
  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}