001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
021import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey;
022import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
023import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertFalse;
026import static org.junit.jupiter.api.Assertions.assertThrows;
027import static org.junit.jupiter.api.Assertions.assertTrue;
028import static org.junit.jupiter.api.Assertions.fail;
029import static org.junit.jupiter.api.Assumptions.assumeFalse;
030
031import java.io.FileNotFoundException;
032import java.io.IOException;
033import java.lang.reflect.Method;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collections;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.List;
041import java.util.Map;
042import java.util.NavigableSet;
043import java.util.Objects;
044import java.util.Set;
045import java.util.concurrent.atomic.AtomicBoolean;
046import java.util.concurrent.atomic.AtomicInteger;
047import java.util.concurrent.atomic.AtomicLong;
048import java.util.stream.Collectors;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.fs.FSDataInputStream;
051import org.apache.hadoop.fs.FSDataOutputStream;
052import org.apache.hadoop.fs.FileStatus;
053import org.apache.hadoop.fs.FileSystem;
054import org.apache.hadoop.fs.FileUtil;
055import org.apache.hadoop.fs.Path;
056import org.apache.hadoop.fs.PathFilter;
057import org.apache.hadoop.hbase.Cell;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseTestingUtil;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.KeyValue;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableName;
064import org.apache.hadoop.hbase.client.RegionInfo;
065import org.apache.hadoop.hbase.client.RegionInfoBuilder;
066import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
067import org.apache.hadoop.hbase.master.SplitLogManager;
068import org.apache.hadoop.hbase.regionserver.HRegion;
069import org.apache.hadoop.hbase.regionserver.LastSequenceId;
070import org.apache.hadoop.hbase.regionserver.RegionServerServices;
071import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
072import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader;
073import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
074import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
075import org.apache.hadoop.hbase.security.User;
076import org.apache.hadoop.hbase.testclassification.LargeTests;
077import org.apache.hadoop.hbase.testclassification.RegionServerTests;
078import org.apache.hadoop.hbase.util.Bytes;
079import org.apache.hadoop.hbase.util.CancelableProgressable;
080import org.apache.hadoop.hbase.util.CommonFSUtils;
081import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
082import org.apache.hadoop.hbase.util.Threads;
083import org.apache.hadoop.hbase.wal.WAL.Entry;
084import org.apache.hadoop.hbase.wal.WALProvider.Writer;
085import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
086import org.apache.hadoop.hdfs.DFSTestUtil;
087import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
088import org.apache.hadoop.ipc.RemoteException;
089import org.junit.jupiter.api.AfterAll;
090import org.junit.jupiter.api.AfterEach;
091import org.junit.jupiter.api.BeforeAll;
092import org.junit.jupiter.api.BeforeEach;
093import org.junit.jupiter.api.Tag;
094import org.junit.jupiter.api.Test;
095import org.junit.jupiter.api.TestInfo;
096import org.mockito.Mockito;
097import org.mockito.invocation.InvocationOnMock;
098import org.mockito.stubbing.Answer;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
103import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
104import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
105import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
106
107import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
110
111/**
112 * Testing {@link WAL} splitting code.
113 */
114@Tag(RegionServerTests.TAG)
115@Tag(LargeTests.TAG)
116public class TestWALSplit {
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  // Shared test configuration; mutated by individual tests (e.g. skip-errors flags).
  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Per-test directory layout, (re)initialized in setUp().
  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME = TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  // Encoded region names the generated WALs write to; reset per test in setUp().
  private static List<String> REGIONS = new ArrayList<>();
  // Test user names, initialized in setUpBeforeClass() from the current user.
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] { "supergroup" };
145
146  static enum Corruptions {
147    INSERT_GARBAGE_ON_FIRST_LINE,
148    INSERT_GARBAGE_IN_THE_MIDDLE,
149    APPEND_GARBAGE,
150    TRUNCATE,
151    TRUNCATE_TRAILER
152  }
153
154  @BeforeAll
155  public static void setUpBeforeClass() throws Exception {
156    conf = TEST_UTIL.getConfiguration();
157    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
158    conf.setClass(FSHLogProvider.WRITER_IMPL, InstrumentedLogWriter.class, Writer.class);
159    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
160    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
161    // Create fake maping user to group and set it to the conf.
162    Map<String, String[]> u2g_map = new HashMap<>(2);
163    ROBBER = User.getCurrent().getName() + "-robber";
164    ZOMBIE = User.getCurrent().getName() + "-zombie";
165    u2g_map.put(ROBBER, GROUP);
166    u2g_map.put(ZOMBIE, GROUP);
167    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
168    conf.setInt("dfs.heartbeat.interval", 1);
169    TEST_UTIL.startMiniDFSCluster(2);
170  }
171
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    // Stop the mini DFS cluster started in setUpBeforeClass().
    TEST_UTIL.shutdownMiniDFSCluster();
  }
176
  // Name of the currently executing test method; captured from TestInfo in setUp().
  private String testMethodName;
  // Fresh WALFactory per test; closed (best-effort) in tearDown().
  private WALFactory wals = null;
