/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);

  {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  }
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region
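  // With the defaults above, a full generateWALs(-1) run writes NUM_WRITERS * ENTRIES = 100
  // edits per region; several assertions below (e.g. the splitAndCount calls) rely on that
  // product.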

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] {"supergroup"};

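  /**
   * Ways this test corrupts a WAL file; see
   * {@link #corruptWAL(Path, Corruptions, boolean)} for the exact byte-level effect of each.
   */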
  enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently.  TODO: Fix.  Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake user-to-group mapping and set it on the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;

  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
            16010, System.currentTimeMillis()).toString()));
    //fs.mkdirs(WALDIR);
  }

  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }

  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
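      // The split below runs as a different user ("robber") over a separate FileSystem
      // instance so that, much as when the master splits a dead regionserver's logs,
      // HDFS lease recovery can fence out the still-writing zombie.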
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile: logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The split log can have at most 1 extra entry, but never fewer. " +
              "Zombie wrote " + counter.get() + " but the split log had only " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }

  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate a regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException | InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open.  generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update the counter so it reflects all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // Closing should error out.
          try {
            writer.close();
            fail("Closing the writer after the file has been split out should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    private void loop(final Writer writer) {
      byte[] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            //
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
  }

  /**
   * Tests that an old recovered.edits file doesn't break WALSplitter.
   * This is useful when upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
      RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

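  /**
   * Builds the recovered-edits sink path for the meta region. Judging by the assertions in
   * the tests above and in testHasRecoveredEdits below, the layout is roughly
   * <pre>
   *   .../&lt;table&gt;/&lt;encoded-region-name&gt;/recovered.edits/&lt;seqid&gt;-&lt;name-of-wal-being-split&gt;
   * </pre>
   * with the sequence id preceding the first '-' in the file name.
   */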
  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    return WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
        FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
  }

  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }

  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }

  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }

  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir =
      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[]{"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * @param expectedFiles expected number of split files per region
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // The WAL dir isn't created until getWAL is called, so make the dir
    // here when testing empty log files.
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }
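  // For the test above with the defaults: goodEntries = 9 * 10 = 90 per region from the nine
  // intact files, plus at least firstHalfEntries = 4 entries recovered from the first half of
  // the corrupted file, so the bound works out to 2 * (90 + 4) = 188 entries across regions.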

  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
        .asList(FaultyProtobufLogReader.FailureType.values()).stream()
        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
          walDirContents);
    }
  }

  /**
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
        Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }

  @Test(expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS, fs.listStatus(WALDIR).length);
  }

  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }
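  // Taken together, the two tests above pin down the EOF policy: a truncated tail costs
  // exactly the final entry (entryCount - 1 survive), while losing only the trailer loses
  // nothing, and in neither case is the file sidelined to CORRUPTDIR.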

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // Leave the 5th log open so we can append the "trap".
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage()
          .contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter =
      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
        @Override
        protected Writer createWriter(Path logfile) throws IOException {
          Writer mockWriter = Mockito.mock(Writer.class);
          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
              .append(Mockito.<Entry>any());
          return mockWriter;
        }
      };
    // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }

  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" parts are very important;
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have trouble obtaining
    // the block length, in which case retrying may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while to give the progress reporter time to be invoked
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
        null, wals, null);
948      assertFalse("Log splitting should failed", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }

  /**
   * Test log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128 * 1024 * 1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }
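  // 200 edits pushed through a 1 KB in-memory buffer with writers sleeping 50 ms per append
  // means the reader outruns the writers, exercising the buffer's blocking path.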

  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter =
        new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {

      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
          throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte[] region = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes(index / regions.size()),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
      assertEquals(numFakeEdits / regions.size(), (long) entry.getValue());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }

  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
1090    LOG.info("Region directory is" + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // The WAL dir isn't created until getWAL is called, so make the dir
    // here when testing an empty log file.
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir =
      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter =
        new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
      @Override
      protected Writer createWriter(Path logfile)
          throws IOException {
        Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
        // After creating the writer, simulate a region's
        // replayRecoveredEditsIfAny(), which gets this region's SplitEditFiles
        // and deletes them, excluding files with a '.temp' suffix.
        NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.walFS.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
1181      fail("Throws IOException when spliting "
1182          + "log, it is most likely because writing file does not "
1183          + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "most likely caused by a concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }

  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }

  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }

  /**
   * @param leaveOpen index to leave un-closed. -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
      throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer[] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String rowKey = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(rowKey), FAMILY,
              QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }
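  // A sketch of the defaults: generateWALs(-1) expands to generateWALs(NUM_WRITERS, ENTRIES,
  // -1, 0), i.e. ten closed files named wal.dat.0 through wal.dat.9, each holding ENTRIES
  // edits per region in REGIONS, and a null return since no writer is left open.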

  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = CommonFSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        return !WALSplitUtil.isSequenceIdFile(p);
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }

  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corruptedBytes = new byte[fileSize];
    in.readFully(0, corruptedBytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corruptedBytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corruptedBytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = corruptedBytes.length / 2;
        out.write(corruptedBytes, 0, middle);
        out.write(0);
        out.write(corruptedBytes, middle, corruptedBytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corruptedBytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corruptedBytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }

  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?>[]{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?>[]{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
      // Older Hadoop exposes Syncable.sync() where newer has hflush(); hence the reflection above.
    }
  }

  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }

  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
        .setFamilyName(ByteString.copyFrom(FAMILY))
        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
        .addAllCompactionInput(Arrays.asList(inputs))
        .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync(false);
  }
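  // Compaction descriptors ride along in the WAL as marker edits;
  // testSplitLeavesCompactionEventsEdits above depends on the split keeping this marker
  // while testSplitRemovesRegionEventsEdits shows region-event markers get dropped.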

  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
        TABLE_NAME.toBytes(),
        Bytes.toBytes(region),
        Bytes.toBytes(String.valueOf(region.hashCode())),
        1,
        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
        HConstants.DEFAULT_CLUSTER_ID);
    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
    w.append(new Entry(walKey, we));
    w.sync(false);
  }

  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync(false);
    return seq;
  }

  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();
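    // Note: nanoTime() stands in for a wall-clock timestamp here; the tests only count or
    // compare entries against each other and never interpret this value as a real time.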

    seq++;
    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }

  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }

  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}