001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.junit.Assume.assumeFalse;
025
026import java.io.FileNotFoundException;
027import java.io.IOException;
028import java.lang.reflect.Method;
029import java.security.PrivilegedExceptionAction;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Map;
037import java.util.NavigableSet;
038import java.util.Objects;
039import java.util.Set;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.stream.Collectors;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FSDataInputStream;
046import org.apache.hadoop.fs.FSDataOutputStream;
047import org.apache.hadoop.fs.FileStatus;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.FileUtil;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.fs.PathFilter;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.HBaseClassTestRule;
054import org.apache.hadoop.hbase.HBaseConfiguration;
055import org.apache.hadoop.hbase.HBaseTestingUtility;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.KeyValue;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.client.RegionInfo;
061import org.apache.hadoop.hbase.client.RegionInfoBuilder;
062import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
063import org.apache.hadoop.hbase.regionserver.HRegion;
064import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
065import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
066import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
067import org.apache.hadoop.hbase.security.User;
068import org.apache.hadoop.hbase.testclassification.LargeTests;
069import org.apache.hadoop.hbase.testclassification.RegionServerTests;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.CancelableProgressable;
072import org.apache.hadoop.hbase.util.CommonFSUtils;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
074import org.apache.hadoop.hbase.util.Threads;
075import org.apache.hadoop.hbase.wal.WAL.Entry;
076import org.apache.hadoop.hbase.wal.WAL.Reader;
077import org.apache.hadoop.hbase.wal.WALProvider.Writer;
078import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
079import org.apache.hadoop.hdfs.DFSTestUtil;
080import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
081import org.apache.hadoop.ipc.RemoteException;
082import org.junit.After;
083import org.junit.AfterClass;
084import org.junit.Before;
085import org.junit.BeforeClass;
086import org.junit.ClassRule;
087import org.junit.Rule;
088import org.junit.Test;
089import org.junit.experimental.categories.Category;
090import org.junit.rules.TestName;
091import org.mockito.Mockito;
092import org.mockito.invocation.InvocationOnMock;
093import org.mockito.stubbing.Answer;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
098import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
099import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
100import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
101
102import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
104
105/**
106 * Testing {@link WAL} splitting code.
107 */
108@Category({ RegionServerTests.class, LargeTests.class })
109public class TestWALSplit {
  // HBase test-class rule registered for this class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALSplit.class);
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  // Configuration shared by every test. Set in setUpBeforeClass() and then mutated by individual
  // tests (e.g. WALSplitter.SPLIT_SKIP_ERRORS_KEY), so settings can carry over between tests.
  private static Configuration conf;
  // Mini DFS cluster filesystem, fetched in setUp(); useDifferentDFSClient() re-initializes it.
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Directory layout for the current test; all of these are recomputed in setUp().
  private Path HBASEDIR; // HBase root dir (TEST_UTIL.createRootDir())
  private Path HBASELOGDIR; // WAL root dir (TEST_UTIL.createWALRootDir())
  private Path WALDIR; // WAL dir of the fake server whose logs get split
  private Path OLDLOGDIR; // where split WALs are archived
  private Path CORRUPTDIR; // where WALs found to be corrupt end up
  private Path TABLEDIR; // table dir of TABLE_NAME under HBASEDIR
  private String TMPDIRNAME; // value of HConstants.TEMPORARY_FS_DIRECTORY_KEY (or its default)

  // Default number of WAL files generated per test; split results are checked against it.
  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  // WAL file name passed when computing recovered-edits paths.
  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME = TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  // Test WAL files are addressed as WAL_FILE_PREFIX + index (e.g. "wal.dat.5").
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  // Regions edits are generated for; reset to "bbb", "ccc" in setUp(), overridden by some tests.
  private static List<String> REGIONS = new ArrayList<>();
  // Fake users (current user name + "-robber" / "-zombie"), assigned in setUpBeforeClass().
  private static String ROBBER;
  private static String ZOMBIE;
  // Group both fake users are mapped to.
  private static String[] GROUP = new String[] { "supergroup" };
141
142  static enum Corruptions {
143    INSERT_GARBAGE_ON_FIRST_LINE,
144    INSERT_GARBAGE_IN_THE_MIDDLE,
145    APPEND_GARBAGE,
146    TRUNCATE,
147    TRUNCATE_TRAILER
148  }
149
150  @BeforeClass
151  public static void setUpBeforeClass() throws Exception {
152    conf = TEST_UTIL.getConfiguration();
153    conf.setClass("hbase.regionserver.hlog.writer.impl", InstrumentedLogWriter.class, Writer.class);
154    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
155    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
156    // Create fake maping user to group and set it to the conf.
157    Map<String, String[]> u2g_map = new HashMap<>(2);
158    ROBBER = User.getCurrent().getName() + "-robber";
159    ZOMBIE = User.getCurrent().getName() + "-zombie";
160    u2g_map.put(ROBBER, GROUP);
161    u2g_map.put(ZOMBIE, GROUP);
162    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
163    conf.setInt("dfs.heartbeat.interval", 1);
164    TEST_UTIL.startMiniDFSCluster(2);
165  }
166
167  @AfterClass
168  public static void tearDownAfterClass() throws Exception {
169    TEST_UTIL.shutdownMiniDFSCluster();
170  }
171
  // Name of the running test method; used to name the WALFactory and the fake server's WAL dir.
  @Rule
  public TestName name = new TestName();
  // WAL factory of the current test: created in setUp(), closed in tearDown(), and replaced by
  // splitCorruptWALs().
  private WALFactory wals = null;
175
176  @Before
177  public void setUp() throws Exception {
178    LOG.info("Cleaning up cluster for new test.");
179    fs = TEST_UTIL.getDFSCluster().getFileSystem();
180    HBASEDIR = TEST_UTIL.createRootDir();
181    HBASELOGDIR = TEST_UTIL.createWALRootDir();
182    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
183    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
184    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
185    TMPDIRNAME =
186      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
187    REGIONS.clear();
188    Collections.addAll(REGIONS, "bbb", "ccc");
189    InstrumentedLogWriter.activateFailure = false;
190    wals = new WALFactory(conf, name.getMethodName());
191    WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(ServerName
192      .valueOf(name.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()).toString()));
193    // fs.mkdirs(WALDIR);
194  }
195
196  @After
197  public void tearDown() throws Exception {
198    try {
199      wals.close();
200    } catch (IOException exception) {
201      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
202      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if"
203        + " you see a failure look here.");
204      LOG.debug("exception details", exception);
205    } finally {
206      wals = null;
207      fs.delete(HBASEDIR, true);
208      fs.delete(HBASELOGDIR, true);
209    }
210  }
211
212  /**
213   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
214   * Ensures we do not lose edits.
215   */
216  @Test
217  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
218    final AtomicLong counter = new AtomicLong(0);
219    AtomicBoolean stop = new AtomicBoolean(false);
220    // Region we'll write edits too and then later examine to make sure they all made it in.
221    final String region = REGIONS.get(0);
222    final int numWriters = 3;
223    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
224    try {
225      long startCount = counter.get();
226      zombie.start();
227      // Wait till writer starts going.
228      while (startCount == counter.get())
229        Threads.sleep(1);
230      // Give it a second to write a few appends.
231      Threads.sleep(1000);
232      final Configuration conf2 = HBaseConfiguration.create(conf);
233      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
234      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
235        @Override
236        public Integer run() throws Exception {
237          StringBuilder ls =
238            new StringBuilder("Contents of WALDIR (").append(WALDIR).append("):\n");
239          for (FileStatus status : fs.listStatus(WALDIR)) {
240            ls.append("\t").append(status.toString()).append("\n");
241          }
242          LOG.debug(Objects.toString(ls));
243          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
244          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
245          LOG.info("Finished splitting out from under zombie.");
246          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
247          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
248          int count = 0;
249          for (Path logfile : logfiles) {
250            count += countWAL(logfile);
251          }
252          return count;
253        }
254      });
255      LOG.info("zombie=" + counter.get() + ", robber=" + count);
256      assertTrue(
257        "The log file could have at most 1 extra log entry, but can't have less. "
258          + "Zombie could write " + counter.get() + " and logfile had only " + count,
259        counter.get() == count || counter.get() + 1 == count);
260    } finally {
261      stop.set(true);
262      zombie.interrupt();
263      Threads.threadDumpingIsAlive(zombie);
264    }
265  }
266
267  /**
268   * This thread will keep writing to a 'wal' file even after the split process has started. It
269   * simulates a region server that was considered dead but woke up and wrote some more to the last
270   * log entry. Does its writing as an alternate user in another filesystem instance to simulate
271   * better it being a regionserver.
272   */
273  class ZombieLastLogWriterRegionServer extends Thread {
274    final AtomicLong editsCount;
275    final AtomicBoolean stop;
276    final int numOfWriters;
277    /**
278     * Region to write edits for.
279     */
280    final String region;
281    final User user;
282
283    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
284      final String region, final int writers) throws IOException, InterruptedException {
285      super("ZombieLastLogWriterRegionServer");
286      setDaemon(true);
287      this.stop = stop;
288      this.editsCount = counter;
289      this.region = region;
290      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
291      numOfWriters = writers;
292    }
293
294    @Override
295    public void run() {
296      try {
297        doWriting();
298      } catch (IOException e) {
299        LOG.warn(getName() + " Writer exiting " + e);
300      } catch (InterruptedException e) {
301        LOG.warn(getName() + " Writer exiting " + e);
302      }
303    }
304
305    private void doWriting() throws IOException, InterruptedException {
306      this.user.runAs(new PrivilegedExceptionAction<Object>() {
307        @Override
308        public Object run() throws Exception {
309          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
310          // index we supply here.
311          int walToKeepOpen = numOfWriters - 1;
312          // The below method writes numOfWriters files each with ENTRIES entries for a total of
313          // numOfWriters * ENTRIES added per column family in the region.
314          Writer writer = null;
315          try {
316            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
317          } catch (IOException e1) {
318            throw new RuntimeException("Failed", e1);
319          }
320          // Update counter so has all edits written so far.
321          editsCount.addAndGet(numOfWriters * ENTRIES);
322          loop(writer);
323          // If we've been interruped, then things should have shifted out from under us.
324          // closing should error
325          try {
326            writer.close();
327            fail("Writing closing after parsing should give an error.");
328          } catch (IOException exception) {
329            LOG.debug("ignoring error when closing final writer.", exception);
330          }
331          return null;
332        }
333      });
334    }
335
336    private void loop(final Writer writer) {
337      byte[] regionBytes = Bytes.toBytes(this.region);
338      while (!stop.get()) {
339        try {
340          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
341            Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
342          long count = editsCount.incrementAndGet();
343          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
344          try {
345            Thread.sleep(1);
346          } catch (InterruptedException e) {
347            //
348          }
349        } catch (IOException ex) {
350          LOG.error(getName() + " ex " + ex.toString());
351          if (ex instanceof RemoteException) {
352            LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing "
353              + (editsCount.get() + 1));
354          } else {
355            LOG.error(getName() + " failed to write....at " + editsCount.get());
356            fail("Failed to write " + editsCount.get());
357          }
358          break;
359        } catch (Throwable t) {
360          LOG.error(getName() + " HOW? " + t);
361          LOG.debug("exception details", t);
362          break;
363        }
364      }
365      LOG.info(getName() + " Writer exiting");
366    }
367  }
368
369  /**
370   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
371   */
372  @Test
373  public void testRecoveredEditsPathForMeta() throws IOException {
374    Path p = createRecoveredEditsPathForRegion();
375    String parentOfParent = p.getParent().getParent().getName();
376    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
377  }
378
379  /**
380   * Test old recovered edits file doesn't break WALSplitter. This is useful in upgrading old
381   * instances.
382   */
383  @Test
384  public void testOldRecoveredEditsFileSidelined() throws IOException {
385    Path p = createRecoveredEditsPathForRegion();
386    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
387    Path regiondir = new Path(tdir, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
388    fs.mkdirs(regiondir);
389    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
390    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
391    fs.createNewFile(parent); // create a recovered.edits file
392    String parentOfParent = p.getParent().getParent().getName();
393    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
394    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
395  }
396
397  private Path createRecoveredEditsPathForRegion() throws IOException {
398    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
399    long now = EnvironmentEdgeManager.currentTime();
400    Entry entry = new Entry(
401      new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
402      new WALEdit());
403    Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
404      FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
405    return p;
406  }
407
408  @Test
409  public void testHasRecoveredEdits() throws IOException {
410    Path p = createRecoveredEditsPathForRegion();
411    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
412    String renamedEdit = p.getName().split("-")[0];
413    fs.createNewFile(new Path(p.getParent(), renamedEdit));
414    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
415  }
416
417  private void useDifferentDFSClient() throws IOException {
418    // make fs act as a different client now
419    // initialize will create a new DFSClient with a new client ID
420    fs.initialize(fs.getUri(), conf);
421  }
422
423  @Test
424  public void testSplitPreservesEdits() throws IOException {
425    final String REGION = "region__1";
426    REGIONS.clear();
427    REGIONS.add(REGION);
428
429    generateWALs(1, 10, -1, 0);
430    useDifferentDFSClient();
431    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
432    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
433    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
434    assertEquals(1, splitLog.length);
435
436    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
437  }
438
439  @Test
440  public void testSplitRemovesRegionEventsEdits() throws IOException {
441    final String REGION = "region__1";
442    REGIONS.clear();
443    REGIONS.add(REGION);
444
445    generateWALs(1, 10, -1, 100);
446    useDifferentDFSClient();
447    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
448    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
449    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
450    assertEquals(1, splitLog.length);
451
452    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
453    // split log should only have the test edits
454    assertEquals(10, countWAL(splitLog[0]));
455  }
456
457  @Test
458  public void testSplitLeavesCompactionEventsEdits() throws IOException {
459    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
460    REGIONS.clear();
461    REGIONS.add(hri.getEncodedName());
462    Path regionDir =
463      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
464    LOG.info("Creating region directory: " + regionDir);
465    assertTrue(fs.mkdirs(regionDir));
466
467    Writer writer = generateWALs(1, 10, 0, 10);
468    String[] compactInputs = new String[] { "file1", "file2", "file3" };
469    String compactOutput = "file4";
470    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
471    writer.close();
472
473    useDifferentDFSClient();
474    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
475
476    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
477    // original log should have 10 test edits, 10 region markers, 1 compaction marker
478    assertEquals(21, countWAL(originalLog));
479
480    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
481    assertEquals(1, splitLog.length);
482
483    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
484    // split log should have 10 test edits plus 1 compaction marker
485    assertEquals(11, countWAL(splitLog[0]));
486  }
487
488  /**
489   * @param expectedEntries -1 to not assert
490   * @return the count across all regions
491   */
492  private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException {
493    useDifferentDFSClient();
494    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
495    int result = 0;
496    for (String region : REGIONS) {
497      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
498      assertEquals(expectedFiles, logfiles.length);
499      int count = 0;
500      for (Path logfile : logfiles) {
501        count += countWAL(logfile);
502      }
503      if (-1 != expectedEntries) {
504        assertEquals(expectedEntries, count);
505      }
506      result += count;
507    }
508    return result;
509  }
510
511  @Test
512  public void testEmptyLogFiles() throws IOException {
513    testEmptyLogFiles(true);
514  }
515
516  @Test
517  public void testEmptyOpenLogFiles() throws IOException {
518    testEmptyLogFiles(false);
519  }
520
521  private void testEmptyLogFiles(final boolean close) throws IOException {
522    // we won't create the hlog dir until getWAL got called, so
523    // make dir here when testing empty log file
524    fs.mkdirs(WALDIR);
525    injectEmptyFile(".empty", close);
526    generateWALs(Integer.MAX_VALUE);
527    injectEmptyFile("empty", close);
528    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
529  }
530
531  @Test
532  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
533    // generate logs but leave wal.dat.5 open.
534    generateWALs(5);
535    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
536  }
537
538  @Test
539  public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
540    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
541    generateWALs(Integer.MAX_VALUE);
542    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.APPEND_GARBAGE, true);
543    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
544  }
545
546  @Test
547  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
548    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
549    generateWALs(Integer.MAX_VALUE);
550    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE,
551      true);
552    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
553  }
554
555  @Test
556  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
557    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
558    generateWALs(Integer.MAX_VALUE);
559    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE,
560      false);
561    // the entries in the original logs are alternating regions
562    // considering the sequence file header, the middle corruption should
563    // affect at least half of the entries
564    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
565    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
566    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
567    assertTrue("The file up to the corrupted area hasn't been parsed",
568      REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
569  }
570
571  @Test
572  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
573    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
574    List<FaultyProtobufLogReader.FailureType> failureTypes =
575      Arrays.asList(FaultyProtobufLogReader.FailureType.values()).stream()
576        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
577    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
578      final Set<String> walDirContents = splitCorruptWALs(failureType);
579      final Set<String> archivedLogs = new HashSet<>();
580      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
581      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
582        archived.append("\n\t").append(log.toString());
583        archivedLogs.add(log.getPath().getName());
584      }
585      LOG.debug(archived.toString());
586      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
587        walDirContents);
588    }
589  }
590
591  /**
592   * @return set of wal names present prior to split attempt.
593   * @throws IOException if the split process fails
594   */
595  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
596    throws IOException {
597    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl", Reader.class);
598    InstrumentedLogWriter.activateFailure = false;
599
600    try {
601      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
602        Reader.class);
603      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
604      // Clean up from previous tests or previous loop
605      try {
606        wals.shutdown();
607      } catch (IOException exception) {
608        // since we're splitting out from under the factory, we should expect some closing failures.
609        LOG.debug("Ignoring problem closing WALFactory.", exception);
610      }
611      wals.close();
612      try {
613        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
614          fs.delete(log.getPath(), true);
615        }
616      } catch (FileNotFoundException exception) {
617        LOG.debug("no previous CORRUPTDIR to clean.");
618      }
619      // change to the faulty reader
620      wals = new WALFactory(conf, name.getMethodName());
621      generateWALs(-1);
622      // Our reader will render all of these files corrupt.
623      final Set<String> walDirContents = new HashSet<>();
624      for (FileStatus status : fs.listStatus(WALDIR)) {
625        walDirContents.add(status.getPath().getName());
626      }
627      useDifferentDFSClient();
628      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
629      return walDirContents;
630    } finally {
631      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, Reader.class);
632    }
633  }
634
635  @Test(expected = IOException.class)
636  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
637    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
638    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
639  }
640
641  @Test
642  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
643    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
644    try {
645      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
646    } catch (IOException e) {
647      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
648    }
649    assertEquals("if skip.errors is false all files should remain in place", NUM_WRITERS,
650      fs.listStatus(WALDIR).length);
651  }
652
653  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
654    final int expectedCount) throws IOException {
655    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
656
657    final String REGION = "region__1";
658    REGIONS.clear();
659    REGIONS.add(REGION);
660
661    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
662    generateWALs(1, entryCount, -1, 0);
663    corruptWAL(c1, corruption, true);
664
665    useDifferentDFSClient();
666    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
667
668    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
669    assertEquals(1, splitLog.length);
670
671    int actualCount = 0;
672    Reader in = wals.createReader(fs, splitLog[0]);
673    @SuppressWarnings("unused")
674    Entry entry;
675    while ((entry = in.next()) != null)
676      ++actualCount;
677    assertEquals(expectedCount, actualCount);
678    in.close();
679
680    // should not have stored the EOF files as corrupt
681    FileStatus[] archivedLogs =
682      fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0];
683    assertEquals(0, archivedLogs.length);
684
685  }
686
687  @Test
688  public void testEOFisIgnored() throws IOException {
689    int entryCount = 10;
690    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
691  }
692
693  @Test
694  public void testCorruptWALTrailer() throws IOException {
695    int entryCount = 10;
696    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
697  }
698
699  @Test
700  public void testLogsGetArchivedAfterSplit() throws IOException {
701    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
702    generateWALs(-1);
703    useDifferentDFSClient();
704    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
705    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
706    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
707  }
708
709  @Test
710  public void testSplit() throws IOException {
711    generateWALs(-1);
712    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
713  }
714
715  @Test
716  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException {
717    generateWALs(-1);
718    useDifferentDFSClient();
719    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
720    FileStatus[] statuses = null;
721    try {
722      statuses = fs.listStatus(WALDIR);
723      if (statuses != null) {
724        fail("Files left in log dir: " + Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
725      }
726    } catch (FileNotFoundException e) {
727      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
728    }
729  }
730
731  @Test(expected = IOException.class)
732  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
733    // leave 5th log open so we could append the "trap"
734    Writer writer = generateWALs(4);
735    useDifferentDFSClient();
736
737    String region = "break";
738    Path regiondir = new Path(TABLEDIR, region);
739    fs.mkdirs(regiondir);
740
741    InstrumentedLogWriter.activateFailure = false;
742    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes("r" + 999), FAMILY,
743      QUALIFIER, VALUE, 0);
744    writer.close();
745
746    try {
747      InstrumentedLogWriter.activateFailure = true;
748      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
749    } catch (IOException e) {
750      assertTrue(e.getMessage()
751        .contains("This exception is instrumented and should only be thrown for testing"));
752      throw e;
753    } finally {
754      InstrumentedLogWriter.activateFailure = false;
755    }
756  }
757
758  @Test
759  public void testSplitDeletedRegion() throws IOException {
760    REGIONS.clear();
761    String region = "region_that_splits";
762    REGIONS.add(region);
763
764    generateWALs(1);
765    useDifferentDFSClient();
766
767    Path regiondir = new Path(TABLEDIR, region);
768    fs.delete(regiondir, true);
769    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
770    assertFalse(fs.exists(regiondir));
771  }
772
773  @Test
774  public void testIOEOnOutputThread() throws Exception {
775    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
776
777    generateWALs(-1);
778    useDifferentDFSClient();
779    FileStatus[] logfiles = fs.listStatus(WALDIR);
780    assertTrue("There should be some log file", logfiles != null && logfiles.length > 0);
781    // wals with no entries (like the one we don't use in the factory)
782    // won't cause a failure since nothing will ever be written.
783    // pick the largest one since it's most likely to have entries.
784    int largestLogFile = 0;
785    long largestSize = 0;
786    for (int i = 0; i < logfiles.length; i++) {
787      if (logfiles[i].getLen() > largestSize) {
788        largestLogFile = i;
789        largestSize = logfiles[i].getLen();
790      }
791    }
792    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
793    // Set up a splitter that will throw an IOE on the output side
794    WALSplitter logSplitter =
795      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
796        @Override
797        protected Writer createWriter(Path logfile) throws IOException {
798          Writer mockWriter = Mockito.mock(Writer.class);
799          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
800            .append(Mockito.<Entry> any());
801          return mockWriter;
802        }
803      };
804    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
805    // the thread dumping in a background thread so it does not hold up the test.
806    final AtomicBoolean stop = new AtomicBoolean(false);
807    final Thread someOldThread = new Thread("Some-old-thread") {
808      @Override
809      public void run() {
810        while (!stop.get())
811          Threads.sleep(10);
812      }
813    };
814    someOldThread.setDaemon(true);
815    someOldThread.start();
816    final Thread t = new Thread("Background-thread-dumper") {
817      @Override
818      public void run() {
819        try {
820          Threads.threadDumpingIsAlive(someOldThread);
821        } catch (InterruptedException e) {
822          e.printStackTrace();
823        }
824      }
825    };
826    t.setDaemon(true);
827    t.start();
828    try {
829      logSplitter.splitWAL(logfiles[largestLogFile], null);
830      fail("Didn't throw!");
831    } catch (IOException ioe) {
832      assertTrue(ioe.toString().contains("Injected"));
833    } finally {
834      // Setting this to true will turn off the background thread dumper.
835      stop.set(true);
836    }
837  }
838
839  /**
840   * @param spiedFs should be instrumented for failure.
841   */
842  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
843    generateWALs(-1);
844    useDifferentDFSClient();
845
846    try {
847      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
848      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
849      assertFalse(fs.exists(WALDIR));
850    } catch (IOException e) {
851      fail("There shouldn't be any exception but: " + e.toString());
852    }
853  }
854
855  // Test for HBASE-3412
856  @Test
857  public void testMovedWALDuringRecovery() throws Exception {
858    // This partial mock will throw LEE for every file simulating
859    // files that were moved
860    FileSystem spiedFs = Mockito.spy(fs);
861    // The "File does not exist" part is very important,
862    // that's how it comes out of HDFS
863    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).when(spiedFs)
864      .append(Mockito.<Path> any());
865    retryOverHdfsProblem(spiedFs);
866  }
867
868  @Test
869  public void testRetryOpenDuringRecovery() throws Exception {
870    FileSystem spiedFs = Mockito.spy(fs);
871    // The "Cannot obtain block length", "Could not obtain the last block",
872    // and "Blocklist for [^ ]* has changed.*" part is very important,
873    // that's how it comes out of HDFS. If HDFS changes the exception
874    // message, this test needs to be adjusted accordingly.
875    //
876    // When DFSClient tries to open a file, HDFS needs to locate
877    // the last block of the file and get its length. However, if the
878    // last block is under recovery, HDFS may have problem to obtain
879    // the block length, in which case, retry may help.
880    Mockito.doAnswer(new Answer<FSDataInputStream>() {
881      private final String[] errors = new String[] { "Cannot obtain block length",
882        "Could not obtain the last block", "Blocklist for " + OLDLOGDIR + " has changed" };
883      private int count = 0;
884
885      @Override
886      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
887        if (count < 3) {
888          throw new IOException(errors[count++]);
889        }
890        return (FSDataInputStream) invocation.callRealMethod();
891      }
892    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
893    retryOverHdfsProblem(spiedFs);
894  }
895
896  @Test
897  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
898    generateWALs(1, 10, -1);
899    FileStatus logfile = fs.listStatus(WALDIR)[0];
900    useDifferentDFSClient();
901
902    final AtomicInteger count = new AtomicInteger();
903
904    CancelableProgressable localReporter = new CancelableProgressable() {
905      @Override
906      public boolean progress() {
907        count.getAndIncrement();
908        return false;
909      }
910    };
911
912    FileSystem spiedFs = Mockito.spy(fs);
913    Mockito.doAnswer(new Answer<FSDataInputStream>() {
914      @Override
915      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
916        Thread.sleep(1500); // Sleep a while and wait report status invoked
917        return (FSDataInputStream) invocation.callRealMethod();
918      }
919    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
920
921    try {
922      conf.setInt("hbase.splitlog.report.period", 1000);
923      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
924        Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
925      assertFalse("Log splitting should failed", ret);
926      assertTrue(count.get() > 0);
927    } catch (IOException e) {
928      fail("There shouldn't be any exception but: " + e.toString());
929    } finally {
930      // reset it back to its default value
931      conf.setInt("hbase.splitlog.report.period", 59000);
932    }
933  }
934
935  /**
936   * Test log split process with fake data and lots of edits to trigger threading issues.
937   */
938  @Test
939  public void testThreading() throws Exception {
940    doTestThreading(20000, 128 * 1024 * 1024, 0);
941  }
942
943  /**
944   * Test blocking behavior of the log split process if writers are writing slower than the reader
945   * is reading.
946   */
947  @Test
948  public void testThreadingSlowWriterSmallBuffer() throws Exception {
949    // The logic of this test has conflict with the limit writers split logic, skip this test for
950    // TestWALSplitBoundedLogWriterCreation
951    assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
952    doTestThreading(200, 1024, 50);
953  }
954
955  /**
956   * Sets up a log splitter with a mock reader and writer. The mock reader generates a specified
957   * number of edits spread across 5 regions. The mock writer optionally sleeps for each edit it is
958   * fed. * After the split is complete, verifies that the statistics show the correct number of
959   * edits output into each region.
960   * @param numFakeEdits   number of fake edits to push through pipeline
961   * @param bufferSize     size of in-memory buffer
962   * @param writerSlowness writer threads will sleep this many ms per edit
963   */
964  private void doTestThreading(final int numFakeEdits, final int bufferSize,
965    final int writerSlowness) throws Exception {
966
967    Configuration localConf = new Configuration(conf);
968    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
969
970    // Create a fake log file (we'll override the reader to produce a stream of edits)
971    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
972    FSDataOutputStream out = fs.create(logPath);
973    out.close();
974
975    // Make region dirs for our destination regions so the output doesn't get skipped
976    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
977    makeRegionDirs(regions);
978
979    // Create a splitter that reads and writes the data without touching disk
980    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
981      /* Produce a mock writer that doesn't write anywhere */
982      @Override
983      protected Writer createWriter(Path logfile) throws IOException {
984        Writer mockWriter = Mockito.mock(Writer.class);
985        Mockito.doAnswer(new Answer<Void>() {
986          int expectedIndex = 0;
987
988          @Override
989          public Void answer(InvocationOnMock invocation) {
990            if (writerSlowness > 0) {
991              try {
992                Thread.sleep(writerSlowness);
993              } catch (InterruptedException ie) {
994                Thread.currentThread().interrupt();
995              }
996            }
997            Entry entry = (Entry) invocation.getArgument(0);
998            WALEdit edit = entry.getEdit();
999            List<Cell> cells = edit.getCells();
1000            assertEquals(1, cells.size());
1001            Cell cell = cells.get(0);
1002
1003            // Check that the edits come in the right order.
1004            assertEquals(expectedIndex,
1005              Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
1006            expectedIndex++;
1007            return null;
1008          }
1009        }).when(mockWriter).append(Mockito.<Entry> any());
1010        return mockWriter;
1011      }
1012
1013      /* Produce a mock reader that generates fake entries */
1014      @Override
1015      protected Reader getReader(FileStatus file, boolean skipErrors,
1016        CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
1017        Reader mockReader = Mockito.mock(Reader.class);
1018        Mockito.doAnswer(new Answer<Entry>() {
1019          int index = 0;
1020
1021          @Override
1022          public Entry answer(InvocationOnMock invocation) throws Throwable {
1023            if (index >= numFakeEdits) return null;
1024
1025            // Generate r0 through r4 in round robin fashion
1026            int regionIdx = index % regions.size();
1027            byte region[] = new byte[] { (byte) 'r', (byte) (0x30 + regionIdx) };
1028
1029            Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes(index / regions.size()),
1030              FAMILY, QUALIFIER, VALUE, index);
1031            index++;
1032            return ret;
1033          }
1034        }).when(mockReader).next();
1035        return mockReader;
1036      }
1037    };
1038
1039    logSplitter.splitWAL(fs.getFileStatus(logPath), null);
1040
1041    // Verify number of written edits per region
1042    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
1043    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
1044      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
1045      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
1046    }
1047    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
1048  }
1049
1050  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
1051  @Test
1052  public void testSplitLogFileDeletedRegionDir() throws IOException {
1053    LOG.info("testSplitLogFileDeletedRegionDir");
1054    final String REGION = "region__1";
1055    REGIONS.clear();
1056    REGIONS.add(REGION);
1057
1058    generateWALs(1, 10, -1);
1059    useDifferentDFSClient();
1060
1061    Path regiondir = new Path(TABLEDIR, REGION);
1062    LOG.info("Region directory is" + regiondir);
1063    fs.delete(regiondir, true);
1064    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1065    assertFalse(fs.exists(regiondir));
1066  }
1067
1068  @Test
1069  public void testSplitLogFileEmpty() throws IOException {
1070    LOG.info("testSplitLogFileEmpty");
1071    // we won't create the hlog dir until getWAL got called, so
1072    // make dir here when testing empty log file
1073    fs.mkdirs(WALDIR);
1074    injectEmptyFile(".empty", true);
1075    useDifferentDFSClient();
1076
1077    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1078    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1079    assertFalse(fs.exists(tdir));
1080
1081    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
1082  }
1083
1084  @Test
1085  public void testSplitLogFileMultipleRegions() throws IOException {
1086    LOG.info("testSplitLogFileMultipleRegions");
1087    generateWALs(1, 10, -1);
1088    splitAndCount(1, 10);
1089  }
1090
1091  @Test
1092  public void testSplitLogFileFirstLineCorruptionLog() throws IOException {
1093    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
1094    generateWALs(1, 10, -1);
1095    FileStatus logfile = fs.listStatus(WALDIR)[0];
1096
1097    corruptWAL(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
1098
1099    useDifferentDFSClient();
1100    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1101
1102    final Path corruptDir =
1103      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
1104    assertEquals(1, fs.listStatus(corruptDir).length);
1105  }
1106
1107  /**
1108   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
1109   */
1110  @Test
1111  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
1112    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
1113    // Generate wals for our destination region
1114    String regionName = "r0";
1115    final Path regiondir = new Path(TABLEDIR, regionName);
1116    REGIONS.clear();
1117    REGIONS.add(regionName);
1118    generateWALs(-1);
1119
1120    wals.getWAL(null);
1121    FileStatus[] logfiles = fs.listStatus(WALDIR);
1122    assertTrue("There should be some log file", logfiles != null && logfiles.length > 0);
1123
1124    WALSplitter logSplitter =
1125      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
1126        @Override
1127        protected Writer createWriter(Path logfile) throws IOException {
1128          Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
1129          // After creating writer, simulate region's
1130          // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
1131          // region and delete them, excluding files with '.temp' suffix.
1132          NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
1133          if (files != null && !files.isEmpty()) {
1134            for (Path file : files) {
1135              if (!this.walFS.delete(file, false)) {
1136                LOG.error("Failed delete of " + file);
1137              } else {
1138                LOG.debug("Deleted recovered.edits file=" + file);
1139              }
1140            }
1141          }
1142          return writer;
1143        }
1144      };
1145    try {
1146      logSplitter.splitWAL(logfiles[0], null);
1147    } catch (IOException e) {
1148      LOG.info(e.toString(), e);
1149      fail("Throws IOException when spliting "
1150        + "log, it is most likely because writing file does not "
1151        + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
1152    }
1153    if (fs.exists(CORRUPTDIR)) {
1154      if (fs.listStatus(CORRUPTDIR).length > 0) {
1155        fail("There are some corrupt logs, "
1156          + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
1157      }
1158    }
1159  }
1160
1161  @Test
1162  public void testRecoveredEditsStoragePolicy() throws IOException {
1163    conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD");
1164    try {
1165      Path path = createRecoveredEditsPathForRegion();
1166      assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName());
1167    } finally {
1168      conf.unset(HConstants.WAL_STORAGE_POLICY);
1169    }
1170
1171  }
1172
1173  private Writer generateWALs(int leaveOpen) throws IOException {
1174    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
1175  }
1176
1177  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
1178    return generateWALs(writers, entries, leaveOpen, 7);
1179  }
1180
1181  private void makeRegionDirs(List<String> regions) throws IOException {
1182    for (String region : regions) {
1183      LOG.debug("Creating dir for region " + region);
1184      fs.mkdirs(new Path(TABLEDIR, region));
1185    }
1186  }
1187
1188  /**
1189   * @param leaveOpen index to leave un-closed. -1 to close all.
1190   * @return the writer that's still open, or null if all were closed.
1191   */
1192  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
1193    throws IOException {
1194    makeRegionDirs(REGIONS);
1195    fs.mkdirs(WALDIR);
1196    Writer[] ws = new Writer[writers];
1197    int seq = 0;
1198    int numRegionEventsAdded = 0;
1199    for (int i = 0; i < writers; i++) {
1200      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
1201      for (int j = 0; j < entries; j++) {
1202        int prefix = 0;
1203        for (String region : REGIONS) {
1204          String row_key = region + prefix++ + i + j;
1205          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
1206            QUALIFIER, VALUE, seq++);
1207
1208          if (numRegionEventsAdded < regionEvents) {
1209            numRegionEventsAdded++;
1210            appendRegionEvent(ws[i], region);
1211          }
1212        }
1213      }
1214      if (i != leaveOpen) {
1215        ws[i].close();
1216        LOG.info("Closing writer " + i);
1217      }
1218    }
1219    if (leaveOpen < 0 || leaveOpen >= writers) {
1220      return null;
1221    }
1222    return ws[leaveOpen];
1223  }
1224
1225  private Path[] getLogForRegion(TableName table, String region) throws IOException {
1226    Path tdir = CommonFSUtils.getWALTableDir(conf, table);
1227    @SuppressWarnings("deprecation")
1228    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(
1229      HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region))));
1230    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
1231      @Override
1232      public boolean accept(Path p) {
1233        if (WALSplitUtil.isSequenceIdFile(p)) {
1234          return false;
1235        }
1236        return true;
1237      }
1238    });
1239    Path[] paths = new Path[files.length];
1240    for (int i = 0; i < files.length; i++) {
1241      paths[i] = files[i].getPath();
1242    }
1243    return paths;
1244  }
1245
1246  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
1247    FSDataOutputStream out;
1248    int fileSize = (int) fs.listStatus(path)[0].getLen();
1249
1250    FSDataInputStream in = fs.open(path);
1251    byte[] corrupted_bytes = new byte[fileSize];
1252    in.readFully(0, corrupted_bytes, 0, fileSize);
1253    in.close();
1254
1255    switch (corruption) {
1256      case APPEND_GARBAGE:
1257        fs.delete(path, false);
1258        out = fs.create(path);
1259        out.write(corrupted_bytes);
1260        out.write(Bytes.toBytes("-----"));
1261        closeOrFlush(close, out);
1262        break;
1263
1264      case INSERT_GARBAGE_ON_FIRST_LINE:
1265        fs.delete(path, false);
1266        out = fs.create(path);
1267        out.write(0);
1268        out.write(corrupted_bytes);
1269        closeOrFlush(close, out);
1270        break;
1271
1272      case INSERT_GARBAGE_IN_THE_MIDDLE:
1273        fs.delete(path, false);
1274        out = fs.create(path);
1275        int middle = (int) Math.floor(corrupted_bytes.length / 2);
1276        out.write(corrupted_bytes, 0, middle);
1277        out.write(0);
1278        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
1279        closeOrFlush(close, out);
1280        break;
1281
1282      case TRUNCATE:
1283        fs.delete(path, false);
1284        out = fs.create(path);
1285        out.write(corrupted_bytes, 0,
1286          fileSize - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
1287        closeOrFlush(close, out);
1288        break;
1289
1290      case TRUNCATE_TRAILER:
1291        fs.delete(path, false);
1292        out = fs.create(path);
1293        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
1294        closeOrFlush(close, out);
1295        break;
1296    }
1297  }
1298
1299  private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException {
1300    if (close) {
1301      out.close();
1302    } else {
1303      Method syncMethod = null;
1304      try {
1305        syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
1306      } catch (NoSuchMethodException e) {
1307        try {
1308          syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
1309        } catch (NoSuchMethodException ex) {
1310          throw new IOException(
1311            "This version of Hadoop supports " + "neither Syncable.sync() nor Syncable.hflush().");
1312        }
1313      }
1314      try {
1315        syncMethod.invoke(out, new Object[] {});
1316      } catch (Exception e) {
1317        throw new IOException(e);
1318      }
1319      // Not in 0out.hflush();
1320    }
1321  }
1322
1323  private int countWAL(Path log) throws IOException {
1324    int count = 0;
1325    Reader in = wals.createReader(fs, log);
1326    while (in.next() != null) {
1327      count++;
1328    }
1329    in.close();
1330    return count;
1331  }
1332
1333  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
1334    String output) throws IOException {
1335    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
1336    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
1337      .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
1338      .setRegionName(ByteString.copyFrom(hri.getRegionName()))
1339      .setFamilyName(ByteString.copyFrom(FAMILY))
1340      .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
1341      .addAllCompactionInput(Arrays.asList(inputs)).addCompactionOutput(output);
1342
1343    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
1344    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
1345      EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
1346    w.append(new Entry(key, edit));
1347    w.sync(false);
1348  }
1349
1350  private static void appendRegionEvent(Writer w, String region) throws IOException {
1351    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
1352      WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, TABLE_NAME.toBytes(),
1353      Bytes.toBytes(region), Bytes.toBytes(String.valueOf(region.hashCode())), 1,
1354      ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>> of());
1355    final long time = EnvironmentEdgeManager.currentTime();
1356    final WALKeyImpl walKey =
1357      new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, HConstants.DEFAULT_CLUSTER_ID);
1358    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
1359    w.append(new Entry(walKey, we));
1360    w.sync(false);
1361  }
1362
1363  public static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
1364    byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
1365    LOG.info(Thread.currentThread().getName() + " append");
1366    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1367    LOG.info(Thread.currentThread().getName() + " sync");
1368    writer.sync(false);
1369    return seq;
1370  }
1371
1372  private static Entry createTestEntry(TableName table, byte[] region, byte[] row, byte[] family,
1373    byte[] qualifier, byte[] value, long seq) {
1374    long time = System.nanoTime();
1375
1376    seq++;
1377    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
1378    WALEdit edit = new WALEdit();
1379    edit.add(cell);
1380    return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
1381  }
1382
1383  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
1384    Writer writer =
1385      WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
1386    if (closeFile) {
1387      writer.close();
1388    }
1389  }
1390
1391  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
1392    Reader in1, in2;
1393    in1 = wals.createReader(fs, p1);
1394    in2 = wals.createReader(fs, p2);
1395    Entry entry1;
1396    Entry entry2;
1397    while ((entry1 = in1.next()) != null) {
1398      entry2 = in2.next();
1399      if (
1400        (entry1.getKey().compareTo(entry2.getKey()) != 0)
1401          || (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
1402      ) {
1403        return false;
1404      }
1405    }
1406    in1.close();
1407    in2.close();
1408    return true;
1409  }
1410}