179
180  @BeforeEach
181  public void setUp(TestInfo testInfo) throws Exception {
182    testMethodName = testInfo.getTestMethod().get().getName();
183    LOG.info("Cleaning up cluster for new test.");
184    fs = TEST_UTIL.getDFSCluster().getFileSystem();
185    HBASEDIR = TEST_UTIL.createRootDir();
186    HBASELOGDIR = TEST_UTIL.createWALRootDir();
187    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
188    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
189    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
190    TMPDIRNAME =
191      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
192    REGIONS.clear();
193    Collections.addAll(REGIONS, "bbb", "ccc");
194    InstrumentedLogWriter.activateFailure = false;
195    wals = new WALFactory(conf, testMethodName);
196    WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(
197      ServerName.valueOf(testMethodName, 16010, EnvironmentEdgeManager.currentTime()).toString()));
198    // fs.mkdirs(WALDIR);
199  }
200
  @AfterEach
  public void tearDown() throws Exception {
    // Close the per-test WALFactory; some tests move WALs out from under it, so a
    // failure to close is expected and only logged.
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if"
        + " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      // Always wipe the per-test directories, even if the factory failed to close.
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }
216
217  /**
218   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
219   * Ensures we do not lose edits.
220   */
221  @Test
222  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
223    final AtomicLong counter = new AtomicLong(0);
224    AtomicBoolean stop = new AtomicBoolean(false);
225    // Region we'll write edits too and then later examine to make sure they all made it in.
226    final String region = REGIONS.get(0);
227    final int numWriters = 3;
228    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
229    try {
230      long startCount = counter.get();
231      zombie.start();
232      // Wait till writer starts going.
233      while (startCount == counter.get())
234        Threads.sleep(1);
235      // Give it a second to write a few appends.
236      Threads.sleep(1000);
237      final Configuration conf2 = HBaseConfiguration.create(conf);
238      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
239      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
240        @Override
241        public Integer run() throws Exception {
242          StringBuilder ls =
243            new StringBuilder("Contents of WALDIR (").append(WALDIR).append("):\n");
244          for (FileStatus status : fs.listStatus(WALDIR)) {
245            ls.append("\t").append(status.toString()).append("\n");
246          }
247          LOG.debug(Objects.toString(ls));
248          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
249          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
250          LOG.info("Finished splitting out from under zombie.");
251          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
252          assertEquals(numWriters, logfiles.length, "wrong number of split files for region");
253          int count = 0;
254          for (Path logfile : logfiles) {
255            count += countWAL(logfile);
256          }
257          return count;
258        }
259      });
260      LOG.info("zombie=" + counter.get() + ", robber=" + count);
261      assertTrue(counter.get() == count || counter.get() + 1 == count,
262        "The log file could have at most 1 extra log entry, but can't have less. "
263          + "Zombie could write " + counter.get() + " and logfile had only " + count);
264    } finally {
265      stop.set(true);
266      zombie.interrupt();
267      Threads.threadDumpingIsAlive(zombie);
268    }
269  }
270
271  /**
272   * This thread will keep writing to a 'wal' file even after the split process has started. It
273   * simulates a region server that was considered dead but woke up and wrote some more to the last
274   * log entry. Does its writing as an alternate user in another filesystem instance to simulate
275   * better it being a regionserver.
276   */
277  private class ZombieLastLogWriterRegionServer extends Thread {
278    final AtomicLong editsCount;
279    final AtomicBoolean stop;
280    final int numOfWriters;
281    /**
282     * Region to write edits for.
283     */
284    final String region;
285    final User user;
286
287    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
288      final String region, final int writers) throws IOException, InterruptedException {
289      super("ZombieLastLogWriterRegionServer");
290      setDaemon(true);
291      this.stop = stop;
292      this.editsCount = counter;
293      this.region = region;
294      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
295      numOfWriters = writers;
296    }
297
298    @Override
299    public void run() {
300      try {
301        doWriting();
302      } catch (IOException e) {
303        LOG.warn(getName() + " Writer exiting " + e);
304      } catch (InterruptedException e) {
305        LOG.warn(getName() + " Writer exiting " + e);
306      }
307    }
308
309    private void doWriting() throws IOException, InterruptedException {
310      this.user.runAs(new PrivilegedExceptionAction<Object>() {
311        @Override
312        public Object run() throws Exception {
313          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
314          // index we supply here.
315          int walToKeepOpen = numOfWriters - 1;
316          // The below method writes numOfWriters files each with ENTRIES entries for a total of
317          // numOfWriters * ENTRIES added per column family in the region.
318          Writer writer = null;
319          try {
320            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
321          } catch (IOException e1) {
322            throw new RuntimeException("Failed", e1);
323          }
324          // Update counter so has all edits written so far.
325          editsCount.addAndGet(numOfWriters * ENTRIES);
326          loop(writer);
327          // If we've been interruped, then things should have shifted out from under us.
328          // closing should error
329          try {
330            writer.close();
331            fail("Writing closing after parsing should give an error.");
332          } catch (IOException exception) {
333            LOG.debug("ignoring error when closing final writer.", exception);
334          }
335          return null;
336        }
337      });
338    }
339
340    private void loop(final Writer writer) {
341      byte[] regionBytes = Bytes.toBytes(this.region);
342      while (!stop.get()) {
343        try {
344          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
345            Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
346          long count = editsCount.incrementAndGet();
347          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
348          try {
349            Thread.sleep(1);
350          } catch (InterruptedException e) {
351            //
352          }
353        } catch (IOException ex) {
354          LOG.error(getName() + " ex " + ex.toString());
355          if (ex instanceof RemoteException) {
356            LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing "
357              + (editsCount.get() + 1));
358          } else {
359            LOG.error(getName() + " failed to write....at " + editsCount.get());
360            fail("Failed to write " + editsCount.get());
361          }
362          break;
363        } catch (Throwable t) {
364          LOG.error(getName() + " HOW? " + t);
365          LOG.debug("exception details", t);
366          break;
367        }
368      }
369      LOG.info(getName() + " Writer exiting");
370    }
371  }
372
373  // If another worker is assigned to split a WAl and last worker is still running, both should not
374  // impact each other's progress
375  @Test
376  public void testTwoWorkerSplittingSameWAL() throws IOException, InterruptedException {
377    int numWriter = 1, entries = 10;
378    generateWALs(numWriter, entries, -1, 0);
379    FileStatus logfile = fs.listStatus(WALDIR)[0];
380    FileSystem spiedFs = Mockito.spy(fs);
381    RegionServerServices zombieRSServices = Mockito.mock(RegionServerServices.class);
382    RegionServerServices newWorkerRSServices = Mockito.mock(RegionServerServices.class);
383    Mockito.when(zombieRSServices.getServerName())
384      .thenReturn(ServerName.valueOf("zombie-rs.abc.com,1234,1234567890"));
385    Mockito.when(newWorkerRSServices.getServerName())
386      .thenReturn(ServerName.valueOf("worker-rs.abc.com,1234,1234569870"));
387    Thread zombieWorker = new SplitWALWorker(logfile, spiedFs, zombieRSServices);
388    Thread newWorker = new SplitWALWorker(logfile, spiedFs, newWorkerRSServices);
389    zombieWorker.start();
390    newWorker.start();
391    newWorker.join();
392    zombieWorker.join();
393
394    for (String region : REGIONS) {
395      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
396      assertEquals(numWriter, logfiles.length, "wrong number of split files for region");
397
398      int count = 0;
399      for (Path lf : logfiles) {
400        count += countWAL(lf);
401      }
402      assertEquals(entries, count, "wrong number of edits for region " + region);
403    }
404  }
405
406  private class SplitWALWorker extends Thread implements LastSequenceId {
407    final FileStatus logfile;
408    final FileSystem fs;
409    final RegionServerServices rsServices;
410
411    public SplitWALWorker(FileStatus logfile, FileSystem fs, RegionServerServices rsServices) {
412      super(rsServices.getServerName().toShortString());
413      setDaemon(true);
414      this.fs = fs;
415      this.logfile = logfile;
416      this.rsServices = rsServices;
417    }
418
419    @Override
420    public void run() {
421      try {
422        boolean ret =
423          WALSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, null, this, null, wals, rsServices);
424        assertTrue(ret, "Both splitting should pass");
425      } catch (IOException e) {
426        LOG.warn(getName() + " Worker exiting " + e);
427      }
428    }
429
430    @Override
431    public ClusterStatusProtos.RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
432      return ClusterStatusProtos.RegionStoreSequenceIds.newBuilder()
433        .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build();
434    }
435  }
436
437  /**
438   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
439   */
440  @Test
441  public void testRecoveredEditsPathForMeta() throws IOException {
442    Path p = createRecoveredEditsPathForRegion();
443    String parentOfParent = p.getParent().getParent().getName();
444    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
445  }
446
447  /**
448   * Test old recovered edits file doesn't break WALSplitter. This is useful in upgrading old
449   * instances.
450   */
451  @Test
452  public void testOldRecoveredEditsFileSidelined() throws IOException {
453    Path p = createRecoveredEditsPathForRegion();
454    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
455    Path regiondir = new Path(tdir, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
456    fs.mkdirs(regiondir);
457    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
458    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
459    fs.createNewFile(parent); // create a recovered.edits file
460    String parentOfParent = p.getParent().getParent().getName();
461    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
462    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
463  }
464
465  private Path createRecoveredEditsPathForRegion() throws IOException {
466    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
467    Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
468      FILENAME_BEING_SPLIT, TMPDIRNAME, conf, "");
469    return p;
470  }
471
472  @Test
473  public void testHasRecoveredEdits() throws IOException {
474    Path p = createRecoveredEditsPathForRegion();
475    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
476    String renamedEdit = p.getName().split("-")[0];
477    fs.createNewFile(new Path(p.getParent(), renamedEdit));
478    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
479  }
480
  /** Re-initializes {@code fs} so subsequent operations use a distinct DFS client identity. */
  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }
