001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.lang.reflect.Method;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.NavigableSet;
036import java.util.Objects;
037import java.util.Set;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicLong;
041import java.util.stream.Collectors;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FSDataInputStream;
044import org.apache.hadoop.fs.FSDataOutputStream;
045import org.apache.hadoop.fs.FileStatus;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.FileUtil;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.fs.PathFilter;
050import org.apache.hadoop.hbase.Cell;
051import org.apache.hadoop.hbase.HBaseClassTestRule;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.HBaseTestingUtility;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.KeyValue;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.client.RegionInfo;
059import org.apache.hadoop.hbase.client.RegionInfoBuilder;
060import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
061import org.apache.hadoop.hbase.regionserver.HRegion;
062import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
063import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
064import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
065import org.apache.hadoop.hbase.security.User;
066import org.apache.hadoop.hbase.testclassification.LargeTests;
067import org.apache.hadoop.hbase.testclassification.RegionServerTests;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.CancelableProgressable;
070import org.apache.hadoop.hbase.util.CommonFSUtils;
071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
072import org.apache.hadoop.hbase.util.Threads;
073import org.apache.hadoop.hbase.wal.WAL.Entry;
074import org.apache.hadoop.hbase.wal.WAL.Reader;
075import org.apache.hadoop.hbase.wal.WALProvider.Writer;
076import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
077import org.apache.hadoop.hdfs.DFSTestUtil;
078import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
079import org.apache.hadoop.ipc.RemoteException;
080import org.junit.After;
081import org.junit.AfterClass;
082import org.junit.Before;
083import org.junit.BeforeClass;
084import org.junit.ClassRule;
085import org.junit.Rule;
086import org.junit.Test;
087import org.junit.experimental.categories.Category;
088import org.junit.rules.TestName;
089import org.mockito.Mockito;
090import org.mockito.invocation.InvocationOnMock;
091import org.mockito.stubbing.Answer;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
095import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
096import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
097import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
098import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
100
101/**
102 * Testing {@link WAL} splitting code.
103 */
104@Category({RegionServerTests.class, LargeTests.class})
105public class TestWALSplit {
  /** Class-level JUnit rule obtained from {@link HBaseClassTestRule#forClass(Class)}. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  /** Shared configuration; assigned from TEST_UTIL in setUpBeforeClass(). */
  private static Configuration conf;
  /** File system of the mini DFS cluster; assigned at the start of every test in setUp(). */
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Directory layout used by the tests. All of these are (re)assigned in setUp().
  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  // Tests address individual WAL files as WAL_FILE_PREFIX + index, e.g. "wal.dat.5".
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  // Region names the tests operate on; reset to "bbb" and "ccc" in setUp(), and replaced by
  // individual tests that need a different set.
  private static List<String> REGIONS = new ArrayList<>();
  // Fake user names derived from the current user's name in setUpBeforeClass().
  private static String ROBBER;
  private static String ZOMBIE;
  // Group that both fake users are mapped to in setUpBeforeClass().
  private static String [] GROUP = new String [] {"supergroup"};
138
139  static enum Corruptions {
140    INSERT_GARBAGE_ON_FIRST_LINE,
141    INSERT_GARBAGE_IN_THE_MIDDLE,
142    APPEND_GARBAGE,
143    TRUNCATE,
144    TRUNCATE_TRAILER
145  }
146
147  @BeforeClass
148  public static void setUpBeforeClass() throws Exception {
149    conf = TEST_UTIL.getConfiguration();
150    conf.setClass("hbase.regionserver.hlog.writer.impl",
151        InstrumentedLogWriter.class, Writer.class);
152    // This is how you turn off shortcircuit read currently.  TODO: Fix.  Should read config.
153    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
154    // Create fake maping user to group and set it to the conf.
155    Map<String, String []> u2g_map = new HashMap<>(2);
156    ROBBER = User.getCurrent().getName() + "-robber";
157    ZOMBIE = User.getCurrent().getName() + "-zombie";
158    u2g_map.put(ROBBER, GROUP);
159    u2g_map.put(ZOMBIE, GROUP);
160    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
161    conf.setInt("dfs.heartbeat.interval", 1);
162    TEST_UTIL.startMiniDFSCluster(2);
163  }
164
165  @AfterClass
166  public static void tearDownAfterClass() throws Exception {
167    TEST_UTIL.shutdownMiniDFSCluster();
168  }
169
  /** Name of the running test method; setUp() uses it to name the WALFactory and the WAL dir. */
  @Rule
  public TestName name = new TestName();
  /** WAL factory for the current test; created in setUp() and closed in tearDown(). */
  private WALFactory wals = null;
173
174  @Before
175  public void setUp() throws Exception {
176    LOG.info("Cleaning up cluster for new test.");
177    fs = TEST_UTIL.getDFSCluster().getFileSystem();
178    HBASEDIR = TEST_UTIL.createRootDir();
179    HBASELOGDIR = TEST_UTIL.createWALRootDir();
180    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
181    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
182    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
183    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
184      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
185    REGIONS.clear();
186    Collections.addAll(REGIONS, "bbb", "ccc");
187    InstrumentedLogWriter.activateFailure = false;
188    wals = new WALFactory(conf, name.getMethodName());
189    WALDIR = new Path(HBASELOGDIR,
190        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
191            16010, System.currentTimeMillis()).toString()));
192    //fs.mkdirs(WALDIR);
193  }
194
195  @After
196  public void tearDown() throws Exception {
197    try {
198      wals.close();
199    } catch(IOException exception) {
200      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
201      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
202          " you see a failure look here.");
203      LOG.debug("exception details", exception);
204    } finally {
205      wals = null;
206      fs.delete(HBASEDIR, true);
207      fs.delete(HBASELOGDIR, true);
208    }
209  }
210
  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   * <p>
   * A {@link ZombieLastLogWriterRegionServer} thread keeps appending to the last WAL while this
   * test, running as the ROBBER user, splits the WAL directory. The number of edits found in
   * the split output for the region must equal the zombie's counter, or exceed it by exactly
   * one.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    // Shared with the zombie thread, which bumps it after each edit it writes.
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      // Split as a different user, on a copy of the configuration.
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          // Log a listing of the WAL directory before splitting, to help debug failures.
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          // Total the edits over every split file produced for the region.
          int count = 0;
          for (Path logfile: logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      // The zombie increments its counter only after an append returns, so the split output may
      // hold one edit the counter has not recorded yet.
      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
              "Zombie could write " + counter.get() + " and logfile had only " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      // Always stop the zombie, whether or not the assertions passed.
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }
263
264  /**
265   * This thread will keep writing to a 'wal' file even after the split process has started.
266   * It simulates a region server that was considered dead but woke up and wrote some more to the
267   * last log entry. Does its writing as an alternate user in another filesystem instance to
268   * simulate better it being a regionserver.
269   */
270  class ZombieLastLogWriterRegionServer extends Thread {
271    final AtomicLong editsCount;
272    final AtomicBoolean stop;
273    final int numOfWriters;
274    /**
275     * Region to write edits for.
276     */
277    final String region;
278    final User user;
279
280    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
281        final String region, final int writers)
282        throws IOException, InterruptedException {
283      super("ZombieLastLogWriterRegionServer");
284      setDaemon(true);
285      this.stop = stop;
286      this.editsCount = counter;
287      this.region = region;
288      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
289      numOfWriters = writers;
290    }
291
292    @Override
293    public void run() {
294      try {
295        doWriting();
296      } catch (IOException e) {
297        LOG.warn(getName() + " Writer exiting " + e);
298      } catch (InterruptedException e) {
299        LOG.warn(getName() + " Writer exiting " + e);
300      }
301    }
302
303    private void doWriting() throws IOException, InterruptedException {
304      this.user.runAs(new PrivilegedExceptionAction<Object>() {
305        @Override
306        public Object run() throws Exception {
307          // Index of the WAL we want to keep open.  generateWALs will leave open the WAL whose
308          // index we supply here.
309          int walToKeepOpen = numOfWriters - 1;
310          // The below method writes numOfWriters files each with ENTRIES entries for a total of
311          // numOfWriters * ENTRIES added per column family in the region.
312          Writer writer = null;
313          try {
314            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
315          } catch (IOException e1) {
316            throw new RuntimeException("Failed", e1);
317          }
318          // Update counter so has all edits written so far.
319          editsCount.addAndGet(numOfWriters * ENTRIES);
320          loop(writer);
321          // If we've been interruped, then things should have shifted out from under us.
322          // closing should error
323          try {
324            writer.close();
325            fail("Writing closing after parsing should give an error.");
326          } catch (IOException exception) {
327            LOG.debug("ignoring error when closing final writer.", exception);
328          }
329          return null;
330        }
331      });
332    }
333
334    private void loop(final Writer writer) {
335      byte [] regionBytes = Bytes.toBytes(this.region);
336      while (!stop.get()) {
337        try {
338          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
339              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
340          long count = editsCount.incrementAndGet();
341          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
342          try {
343            Thread.sleep(1);
344          } catch (InterruptedException e) {
345            //
346          }
347        } catch (IOException ex) {
348          LOG.error(getName() + " ex " + ex.toString());
349          if (ex instanceof RemoteException) {
350            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
351                " while writing " + (editsCount.get() + 1));
352          } else {
353            LOG.error(getName() + " failed to write....at " + editsCount.get());
354            fail("Failed to write " + editsCount.get());
355          }
356          break;
357        } catch (Throwable t) {
358          LOG.error(getName() + " HOW? " + t);
359          LOG.debug("exception details", t);
360          break;
361        }
362      }
363      LOG.info(getName() + " Writer exiting");
364    }
365  }
366
367  /**
368   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
369   */
370  @Test
371  public void testRecoveredEditsPathForMeta() throws IOException {
372    Path p = createRecoveredEditsPathForRegion();
373    String parentOfParent = p.getParent().getParent().getName();
374    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
375  }
376
377  /**
378   * Test old recovered edits file doesn't break WALSplitter.
379   * This is useful in upgrading old instances.
380   */
381  @Test
382  public void testOldRecoveredEditsFileSidelined() throws IOException {
383    Path p = createRecoveredEditsPathForRegion();
384    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
385    Path regiondir = new Path(tdir,
386      RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
387    fs.mkdirs(regiondir);
388    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
389    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
390    fs.createNewFile(parent); // create a recovered.edits file
391    String parentOfParent = p.getParent().getParent().getName();
392    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
393    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
394  }
395
396  private Path createRecoveredEditsPathForRegion() throws IOException {
397    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
398    long now = System.currentTimeMillis();
399    Entry entry = new Entry(
400        new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
401        new WALEdit());
402    Path p = WALSplitUtil
403        .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT,
404            TMPDIRNAME, conf);
405    return p;
406  }
407
408  @Test
409  public void testHasRecoveredEdits() throws IOException {
410    Path p = createRecoveredEditsPathForRegion();
411    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
412    String renamedEdit = p.getName().split("-")[0];
413    fs.createNewFile(new Path(p.getParent(), renamedEdit));
414    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
415  }
416
417  private void useDifferentDFSClient() throws IOException {
418    // make fs act as a different client now
419    // initialize will create a new DFSClient with a new client ID
420    fs.initialize(fs.getUri(), conf);
421  }
422
423  @Test
424  public void testSplitPreservesEdits() throws IOException{
425    final String REGION = "region__1";
426    REGIONS.clear();
427    REGIONS.add(REGION);
428
429    generateWALs(1, 10, -1, 0);
430    useDifferentDFSClient();
431    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
432    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
433    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
434    assertEquals(1, splitLog.length);
435
436    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
437  }
438
439  @Test
440  public void testSplitRemovesRegionEventsEdits() throws IOException{
441    final String REGION = "region__1";
442    REGIONS.clear();
443    REGIONS.add(REGION);
444
445    generateWALs(1, 10, -1, 100);
446    useDifferentDFSClient();
447    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
448    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
449    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
450    assertEquals(1, splitLog.length);
451
452    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
453    // split log should only have the test edits
454    assertEquals(10, countWAL(splitLog[0]));
455  }
456
457
458  @Test
459  public void testSplitLeavesCompactionEventsEdits() throws IOException{
460    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
461    REGIONS.clear();
462    REGIONS.add(hri.getEncodedName());
463    Path regionDir =
464      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
465    LOG.info("Creating region directory: " + regionDir);
466    assertTrue(fs.mkdirs(regionDir));
467
468    Writer writer = generateWALs(1, 10, 0, 10);
469    String[] compactInputs = new String[]{"file1", "file2", "file3"};
470    String compactOutput = "file4";
471    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
472    writer.close();
473
474    useDifferentDFSClient();
475    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
476
477    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
478    // original log should have 10 test edits, 10 region markers, 1 compaction marker
479    assertEquals(21, countWAL(originalLog));
480
481    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
482    assertEquals(1, splitLog.length);
483
484    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
485    // split log should have 10 test edits plus 1 compaction marker
486    assertEquals(11, countWAL(splitLog[0]));
487  }
488
489  /**
490   * @param expectedEntries -1 to not assert
491   * @return the count across all regions
492   */
493  private int splitAndCount(final int expectedFiles, final int expectedEntries)
494      throws IOException {
495    useDifferentDFSClient();
496    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
497    int result = 0;
498    for (String region : REGIONS) {
499      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
500      assertEquals(expectedFiles, logfiles.length);
501      int count = 0;
502      for (Path logfile: logfiles) {
503        count += countWAL(logfile);
504      }
505      if (-1 != expectedEntries) {
506        assertEquals(expectedEntries, count);
507      }
508      result += count;
509    }
510    return result;
511  }
512
513  @Test
514  public void testEmptyLogFiles() throws IOException {
515    testEmptyLogFiles(true);
516  }
517
518  @Test
519  public void testEmptyOpenLogFiles() throws IOException {
520    testEmptyLogFiles(false);
521  }
522
523  private void testEmptyLogFiles(final boolean close) throws IOException {
524    // we won't create the hlog dir until getWAL got called, so
525    // make dir here when testing empty log file
526    fs.mkdirs(WALDIR);
527    injectEmptyFile(".empty", close);
528    generateWALs(Integer.MAX_VALUE);
529    injectEmptyFile("empty", close);
530    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
531  }
532
533  @Test
534  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
535    // generate logs but leave wal.dat.5 open.
536    generateWALs(5);
537    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
538  }
539
540  @Test
541  public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
542    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
543    generateWALs(Integer.MAX_VALUE);
544    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
545        Corruptions.APPEND_GARBAGE, true);
546    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
547  }
548
549  @Test
550  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
551    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
552    generateWALs(Integer.MAX_VALUE);
553    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
554        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
555    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt
556  }
557
558  @Test
559  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
560    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
561    generateWALs(Integer.MAX_VALUE);
562    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
563        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
564    // the entries in the original logs are alternating regions
565    // considering the sequence file header, the middle corruption should
566    // affect at least half of the entries
567    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
568    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
569    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
570    assertTrue("The file up to the corrupted area hasn't been parsed",
571        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
572  }
573
574  @Test
575  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
576    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
577    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
578        .asList(FaultyProtobufLogReader.FailureType.values()).stream()
579        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
580    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
581      final Set<String> walDirContents = splitCorruptWALs(failureType);
582      final Set<String> archivedLogs = new HashSet<>();
583      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
584      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
585        archived.append("\n\t").append(log.toString());
586        archivedLogs.add(log.getPath().getName());
587      }
588      LOG.debug(archived.toString());
589      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
590          walDirContents);
591    }
592  }
593
  /**
   * Generates a fresh set of WALs and splits them while the faulty reader, set to the given
   * failure type, is configured as the WAL reader implementation. The previously configured
   * reader class is put back afterwards, whether or not the split succeeds.
   *
   * @param failureType failure mode handed to the faulty reader through the configuration
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    // Remember the current reader implementation so the finally block can restore it.
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
        Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      // Empty the corrupt directory so callers only see files archived by this split.
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      // NOTE(review): only the reader class is restored; "faultyprotobuflogreader.failuretype"
      // stays set on the shared conf. Presumably harmless once the faulty reader is no longer
      // configured; confirm.
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }
639
640  @Test (expected = IOException.class)
641  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
642      throws IOException {
643    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
644    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
645  }
646
647  @Test
648  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
649      throws IOException {
650    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
651    try {
652      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
653    } catch (IOException e) {
654      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
655    }
656    assertEquals("if skip.errors is false all files should remain in place",
657        NUM_WRITERS, fs.listStatus(WALDIR).length);
658  }
659
660  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
661      final int expectedCount) throws IOException {
662    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
663
664    final String REGION = "region__1";
665    REGIONS.clear();
666    REGIONS.add(REGION);
667
668    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
669    generateWALs(1, entryCount, -1, 0);
670    corruptWAL(c1, corruption, true);
671
672    useDifferentDFSClient();
673    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
674
675    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
676    assertEquals(1, splitLog.length);
677
678    int actualCount = 0;
679    Reader in = wals.createReader(fs, splitLog[0]);
680    @SuppressWarnings("unused")
681    Entry entry;
682    while ((entry = in.next()) != null) ++actualCount;
683    assertEquals(expectedCount, actualCount);
684    in.close();
685
686    // should not have stored the EOF files as corrupt
687    FileStatus[] archivedLogs = fs.exists(CORRUPTDIR)? fs.listStatus(CORRUPTDIR): new FileStatus[0];
688    assertEquals(0, archivedLogs.length);
689
690  }
691
692  @Test
693  public void testEOFisIgnored() throws IOException {
694    int entryCount = 10;
695    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount-1);
696  }
697
698  @Test
699  public void testCorruptWALTrailer() throws IOException {
700    int entryCount = 10;
701    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
702  }
703
704  @Test
705  public void testLogsGetArchivedAfterSplit() throws IOException {
706    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
707    generateWALs(-1);
708    useDifferentDFSClient();
709    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
710    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
711    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
712  }
713
714  @Test
715  public void testSplit() throws IOException {
716    generateWALs(-1);
717    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
718  }
719
720  @Test
721  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
722      throws IOException {
723    generateWALs(-1);
724    useDifferentDFSClient();
725    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
726    FileStatus [] statuses = null;
727    try {
728      statuses = fs.listStatus(WALDIR);
729      if (statuses != null) {
730        fail("Files left in log dir: " +
731            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
732      }
733    } catch (FileNotFoundException e) {
734      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
735    }
736  }
737
738  @Test(expected = IOException.class)
739  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
740    //leave 5th log open so we could append the "trap"
741    Writer writer = generateWALs(4);
742    useDifferentDFSClient();
743
744    String region = "break";
745    Path regiondir = new Path(TABLEDIR, region);
746    fs.mkdirs(regiondir);
747
748    InstrumentedLogWriter.activateFailure = false;
749    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
750        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
751    writer.close();
752
753    try {
754      InstrumentedLogWriter.activateFailure = true;
755      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
756    } catch (IOException e) {
757      assertTrue(e.getMessage().
758          contains("This exception is instrumented and should only be thrown for testing"));
759      throw e;
760    } finally {
761      InstrumentedLogWriter.activateFailure = false;
762    }
763  }
764
765  @Test
766  public void testSplitDeletedRegion() throws IOException {
767    REGIONS.clear();
768    String region = "region_that_splits";
769    REGIONS.add(region);
770
771    generateWALs(1);
772    useDifferentDFSClient();
773
774    Path regiondir = new Path(TABLEDIR, region);
775    fs.delete(regiondir, true);
776    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
777    assertFalse(fs.exists(regiondir));
778  }
779
780  @Test
781  public void testIOEOnOutputThread() throws Exception {
782    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
783
784    generateWALs(-1);
785    useDifferentDFSClient();
786    FileStatus[] logfiles = fs.listStatus(WALDIR);
787    assertTrue("There should be some log file",
788        logfiles != null && logfiles.length > 0);
789    // wals with no entries (like the one we don't use in the factory)
790    // won't cause a failure since nothing will ever be written.
791    // pick the largest one since it's most likely to have entries.
792    int largestLogFile = 0;
793    long largestSize = 0;
794    for (int i = 0; i < logfiles.length; i++) {
795      if (logfiles[i].getLen() > largestSize) {
796        largestLogFile = i;
797        largestSize = logfiles[i].getLen();
798      }
799    }
800    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
801    // Set up a splitter that will throw an IOE on the output side
802    WALSplitter logSplitter =
803      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
804        @Override
805        protected Writer createWriter(Path logfile) throws IOException {
806          Writer mockWriter = Mockito.mock(Writer.class);
807          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
808              .append(Mockito.<Entry> any());
809          return mockWriter;
810        }
811      };
812    // Set up a background thread dumper.  Needs a thread to depend on and then we need to run
813    // the thread dumping in a background thread so it does not hold up the test.
814    final AtomicBoolean stop = new AtomicBoolean(false);
815    final Thread someOldThread = new Thread("Some-old-thread") {
816      @Override
817      public void run() {
818        while(!stop.get()) Threads.sleep(10);
819      }
820    };
821    someOldThread.setDaemon(true);
822    someOldThread.start();
823    final Thread t = new Thread("Background-thread-dumper") {
824      @Override
825      public void run() {
826        try {
827          Threads.threadDumpingIsAlive(someOldThread);
828        } catch (InterruptedException e) {
829          e.printStackTrace();
830        }
831      }
832    };
833    t.setDaemon(true);
834    t.start();
835    try {
836      logSplitter.splitWAL(logfiles[largestLogFile], null);
837      fail("Didn't throw!");
838    } catch (IOException ioe) {
839      assertTrue(ioe.toString().contains("Injected"));
840    } finally {
841      // Setting this to true will turn off the background thread dumper.
842      stop.set(true);
843    }
844  }
845
846  /**
847   * @param spiedFs should be instrumented for failure.
848   */
849  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
850    generateWALs(-1);
851    useDifferentDFSClient();
852
853    try {
854      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
855      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
856      assertFalse(fs.exists(WALDIR));
857    } catch (IOException e) {
858      fail("There shouldn't be any exception but: " + e.toString());
859    }
860  }
861
862  // Test for HBASE-3412
863  @Test
864  public void testMovedWALDuringRecovery() throws Exception {
865    // This partial mock will throw LEE for every file simulating
866    // files that were moved
867    FileSystem spiedFs = Mockito.spy(fs);
868    // The "File does not exist" part is very important,
869    // that's how it comes out of HDFS
870    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
871        when(spiedFs).append(Mockito.<Path>any());
872    retryOverHdfsProblem(spiedFs);
873  }
874
875  @Test
876  public void testRetryOpenDuringRecovery() throws Exception {
877    FileSystem spiedFs = Mockito.spy(fs);
878    // The "Cannot obtain block length", "Could not obtain the last block",
879    // and "Blocklist for [^ ]* has changed.*" part is very important,
880    // that's how it comes out of HDFS. If HDFS changes the exception
881    // message, this test needs to be adjusted accordingly.
882    //
883    // When DFSClient tries to open a file, HDFS needs to locate
884    // the last block of the file and get its length. However, if the
885    // last block is under recovery, HDFS may have problem to obtain
886    // the block length, in which case, retry may help.
887    Mockito.doAnswer(new Answer<FSDataInputStream>() {
888      private final String[] errors = new String[] {
889          "Cannot obtain block length", "Could not obtain the last block",
890          "Blocklist for " + OLDLOGDIR + " has changed"};
891      private int count = 0;
892
893      @Override
894      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
895        if (count < 3) {
896          throw new IOException(errors[count++]);
897        }
898        return (FSDataInputStream)invocation.callRealMethod();
899      }
900    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
901    retryOverHdfsProblem(spiedFs);
902  }
903
904  @Test
905  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
906    generateWALs(1, 10, -1);
907    FileStatus logfile = fs.listStatus(WALDIR)[0];
908    useDifferentDFSClient();
909
910    final AtomicInteger count = new AtomicInteger();
911
912    CancelableProgressable localReporter
913        = new CancelableProgressable() {
914      @Override
915      public boolean progress() {
916        count.getAndIncrement();
917        return false;
918      }
919    };
920
921    FileSystem spiedFs = Mockito.spy(fs);
922    Mockito.doAnswer(new Answer<FSDataInputStream>() {
923      @Override
924      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
925        Thread.sleep(1500); // Sleep a while and wait report status invoked
926        return (FSDataInputStream)invocation.callRealMethod();
927      }
928    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
929
930    try {
931      conf.setInt("hbase.splitlog.report.period", 1000);
932      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
933        Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
934      assertFalse("Log splitting should failed", ret);
935      assertTrue(count.get() > 0);
936    } catch (IOException e) {
937      fail("There shouldn't be any exception but: " + e.toString());
938    } finally {
939      // reset it back to its default value
940      conf.setInt("hbase.splitlog.report.period", 59000);
941    }
942  }
943
944  /**
945   * Test log split process with fake data and lots of edits to trigger threading
946   * issues.
947   */
948  @Test
949  public void testThreading() throws Exception {
950    doTestThreading(20000, 128*1024*1024, 0);
951  }
952
953  /**
954   * Test blocking behavior of the log split process if writers are writing slower
955   * than the reader is reading.
956   */
957  @Test
958  public void testThreadingSlowWriterSmallBuffer() throws Exception {
959    doTestThreading(200, 1024, 50);
960  }
961
962  /**
963   * Sets up a log splitter with a mock reader and writer. The mock reader generates
964   * a specified number of edits spread across 5 regions. The mock writer optionally
965   * sleeps for each edit it is fed.
966   * *
967   * After the split is complete, verifies that the statistics show the correct number
968   * of edits output into each region.
969   *
970   * @param numFakeEdits number of fake edits to push through pipeline
971   * @param bufferSize size of in-memory buffer
972   * @param writerSlowness writer threads will sleep this many ms per edit
973   */
974  private void doTestThreading(final int numFakeEdits,
975      final int bufferSize,
976      final int writerSlowness) throws Exception {
977
978    Configuration localConf = new Configuration(conf);
979    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
980
981    // Create a fake log file (we'll override the reader to produce a stream of edits)
982    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
983    FSDataOutputStream out = fs.create(logPath);
984    out.close();
985
986    // Make region dirs for our destination regions so the output doesn't get skipped
987    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
988    makeRegionDirs(regions);
989
990    // Create a splitter that reads and writes the data without touching disk
991    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
992      /* Produce a mock writer that doesn't write anywhere */
993      @Override
994      protected Writer createWriter(Path logfile) throws IOException {
995        Writer mockWriter = Mockito.mock(Writer.class);
996        Mockito.doAnswer(new Answer<Void>() {
997          int expectedIndex = 0;
998
999          @Override
1000          public Void answer(InvocationOnMock invocation) {
1001            if (writerSlowness > 0) {
1002              try {
1003                Thread.sleep(writerSlowness);
1004              } catch (InterruptedException ie) {
1005                Thread.currentThread().interrupt();
1006              }
1007            }
1008            Entry entry = (Entry) invocation.getArgument(0);
1009            WALEdit edit = entry.getEdit();
1010            List<Cell> cells = edit.getCells();
1011            assertEquals(1, cells.size());
1012            Cell cell = cells.get(0);
1013
1014            // Check that the edits come in the right order.
1015            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
1016                cell.getRowLength()));
1017            expectedIndex++;
1018            return null;
1019          }
1020        }).when(mockWriter).append(Mockito.<Entry>any());
1021        return mockWriter;
1022      }
1023
1024      /* Produce a mock reader that generates fake entries */
1025      @Override
1026      protected Reader getReader(FileStatus file, boolean skipErrors,
1027        CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
1028        Reader mockReader = Mockito.mock(Reader.class);
1029        Mockito.doAnswer(new Answer<Entry>() {
1030          int index = 0;
1031
1032          @Override
1033          public Entry answer(InvocationOnMock invocation) throws Throwable {
1034            if (index >= numFakeEdits) return null;
1035
1036            // Generate r0 through r4 in round robin fashion
1037            int regionIdx = index % regions.size();
1038            byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
1039
1040            Entry ret = createTestEntry(TABLE_NAME, region,
1041                Bytes.toBytes(index / regions.size()),
1042                FAMILY, QUALIFIER, VALUE, index);
1043            index++;
1044            return ret;
1045          }
1046        }).when(mockReader).next();
1047        return mockReader;
1048      }
1049    };
1050
1051    logSplitter.splitWAL(fs.getFileStatus(logPath), null);
1052
1053    // Verify number of written edits per region
1054    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
1055    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
1056      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
1057      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
1058    }
1059    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
1060  }
1061
1062  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
1063  @Test
1064  public void testSplitLogFileDeletedRegionDir() throws IOException {
1065    LOG.info("testSplitLogFileDeletedRegionDir");
1066    final String REGION = "region__1";
1067    REGIONS.clear();
1068    REGIONS.add(REGION);
1069
1070    generateWALs(1, 10, -1);
1071    useDifferentDFSClient();
1072
1073    Path regiondir = new Path(TABLEDIR, REGION);
1074    LOG.info("Region directory is" + regiondir);
1075    fs.delete(regiondir, true);
1076    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1077    assertFalse(fs.exists(regiondir));
1078  }
1079
1080  @Test
1081  public void testSplitLogFileEmpty() throws IOException {
1082    LOG.info("testSplitLogFileEmpty");
1083    // we won't create the hlog dir until getWAL got called, so
1084    // make dir here when testing empty log file
1085    fs.mkdirs(WALDIR);
1086    injectEmptyFile(".empty", true);
1087    useDifferentDFSClient();
1088
1089    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1090    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1091    assertFalse(fs.exists(tdir));
1092
1093    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
1094  }
1095
1096  @Test
1097  public void testSplitLogFileMultipleRegions() throws IOException {
1098    LOG.info("testSplitLogFileMultipleRegions");
1099    generateWALs(1, 10, -1);
1100    splitAndCount(1, 10);
1101  }
1102
1103  @Test
1104  public void testSplitLogFileFirstLineCorruptionLog()
1105      throws IOException {
1106    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
1107    generateWALs(1, 10, -1);
1108    FileStatus logfile = fs.listStatus(WALDIR)[0];
1109
1110    corruptWAL(logfile.getPath(),
1111        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
1112
1113    useDifferentDFSClient();
1114    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1115
1116    final Path corruptDir =
1117      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
1118    assertEquals(1, fs.listStatus(corruptDir).length);
1119  }
1120
1121  /**
1122   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
1123   */
1124  @Test
1125  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
1126    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
1127    // Generate wals for our destination region
1128    String regionName = "r0";
1129    final Path regiondir = new Path(TABLEDIR, regionName);
1130    REGIONS.clear();
1131    REGIONS.add(regionName);
1132    generateWALs(-1);
1133
1134    wals.getWAL(null);
1135    FileStatus[] logfiles = fs.listStatus(WALDIR);
1136    assertTrue("There should be some log file",
1137        logfiles != null && logfiles.length > 0);
1138
1139    WALSplitter logSplitter =
1140        new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
1141      @Override
1142      protected Writer createWriter(Path logfile)
1143          throws IOException {
1144        Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
1145        // After creating writer, simulate region's
1146        // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
1147        // region and delete them, excluding files with '.temp' suffix.
1148        NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
1149        if (files != null && !files.isEmpty()) {
1150          for (Path file : files) {
1151            if (!this.walFS.delete(file, false)) {
1152              LOG.error("Failed delete of " + file);
1153            } else {
1154              LOG.debug("Deleted recovered.edits file=" + file);
1155            }
1156          }
1157        }
1158        return writer;
1159      }
1160    };
1161    try{
1162      logSplitter.splitWAL(logfiles[0], null);
1163    } catch (IOException e) {
1164      LOG.info(e.toString(), e);
1165      fail("Throws IOException when spliting "
1166          + "log, it is most likely because writing file does not "
1167          + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
1168    }
1169    if (fs.exists(CORRUPTDIR)) {
1170      if (fs.listStatus(CORRUPTDIR).length > 0) {
1171        fail("There are some corrupt logs, "
1172            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
1173      }
1174    }
1175  }
1176
1177  private Writer generateWALs(int leaveOpen) throws IOException {
1178    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
1179  }
1180
1181  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
1182    return generateWALs(writers, entries, leaveOpen, 7);
1183  }
1184
1185  private void makeRegionDirs(List<String> regions) throws IOException {
1186    for (String region : regions) {
1187      LOG.debug("Creating dir for region " + region);
1188      fs.mkdirs(new Path(TABLEDIR, region));
1189    }
1190  }
1191
1192  /**
1193   * @param leaveOpen index to leave un-closed. -1 to close all.
1194   * @return the writer that's still open, or null if all were closed.
1195   */
1196  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) throws IOException {
1197    makeRegionDirs(REGIONS);
1198    fs.mkdirs(WALDIR);
1199    Writer [] ws = new Writer[writers];
1200    int seq = 0;
1201    int numRegionEventsAdded = 0;
1202    for (int i = 0; i < writers; i++) {
1203      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
1204      for (int j = 0; j < entries; j++) {
1205        int prefix = 0;
1206        for (String region : REGIONS) {
1207          String row_key = region + prefix++ + i + j;
1208          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
1209              QUALIFIER, VALUE, seq++);
1210
1211          if (numRegionEventsAdded < regionEvents) {
1212            numRegionEventsAdded ++;
1213            appendRegionEvent(ws[i], region);
1214          }
1215        }
1216      }
1217      if (i != leaveOpen) {
1218        ws[i].close();
1219        LOG.info("Closing writer " + i);
1220      }
1221    }
1222    if (leaveOpen < 0 || leaveOpen >= writers) {
1223      return null;
1224    }
1225    return ws[leaveOpen];
1226  }
1227
1228
1229
1230  private Path[] getLogForRegion(TableName table, String region)
1231      throws IOException {
1232    Path tdir = CommonFSUtils.getWALTableDir(conf, table);
1233    @SuppressWarnings("deprecation")
1234    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
1235        Bytes.toString(Bytes.toBytes(region))));
1236    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
1237      @Override
1238      public boolean accept(Path p) {
1239        if (WALSplitUtil.isSequenceIdFile(p)) {
1240          return false;
1241        }
1242        return true;
1243      }
1244    });
1245    Path[] paths = new Path[files.length];
1246    for (int i = 0; i < files.length; i++) {
1247      paths[i] = files[i].getPath();
1248    }
1249    return paths;
1250  }
1251
1252  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
1253    FSDataOutputStream out;
1254    int fileSize = (int) fs.listStatus(path)[0].getLen();
1255
1256    FSDataInputStream in = fs.open(path);
1257    byte[] corrupted_bytes = new byte[fileSize];
1258    in.readFully(0, corrupted_bytes, 0, fileSize);
1259    in.close();
1260
1261    switch (corruption) {
1262      case APPEND_GARBAGE:
1263        fs.delete(path, false);
1264        out = fs.create(path);
1265        out.write(corrupted_bytes);
1266        out.write(Bytes.toBytes("-----"));
1267        closeOrFlush(close, out);
1268        break;
1269
1270      case INSERT_GARBAGE_ON_FIRST_LINE:
1271        fs.delete(path, false);
1272        out = fs.create(path);
1273        out.write(0);
1274        out.write(corrupted_bytes);
1275        closeOrFlush(close, out);
1276        break;
1277
1278      case INSERT_GARBAGE_IN_THE_MIDDLE:
1279        fs.delete(path, false);
1280        out = fs.create(path);
1281        int middle = (int) Math.floor(corrupted_bytes.length / 2);
1282        out.write(corrupted_bytes, 0, middle);
1283        out.write(0);
1284        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
1285        closeOrFlush(close, out);
1286        break;
1287
1288      case TRUNCATE:
1289        fs.delete(path, false);
1290        out = fs.create(path);
1291        out.write(corrupted_bytes, 0, fileSize
1292            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
1293        closeOrFlush(close, out);
1294        break;
1295
1296      case TRUNCATE_TRAILER:
1297        fs.delete(path, false);
1298        out = fs.create(path);
1299        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
1300        closeOrFlush(close, out);
1301        break;
1302    }
1303  }
1304
1305  private void closeOrFlush(boolean close, FSDataOutputStream out)
1306      throws IOException {
1307    if (close) {
1308      out.close();
1309    } else {
1310      Method syncMethod = null;
1311      try {
1312        syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
1313      } catch (NoSuchMethodException e) {
1314        try {
1315          syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
1316        } catch (NoSuchMethodException ex) {
1317          throw new IOException("This version of Hadoop supports " +
1318              "neither Syncable.sync() nor Syncable.hflush().");
1319        }
1320      }
1321      try {
1322        syncMethod.invoke(out, new Object[]{});
1323      } catch (Exception e) {
1324        throw new IOException(e);
1325      }
1326      // Not in 0out.hflush();
1327    }
1328  }
1329
1330  private int countWAL(Path log) throws IOException {
1331    int count = 0;
1332    Reader in = wals.createReader(fs, log);
1333    while (in.next() != null) {
1334      count++;
1335    }
1336    in.close();
1337    return count;
1338  }
1339
1340  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
1341      String output) throws IOException {
1342    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
1343    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
1344        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
1345        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
1346        .setFamilyName(ByteString.copyFrom(FAMILY))
1347        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
1348        .addAllCompactionInput(Arrays.asList(inputs))
1349        .addCompactionOutput(output);
1350
1351    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
1352    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
1353        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
1354    w.append(new Entry(key, edit));
1355    w.sync(false);
1356  }
1357
1358  private static void appendRegionEvent(Writer w, String region) throws IOException {
1359    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
1360        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
1361        TABLE_NAME.toBytes(),
1362        Bytes.toBytes(region),
1363        Bytes.toBytes(String.valueOf(region.hashCode())),
1364        1,
1365        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
1366    final long time = EnvironmentEdgeManager.currentTime();
1367    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
1368        HConstants.DEFAULT_CLUSTER_ID);
1369    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
1370    w.append(new Entry(walKey, we));
1371    w.sync(false);
1372  }
1373
1374  public static long appendEntry(Writer writer, TableName table, byte[] region,
1375      byte[] row, byte[] family, byte[] qualifier,
1376      byte[] value, long seq)
1377      throws IOException {
1378    LOG.info(Thread.currentThread().getName() + " append");
1379    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1380    LOG.info(Thread.currentThread().getName() + " sync");
1381    writer.sync(false);
1382    return seq;
1383  }
1384
1385  private static Entry createTestEntry(
1386      TableName table, byte[] region,
1387      byte[] row, byte[] family, byte[] qualifier,
1388      byte[] value, long seq) {
1389    long time = System.nanoTime();
1390
1391    seq++;
1392    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
1393    WALEdit edit = new WALEdit();
1394    edit.add(cell);
1395    return new Entry(new WALKeyImpl(region, table, seq, time,
1396        HConstants.DEFAULT_CLUSTER_ID), edit);
1397  }
1398
1399  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
1400    Writer writer =
1401        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
1402    if (closeFile) {
1403      writer.close();
1404    }
1405  }
1406
1407  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
1408    Reader in1, in2;
1409    in1 = wals.createReader(fs, p1);
1410    in2 = wals.createReader(fs, p2);
1411    Entry entry1;
1412    Entry entry2;
1413    while ((entry1 = in1.next()) != null) {
1414      entry2 = in2.next();
1415      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
1416          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
1417        return false;
1418      }
1419    }
1420    in1.close();
1421    in2.close();
1422    return true;
1423  }
1424}