486
487  @Test
488  public void testSplitPreservesEdits() throws IOException {
489    final String REGION = "region__1";
490    REGIONS.clear();
491    REGIONS.add(REGION);
492
493    generateWALs(1, 10, -1, 0);
494    useDifferentDFSClient();
495    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
496    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
497    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
498    assertEquals(1, splitLog.length);
499
500    assertTrue(logsAreEqual(originalLog, splitLog[0]), "edits differ after split");
501  }
502
503  @Test
504  public void testSplitRemovesRegionEventsEdits() throws IOException {
505    final String REGION = "region__1";
506    REGIONS.clear();
507    REGIONS.add(REGION);
508
509    generateWALs(1, 10, -1, 100);
510    useDifferentDFSClient();
511    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
512    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
513    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
514    assertEquals(1, splitLog.length);
515
516    assertFalse(logsAreEqual(originalLog, splitLog[0]), "edits differ after split");
517    // split log should only have the test edits
518    assertEquals(10, countWAL(splitLog[0]));
519  }
520
521  @Test
522  public void testSplitLeavesCompactionEventsEdits() throws IOException {
523    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
524    REGIONS.clear();
525    REGIONS.add(hri.getEncodedName());
526    Path regionDir =
527      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
528    LOG.info("Creating region directory: " + regionDir);
529    assertTrue(fs.mkdirs(regionDir));
530
531    Writer writer = generateWALs(1, 10, 0, 10);
532    String[] compactInputs = new String[] { "file1", "file2", "file3" };
533    String compactOutput = "file4";
534    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
535    writer.close();
536
537    useDifferentDFSClient();
538    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
539
540    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
541    // original log should have 10 test edits, 10 region markers, 1 compaction marker
542    assertEquals(21, countWAL(originalLog));
543
544    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
545    assertEquals(1, splitLog.length);
546
547    assertFalse(logsAreEqual(originalLog, splitLog[0]), "edits differ after split");
548    // split log should have 10 test edits plus 1 compaction marker
549    assertEquals(11, countWAL(splitLog[0]));
550  }
551
552  /**
553   * Tests that WalSplitter ignores replication marker edits.
554   */
555  @Test
556  public void testSplitRemovesReplicationMarkerEdits() throws IOException {
557    RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
558    Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
559    generateReplicationMarkerEdits(path, regionInfo);
560    useDifferentDFSClient();
561    List<FileStatus> logFiles =
562      SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null);
563    assertEquals(1, logFiles.size());
564    assertEquals(path, logFiles.get(0).getPath());
565    List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
566    // Make sure that WALSplitter doesn't fail.
567    assertEquals(0, splitPaths.size());
568  }
569
570  private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException {
571    long timestamp = EnvironmentEdgeManager.currentTime();
572    fs.mkdirs(WALDIR);
573    try (Writer writer = wals.createWALWriter(fs, path)) {
574      WALProtos.ReplicationMarkerDescriptor.Builder builder =
575        WALProtos.ReplicationMarkerDescriptor.newBuilder();
576      builder.setWalName("wal-name");
577      builder.setRegionServerName("rs-name");
578      builder.setOffset(0L);
579      WALProtos.ReplicationMarkerDescriptor desc = builder.build();
580      appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(),
581        getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1);
582    }
583  }
584
585  /**
586   * @param expectedEntries -1 to not assert
587   * @return the count across all regions
588   */
589  private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException {
590    useDifferentDFSClient();
591    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
592    int result = 0;
593    for (String region : REGIONS) {
594      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
595      assertEquals(expectedFiles, logfiles.length);
596      int count = 0;
597      for (Path logfile : logfiles) {
598        count += countWAL(logfile);
599      }
600      if (-1 != expectedEntries) {
601        assertEquals(expectedEntries, count);
602      }
603      result += count;
604    }
605    return result;
606  }
607
  @Test
  public void testEmptyLogFiles() throws IOException {
    // Empty files that have been properly closed.
    testEmptyLogFiles(true);
  }
612
  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    // Empty files that were left open (never closed).
    testEmptyLogFiles(false);
  }
617
  /**
   * Shared body for the empty-file tests: zero-entry WAL files (closed or still open) must be
   * skipped by the splitter without affecting the recovered edit count.
   */
  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }
627
  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    // The still-open file reports zero length but contains data; it must still be split.
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
634
  // NOTE(review): method name has a typo ("Traling" -> "Trailing"); left as-is since renaming
  // a test changes its reported identity.
  @Test
  public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    // Garbage appended after the last entry must not cost us any edits when skip-errors is on.
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
642
  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    // Garbage at the very start makes the whole file unreadable, so one writer's worth
    // of edits is lost while the rest still split cleanly.
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE,
      true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }
651
652  @Test
653  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
654    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
655    generateWALs(Integer.MAX_VALUE);
656    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE,
657      false);
658    // the entries in the original logs are alternating regions
659    // considering the sequence file header, the middle corruption should
660    // affect at least half of the entries
661    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
662    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
663    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
664    assertTrue(REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount,
665      "The file up to the corrupted area hasn't been parsed");
666  }
667
668  @Test
669  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
670    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
671    List<FaultyProtobufWALStreamReader.FailureType> failureTypes =
672      Arrays.asList(FaultyProtobufWALStreamReader.FailureType.values()).stream()
673        .filter(x -> x != FaultyProtobufWALStreamReader.FailureType.NONE)
674        .collect(Collectors.toList());
675    for (FaultyProtobufWALStreamReader.FailureType failureType : failureTypes) {
676      final Set<String> walDirContents = splitCorruptWALs(failureType);
677      final Set<String> archivedLogs = new HashSet<>();
678      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
679      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
680        archived.append("\n\t").append(log.toString());
681        archivedLogs.add(log.getPath().getName());
682      }
683      LOG.debug(archived.toString());
684      assertEquals(archivedLogs, walDirContents,
685        failureType.name() + ": expected to find all of our wals corrupt.");
686    }
687  }
688
689  /**
690   * @return set of wal names present prior to split attempt.
691   * @throws IOException if the split process fails
692   */
693  private Set<String> splitCorruptWALs(final FaultyProtobufWALStreamReader.FailureType failureType)
694    throws IOException {
695    String backupClass = conf.get(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
696    InstrumentedLogWriter.activateFailure = false;
697
698    try {
699      conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, FaultyProtobufWALStreamReader.class,
700        WALStreamReader.class);
701      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
702      // Clean up from previous tests or previous loop
703      try {
704        wals.shutdown();
705      } catch (IOException exception) {
706        // since we're splitting out from under the factory, we should expect some closing failures.
707        LOG.debug("Ignoring problem closing WALFactory.", exception);
708      }
709      wals.close();
710      try {
711        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
712          fs.delete(log.getPath(), true);
713        }
714      } catch (FileNotFoundException exception) {
715        LOG.debug("no previous CORRUPTDIR to clean.");
716      }
717      // change to the faulty reader
718      wals = new WALFactory(conf, testMethodName);
719      generateWALs(-1);
720      // Our reader will render all of these files corrupt.
721      final Set<String> walDirContents = new HashSet<>();
722      for (FileStatus status : fs.listStatus(WALDIR)) {
723        walDirContents.add(status.getPath().getName());
724      }
725      useDifferentDFSClient();
726      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
727      return walDirContents;
728    } finally {
729      if (backupClass != null) {
730        conf.set(WALFactory.WAL_STREAM_READER_CLASS_IMPL, backupClass);
731      } else {
732        conf.unset(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
733      }
734    }
735  }
736
737  @Test
738  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
739    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
740    assertThrows(IOException.class,
741      () -> splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING));
742  }
743
744  @Test
745  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
746    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
747    try {
748      splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
749    } catch (IOException e) {
750      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
751    }
752    assertEquals(NUM_WRITERS, fs.listStatus(WALDIR).length,
753      "if skip.errors is false all files should remain in place");
754  }
755
756  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
757    final int expectedCount) throws IOException {
758    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
759
760    final String REGION = "region__1";
761    REGIONS.clear();
762    REGIONS.add(REGION);
763
764    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
765    generateWALs(1, entryCount, -1, 0);
766    corruptWAL(c1, corruption, true);
767
768    useDifferentDFSClient();
769    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
770
771    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
772    assertEquals(1, splitLog.length);
773
774    int actualCount = 0;
775    try (WALStreamReader in = wals.createStreamReader(fs, splitLog[0])) {
776      while (in.next() != null) {
777        ++actualCount;
778      }
779    }
780    assertEquals(expectedCount, actualCount);
781
782    // should not have stored the EOF files as corrupt
783    FileStatus[] archivedLogs =
784      fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0];
785    assertEquals(0, archivedLogs.length);
786
787  }
788
789  @Test
790  public void testEOFisIgnored() throws IOException {
791    int entryCount = 10;
792    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
793  }
794
795  @Test
796  public void testCorruptWALTrailer() throws IOException {
797    int entryCount = 10;
798    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
799  }
800
801  @Test
802  public void testLogsGetArchivedAfterSplit() throws IOException {
803    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
804    generateWALs(-1);
805    useDifferentDFSClient();
806    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
807    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
808    assertEquals(NUM_WRITERS, archivedLogs.length, "wrong number of files in the archive log");
809  }
810
811  @Test
812  public void testSplit() throws IOException {
813    generateWALs(-1);
814    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
815  }
816
817  @Test
818  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException {
819    generateWALs(-1);
820    useDifferentDFSClient();
821    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
822    FileStatus[] statuses = null;
823    try {
824      statuses = fs.listStatus(WALDIR);
825      if (statuses != null) {
826        fail("Files left in log dir: " + Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
827      }
828    } catch (FileNotFoundException e) {
829      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
830    }
831  }
832
833  @Test
834  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
835    // leave 5th log open so we could append the "trap"
836    Writer writer = generateWALs(4);
837    useDifferentDFSClient();
838
839    String region = "break";
840    Path regiondir = new Path(TABLEDIR, region);
841    fs.mkdirs(regiondir);
842
843    InstrumentedLogWriter.activateFailure = false;
844    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes("r" + 999), FAMILY,
845      QUALIFIER, VALUE, 0);
846    writer.close();
847
848    try {
849      InstrumentedLogWriter.activateFailure = true;
850      IOException e = assertThrows(IOException.class,
851        () -> WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals));
852      assertTrue(e.getMessage()
853        .contains("This exception is instrumented and should only be thrown for testing"));
854    } finally {
855      InstrumentedLogWriter.activateFailure = false;
856    }
857  }
858
859  @Test
860  public void testSplitDeletedRegion() throws IOException {
861    REGIONS.clear();
862    String region = "region_that_splits";
863    REGIONS.add(region);
864
865    generateWALs(1);
866    useDifferentDFSClient();
867
868    Path regiondir = new Path(TABLEDIR, region);
869    fs.delete(regiondir, true);
870    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
871    assertFalse(fs.exists(regiondir));
872  }
873
  /**
   * Verifies that an IOException thrown on the output (writer) side of the split pipeline
   * aborts the split and propagates to the caller with the injected message.
   */
  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue(logfiles != null && logfiles.length > 0, "There should be some log file");
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue(0 < largestSize, "There should be some log greater than size 0.");
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter =
      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
        @Override
        protected Writer createWriter(Path logfile) throws IOException {
          // Every append on this mock fails, exercising the output-side error path.
          Writer mockWriter = Mockito.mock(Writer.class);
          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
            .append(Mockito.<Entry> any());
          return mockWriter;
        }
      };
    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        // Spin until the test flips the flag; gives the dumper a live thread to watch.
        while (!stop.get())
          Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitWAL(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }
939
940  /**
941   * @param spiedFs should be instrumented for failure.
942   */
943  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
944    generateWALs(-1);
945    useDifferentDFSClient();
946
947    try {
948      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
949      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
950      assertFalse(fs.exists(WALDIR));
951    } catch (IOException e) {
952      fail("There shouldn't be any exception but: " + e.toString());
953    }
954  }
955
956  // Test for HBASE-3412
957  @Test
958  public void testMovedWALDuringRecovery() throws Exception {
959    // This partial mock will throw LEE for every file simulating
960    // files that were moved
961    FileSystem spiedFs = Mockito.spy(fs);
962    // The "File does not exist" part is very important,
963    // that's how it comes out of HDFS
964    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).when(spiedFs)
965      .append(Mockito.<Path> any());
966    retryOverHdfsProblem(spiedFs);
967  }
968
969  @Test
970  public void testRetryOpenDuringRecovery() throws Exception {
971    FileSystem spiedFs = Mockito.spy(fs);
972    // The "Cannot obtain block length", "Could not obtain the last block",
973    // and "Blocklist for [^ ]* has changed.*" part is very important,
974    // that's how it comes out of HDFS. If HDFS changes the exception
975    // message, this test needs to be adjusted accordingly.
976    //
977    // When DFSClient tries to open a file, HDFS needs to locate
978    // the last block of the file and get its length. However, if the
979    // last block is under recovery, HDFS may have problem to obtain
980    // the block length, in which case, retry may help.
981    Mockito.doAnswer(new Answer<FSDataInputStream>() {
982      private final String[] errors = new String[] { "Cannot obtain block length",
983        "Could not obtain the last block", "Blocklist for " + OLDLOGDIR + " has changed" };
984      private int count = 0;
985
986      @Override
987      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
988        if (count < 3) {
989          throw new IOException(errors[count++]);
990        }
991        return (FSDataInputStream) invocation.callRealMethod();
992      }
993    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
994    retryOverHdfsProblem(spiedFs);
995  }
996
  /**
   * When the progress reporter asks for cancellation (progress() returns false), log splitting
   * must terminate and report failure rather than keep running.
   */
  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    // Counts how often the splitter polled the reporter.
    final AtomicInteger count = new AtomicInteger();

    // A reporter that always requests cancellation.
    CancelableProgressable localReporter = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while and wait report status invoked
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());

    try {
      // Short report period so the reporter is polled during the 1.5s open() delay above.
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
        Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
      assertFalse(ret, "Log splitting should failed");
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }
1035
1036  /**
1037   * Test log split process with fake data and lots of edits to trigger threading issues.
1038   */
1039  @Test
1040  public void testThreading() throws Exception {
1041    doTestThreading(20000, 128 * 1024 * 1024, 0);
1042  }
1043
1044  /**
1045   * Test blocking behavior of the log split process if writers are writing slower than the reader
1046   * is reading.
1047   */
1048  @Test
1049  public void testThreadingSlowWriterSmallBuffer() throws Exception {
1050    // The logic of this test has conflict with the limit writers split logic, skip this test for
1051    // TestWALSplitBoundedLogWriterCreation
1052    assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
1053    doTestThreading(200, 1024, 50);
1054  }
1055
1056  /**
1057   * Sets up a log splitter with a mock reader and writer. The mock reader generates a specified
1058   * number of edits spread across 5 regions. The mock writer optionally sleeps for each edit it is
1059   * fed. After the split is complete, verifies that the statistics show the correct number of edits
1060   * output into each region.
1061   * @param numFakeEdits   number of fake edits to push through pipeline
1062   * @param bufferSize     size of in-memory buffer
1063   * @param writerSlowness writer threads will sleep this many ms per edit
1064   */
1065  private void doTestThreading(final int numFakeEdits, final int bufferSize,
1066    final int writerSlowness) throws Exception {
1067
1068    Configuration localConf = new Configuration(conf);
1069    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
1070
1071    // Create a fake log file (we'll override the reader to produce a stream of edits)
1072    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
1073    FSDataOutputStream out = fs.create(logPath);
1074    out.close();
1075
1076    // Make region dirs for our destination regions so the output doesn't get skipped
1077    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
1078    makeRegionDirs(regions);
1079
1080    // Create a splitter that reads and writes the data without touching disk
1081    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
1082      /* Produce a mock writer that doesn't write anywhere */
1083      @Override
1084      protected Writer createWriter(Path logfile) throws IOException {
1085        Writer mockWriter = Mockito.mock(Writer.class);
1086        Mockito.doAnswer(new Answer<Void>() {
1087          int expectedIndex = 0;
1088
1089          @Override
1090          public Void answer(InvocationOnMock invocation) {
1091            if (writerSlowness > 0) {
1092              try {
1093                Thread.sleep(writerSlowness);
1094              } catch (InterruptedException ie) {
1095                Thread.currentThread().interrupt();
1096              }
1097            }
1098            Entry entry = (Entry) invocation.getArgument(0);
1099            WALEdit edit = entry.getEdit();
1100            List<Cell> cells = edit.getCells();
1101            assertEquals(1, cells.size());
1102            Cell cell = cells.get(0);
1103
1104            // Check that the edits come in the right order.
1105            assertEquals(expectedIndex,
1106              Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
1107            expectedIndex++;
1108            return null;
1109          }
1110        }).when(mockWriter).append(Mockito.<Entry> any());
1111        return mockWriter;
1112      }
1113
1114      /* Produce a mock reader that generates fake entries */
1115      @Override
1116      protected WALStreamReader getReader(FileStatus file, boolean skipErrors,
1117        CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
1118        WALStreamReader mockReader = Mockito.mock(WALStreamReader.class);
1119        Mockito.doAnswer(new Answer<Entry>() {
1120          int index = 0;
1121
1122          @Override
1123          public Entry answer(InvocationOnMock invocation) throws Throwable {
1124            if (index >= numFakeEdits) {
1125              return null;
1126            }
1127
1128            // Generate r0 through r4 in round robin fashion
1129            int regionIdx = index % regions.size();
1130            byte region[] = new byte[] { (byte) 'r', (byte) (0x30 + regionIdx) };
1131
1132            Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes(index / regions.size()),
1133              FAMILY, QUALIFIER, VALUE, index);
1134            index++;
1135            return ret;
1136          }
1137        }).when(mockReader).next();
1138        return mockReader;
1139      }
1140    };
1141
1142    logSplitter.splitWAL(fs.getFileStatus(logPath), null);
1143
1144    // Verify number of written edits per region
1145    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
1146    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
1147      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
1148      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
1149    }
1150    assertEquals(regions.size(), outputCounts.size(), "Should have as many outputs as regions");
1151  }
1152
1153  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
1154  @Test
1155  public void testSplitLogFileDeletedRegionDir() throws IOException {
1156    LOG.info("testSplitLogFileDeletedRegionDir");
1157    final String REGION = "region__1";
1158    REGIONS.clear();
1159    REGIONS.add(REGION);
1160
1161    generateWALs(1, 10, -1);
1162    useDifferentDFSClient();
1163
1164    Path regiondir = new Path(TABLEDIR, REGION);
1165    LOG.info("Region directory is" + regiondir);
1166    fs.delete(regiondir, true);
1167    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1168    assertFalse(fs.exists(regiondir));
1169  }
1170
1171  @Test
1172  public void testSplitLogFileEmpty() throws IOException {
1173    LOG.info("testSplitLogFileEmpty");
1174    // we won't create the hlog dir until getWAL got called, so
1175    // make dir here when testing empty log file
1176    fs.mkdirs(WALDIR);
1177    injectEmptyFile(".empty", true);
1178    useDifferentDFSClient();
1179
1180    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1181    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1182    assertFalse(fs.exists(tdir));
1183
1184    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
1185  }
1186
1187  @Test
1188  public void testSplitLogFileMultipleRegions() throws IOException {
1189    LOG.info("testSplitLogFileMultipleRegions");
1190    generateWALs(1, 10, -1);
1191    splitAndCount(1, 10);
1192  }
1193
1194  @Test
1195  public void testSplitLogFileFirstLineCorruptionLog() throws IOException {
1196    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
1197    generateWALs(1, 10, -1);
1198    FileStatus logfile = fs.listStatus(WALDIR)[0];
1199
1200    corruptWAL(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
1201
1202    useDifferentDFSClient();
1203    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1204
1205    final Path corruptDir =
1206      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
1207    assertEquals(1, fs.listStatus(corruptDir).length);
1208  }
1209
1210  /**
1211   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
1212   */
1213  @Test
1214  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
1215    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
1216    // Generate wals for our destination region
1217    String regionName = "r0";
1218    final Path regiondir = new Path(TABLEDIR, regionName);
1219    REGIONS.clear();
1220    REGIONS.add(regionName);
1221    generateWALs(-1);
1222
1223    wals.getWAL(null);
1224    FileStatus[] logfiles = fs.listStatus(WALDIR);
1225    assertTrue(logfiles != null && logfiles.length > 0, "There should be some log file");
1226
1227    WALSplitter logSplitter =
1228      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
1229        @Override
1230        protected Writer createWriter(Path logfile) throws IOException {
1231          Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
1232          // After creating writer, simulate region's
1233          // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
1234          // region and delete them, excluding files with '.temp' suffix.
1235          NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
1236          if (files != null && !files.isEmpty()) {
1237            for (Path file : files) {
1238              if (!this.walFS.delete(file, false)) {
1239                LOG.error("Failed delete of " + file);
1240              } else {
1241                LOG.debug("Deleted recovered.edits file=" + file);
1242              }
1243            }
1244          }
1245          return writer;
1246        }
1247      };
1248    try {
1249      logSplitter.splitWAL(logfiles[0], null);
1250    } catch (IOException e) {
1251      LOG.info(e.toString(), e);
1252      fail("Throws IOException when spliting "
1253        + "log, it is most likely because writing file does not "
1254        + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
1255    }
1256    if (fs.exists(CORRUPTDIR)) {
1257      if (fs.listStatus(CORRUPTDIR).length > 0) {
1258        fail("There are some corrupt logs, "
1259          + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
1260      }
1261    }
1262  }
1263
1264  @Test
1265  public void testRecoveredEditsStoragePolicy() throws IOException {
1266    conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD");
1267    try {
1268      Path path = createRecoveredEditsPathForRegion();
1269      assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName());
1270    } finally {
1271      conf.unset(HConstants.WAL_STORAGE_POLICY);
1272    }
1273  }
1274
1275  /**
1276   * See HBASE-27644, typically we should not have empty WALEdit but we should be able to process
1277   * it, instead of losing data after it.
1278   */
1279  @Test
1280  public void testEmptyWALEdit() throws IOException {
1281    final String region = "region__5";
1282    REGIONS.clear();
1283    REGIONS.add(region);
1284    makeRegionDirs(REGIONS);
1285    fs.mkdirs(WALDIR);
1286    Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5);
1287    generateEmptyEditWAL(path, Bytes.toBytes(region));
1288    useDifferentDFSClient();
1289
1290    Path regiondir = new Path(TABLEDIR, region);
1291    fs.mkdirs(regiondir);
1292    List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1293    // Make sure that WALSplitter generate the split file
1294    assertEquals(1, splitPaths.size());
1295
1296    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
1297    assertEquals(11, countWAL(originalLog));
1298    // we will skip the empty WAL when splitting
1299    assertEquals(10, countWAL(splitPaths.get(0)));
1300  }
1301
1302  private void generateEmptyEditWAL(Path path, byte[] region) throws IOException {
1303    fs.mkdirs(WALDIR);
1304    try (Writer writer = wals.createWALWriter(fs, path)) {
1305      long seq = 0;
1306      appendEmptyEntry(writer, TABLE_NAME, region, seq++);
1307      for (int i = 0; i < 10; i++) {
1308        appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++);
1309      }
1310    }
1311  }
1312
  /**
   * Generates the default set of WALs (NUM_WRITERS files with ENTRIES entries each, no region
   * events).
   * @param leaveOpen index of the writer to leave un-closed, or -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }
1316
  /**
   * Generates WALs with the given writer/entry counts and 7 interleaved region-event markers.
   * @param leaveOpen index of the writer to leave un-closed, or -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }
1320
1321  private void makeRegionDirs(List<String> regions) throws IOException {
1322    for (String region : regions) {
1323      LOG.debug("Creating dir for region " + region);
1324      fs.mkdirs(new Path(TABLEDIR, region));
1325    }
1326  }
1327
1328  /**
1329   * @param leaveOpen index to leave un-closed. -1 to close all.
1330   * @return the writer that's still open, or null if all were closed.
1331   */
1332  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
1333    throws IOException {
1334    makeRegionDirs(REGIONS);
1335    fs.mkdirs(WALDIR);
1336    Writer[] ws = new Writer[writers];
1337    int seq = 0;
1338    int numRegionEventsAdded = 0;
1339    for (int i = 0; i < writers; i++) {
1340      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
1341      for (int j = 0; j < entries; j++) {
1342        int prefix = 0;
1343        for (String region : REGIONS) {
1344          String row_key = region + prefix++ + i + j;
1345          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
1346            QUALIFIER, VALUE, seq++);
1347
1348          if (numRegionEventsAdded < regionEvents) {
1349            numRegionEventsAdded++;
1350            appendRegionEvent(ws[i], region);
1351          }
1352        }
1353      }
1354      if (i != leaveOpen) {
1355        ws[i].close();
1356        LOG.info("Closing writer " + i);
1357      }
1358    }
1359    if (leaveOpen < 0 || leaveOpen >= writers) {
1360      return null;
1361    }
1362    return ws[leaveOpen];
1363  }
1364
1365  private Path[] getLogForRegion(TableName table, String region) throws IOException {
1366    Path tdir = CommonFSUtils.getWALTableDir(conf, table);
1367    @SuppressWarnings("deprecation")
1368    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(
1369      HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region))));
1370    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
1371      @Override
1372      public boolean accept(Path p) {
1373        if (WALSplitUtil.isSequenceIdFile(p)) {
1374          return false;
1375        }
1376        return true;
1377      }
1378    });
1379    Path[] paths = new Path[files.length];
1380    for (int i = 0; i < files.length; i++) {
1381      paths[i] = files[i].getPath();
1382    }
1383    return paths;
1384  }
1385
  /**
   * Rewrites the WAL at {@code path} with the requested kind of damage. The original bytes are
   * read fully, the file is deleted, and a corrupted copy is written in its place.
   * @param path       WAL file to corrupt
   * @param corruption kind of damage to inflict
   * @param close      true to close the rewritten file; false to only hflush/sync it
   */
  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    // Snapshot the whole file so we can rewrite it with targeted damage.
    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        // Intact file followed by junk bytes.
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        // A single junk byte before the magic header makes the file unreadable from byte 0.
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        // A junk byte spliced into the middle corrupts a single entry mid-file.
        fs.delete(path, false);
        out = fs.create(path);
        int middle = (int) Math.floor(corrupted_bytes.length / 2);
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        // Cut off the trailer plus enough extra bytes to chop the last entry in half.
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
          - (32 + AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }
1438
  /**
   * Either closes the stream or flushes it to disk without closing, using reflection to find
   * whichever sync primitive ({@code hflush} or legacy {@code sync}) the running Hadoop
   * version provides.
   */
  private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException {
    if (close) {
      out.close();
    } else {
      // Prefer the modern Syncable.hflush(); fall back to the pre-0.21 sync() name.
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
        } catch (NoSuchMethodException ex) {
          throw new IOException(
            "This version of Hadoop supports " + "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[] {});
      } catch (Exception e) {
        throw new IOException(e);
      }
      // Stream is deliberately left open: callers use this to simulate an unclosed WAL.
    }
  }
1462
1463  private int countWAL(Path log) throws IOException {
1464    int count = 0;
1465    try (WALStreamReader in = wals.createStreamReader(fs, log)) {
1466      while (in.next() != null) {
1467        count++;
1468      }
1469    }
1470    return count;
1471  }
1472
1473  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
1474    String output) throws IOException {
1475    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
1476    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
1477      .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
1478      .setRegionName(ByteString.copyFrom(hri.getRegionName()))
1479      .setFamilyName(ByteString.copyFrom(FAMILY))
1480      .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
1481      .addAllCompactionInput(Arrays.asList(inputs)).addCompactionOutput(output);
1482
1483    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
1484    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
1485      EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
1486    w.append(new Entry(key, edit));
1487    w.sync(false);
1488  }
1489
1490  private static void appendRegionEvent(Writer w, String region) throws IOException {
1491    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
1492      WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, TABLE_NAME.toBytes(),
1493      Bytes.toBytes(region), Bytes.toBytes(String.valueOf(region.hashCode())), 1,
1494      ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>> of());
1495    final long time = EnvironmentEdgeManager.currentTime();
1496    final WALKeyImpl walKey =
1497      new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, HConstants.DEFAULT_CLUSTER_ID);
1498    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
1499    w.append(new Entry(walKey, we));
1500    w.sync(false);
1501  }
1502
1503  private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
1504    byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
1505    LOG.info(Thread.currentThread().getName() + " append");
1506    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1507    LOG.info(Thread.currentThread().getName() + " sync");
1508    writer.sync(false);
1509    return seq;
1510  }
1511
1512  private static Entry createTestEntry(TableName table, byte[] region, byte[] row, byte[] family,
1513    byte[] qualifier, byte[] value, long seq) {
1514    long time = System.nanoTime();
1515
1516    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
1517    WALEdit edit = new WALEdit();
1518    edit.add(cell);
1519    return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
1520  }
1521
1522  private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq)
1523    throws IOException {
1524    LOG.info(Thread.currentThread().getName() + " append");
1525    writer.append(createEmptyEntry(table, region, seq));
1526    LOG.info(Thread.currentThread().getName() + " sync");
1527    writer.sync(false);
1528    return seq;
1529  }
1530
1531  private static Entry createEmptyEntry(TableName table, byte[] region, long seq) {
1532    long time = System.nanoTime();
1533    return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID),
1534      new WALEdit());
1535  }
1536
  /**
   * Creates an empty WAL file named {@code WAL_FILE_PREFIX + suffix} under WALDIR.
   * @param closeFile when false the writer is intentionally left open (and leaked) to
   *                  simulate a WAL that is still being written.
   */
  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
      WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }
1544
1545  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
1546    try (WALStreamReader in1 = wals.createStreamReader(fs, p1);
1547      WALStreamReader in2 = wals.createStreamReader(fs, p2)) {
1548      Entry entry1;
1549      Entry entry2;
1550      while ((entry1 = in1.next()) != null) {
1551        entry2 = in2.next();
1552        if (
1553          (entry1.getKey().compareTo(entry2.getKey()) != 0)
1554            || (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
1555        ) {
1556          return false;
1557        }
1558      }
1559    }
1560    return true;
1561  }
1562}