001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
021import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey;
022import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
023import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028import static org.junit.Assume.assumeFalse;
029
030import java.io.FileNotFoundException;
031import java.io.IOException;
032import java.lang.reflect.Method;
033import java.security.PrivilegedExceptionAction;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collections;
037import java.util.HashMap;
038import java.util.HashSet;
039import java.util.List;
040import java.util.Map;
041import java.util.NavigableSet;
042import java.util.Objects;
043import java.util.Set;
044import java.util.concurrent.atomic.AtomicBoolean;
045import java.util.concurrent.atomic.AtomicInteger;
046import java.util.concurrent.atomic.AtomicLong;
047import java.util.stream.Collectors;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.FSDataInputStream;
050import org.apache.hadoop.fs.FSDataOutputStream;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.FileUtil;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.PathFilter;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.HBaseClassTestRule;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseTestingUtil;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.KeyValue;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableName;
064import org.apache.hadoop.hbase.client.RegionInfo;
065import org.apache.hadoop.hbase.client.RegionInfoBuilder;
066import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
067import org.apache.hadoop.hbase.master.SplitLogManager;
068import org.apache.hadoop.hbase.regionserver.HRegion;
069import org.apache.hadoop.hbase.regionserver.LastSequenceId;
070import org.apache.hadoop.hbase.regionserver.RegionServerServices;
071import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
072import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader;
073import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
074import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
075import org.apache.hadoop.hbase.security.User;
076import org.apache.hadoop.hbase.testclassification.LargeTests;
077import org.apache.hadoop.hbase.testclassification.RegionServerTests;
078import org.apache.hadoop.hbase.util.Bytes;
079import org.apache.hadoop.hbase.util.CancelableProgressable;
080import org.apache.hadoop.hbase.util.CommonFSUtils;
081import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
082import org.apache.hadoop.hbase.util.Threads;
083import org.apache.hadoop.hbase.wal.WAL.Entry;
084import org.apache.hadoop.hbase.wal.WALProvider.Writer;
085import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
086import org.apache.hadoop.hdfs.DFSTestUtil;
087import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
088import org.apache.hadoop.ipc.RemoteException;
089import org.junit.After;
090import org.junit.AfterClass;
091import org.junit.Before;
092import org.junit.BeforeClass;
093import org.junit.ClassRule;
094import org.junit.Rule;
095import org.junit.Test;
096import org.junit.experimental.categories.Category;
097import org.junit.rules.TestName;
098import org.mockito.Mockito;
099import org.mockito.invocation.InvocationOnMock;
100import org.mockito.stubbing.Answer;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103
104import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
105import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
106import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
107import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
108
109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
112
113/**
114 * Testing {@link WAL} splitting code.
115 */
116@Category({ RegionServerTests.class, LargeTests.class })
117public class TestWALSplit {
118  @ClassRule
119  public static final HBaseClassTestRule CLASS_RULE =
120    HBaseClassTestRule.forClass(TestWALSplit.class);
121  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);
122
123  private static Configuration conf;
124  private FileSystem fs;
125
126  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
127
128  private Path HBASEDIR;
129  private Path HBASELOGDIR;
130  private Path WALDIR;
131  private Path OLDLOGDIR;
132  private Path CORRUPTDIR;
133  private Path TABLEDIR;
134  private String TMPDIRNAME;
135
136  private static final int NUM_WRITERS = 10;
137  private static final int ENTRIES = 10; // entries per writer per region
138
139  private static final String FILENAME_BEING_SPLIT = "testfile";
140  private static final TableName TABLE_NAME = TableName.valueOf("t1");
141  private static final byte[] FAMILY = Bytes.toBytes("f1");
142  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
143  private static final byte[] VALUE = Bytes.toBytes("v1");
144  private static final String WAL_FILE_PREFIX = "wal.dat.";
145  private static List<String> REGIONS = new ArrayList<>();
146  private static String ROBBER;
147  private static String ZOMBIE;
148  private static String[] GROUP = new String[] { "supergroup" };
149
150  static enum Corruptions {
151    INSERT_GARBAGE_ON_FIRST_LINE,
152    INSERT_GARBAGE_IN_THE_MIDDLE,
153    APPEND_GARBAGE,
154    TRUNCATE,
155    TRUNCATE_TRAILER
156  }
157
158  @BeforeClass
159  public static void setUpBeforeClass() throws Exception {
160    conf = TEST_UTIL.getConfiguration();
161    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
162    conf.setClass(FSHLogProvider.WRITER_IMPL, InstrumentedLogWriter.class, Writer.class);
163    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
164    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
165    // Create fake maping user to group and set it to the conf.
166    Map<String, String[]> u2g_map = new HashMap<>(2);
167    ROBBER = User.getCurrent().getName() + "-robber";
168    ZOMBIE = User.getCurrent().getName() + "-zombie";
169    u2g_map.put(ROBBER, GROUP);
170    u2g_map.put(ZOMBIE, GROUP);
171    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
172    conf.setInt("dfs.heartbeat.interval", 1);
173    TEST_UTIL.startMiniDFSCluster(2);
174  }
175
176  @AfterClass
177  public static void tearDownAfterClass() throws Exception {
178    TEST_UTIL.shutdownMiniDFSCluster();
179  }
180
181  @Rule
182  public TestName name = new TestName();
183  private WALFactory wals = null;
184
185  @Before
186  public void setUp() throws Exception {
187    LOG.info("Cleaning up cluster for new test.");
188    fs = TEST_UTIL.getDFSCluster().getFileSystem();
189    HBASEDIR = TEST_UTIL.createRootDir();
190    HBASELOGDIR = TEST_UTIL.createWALRootDir();
191    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
192    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
193    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
194    TMPDIRNAME =
195      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
196    REGIONS.clear();
197    Collections.addAll(REGIONS, "bbb", "ccc");
198    InstrumentedLogWriter.activateFailure = false;
199    wals = new WALFactory(conf, name.getMethodName());
200    WALDIR = new Path(HBASELOGDIR, AbstractFSWALProvider.getWALDirectoryName(ServerName
201      .valueOf(name.getMethodName(), 16010, EnvironmentEdgeManager.currentTime()).toString()));
202    // fs.mkdirs(WALDIR);
203  }
204
205  @After
206  public void tearDown() throws Exception {
207    try {
208      wals.close();
209    } catch (IOException exception) {
210      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
211      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if"
212        + " you see a failure look here.");
213      LOG.debug("exception details", exception);
214    } finally {
215      wals = null;
216      fs.delete(HBASEDIR, true);
217      fs.delete(HBASELOGDIR, true);
218    }
219  }
220
221  /**
222   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
223   * Ensures we do not lose edits.
224   */
225  @Test
226  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
227    final AtomicLong counter = new AtomicLong(0);
228    AtomicBoolean stop = new AtomicBoolean(false);
229    // Region we'll write edits too and then later examine to make sure they all made it in.
230    final String region = REGIONS.get(0);
231    final int numWriters = 3;
232    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
233    try {
234      long startCount = counter.get();
235      zombie.start();
236      // Wait till writer starts going.
237      while (startCount == counter.get())
238        Threads.sleep(1);
239      // Give it a second to write a few appends.
240      Threads.sleep(1000);
241      final Configuration conf2 = HBaseConfiguration.create(conf);
242      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
243      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
244        @Override
245        public Integer run() throws Exception {
246          StringBuilder ls =
247            new StringBuilder("Contents of WALDIR (").append(WALDIR).append("):\n");
248          for (FileStatus status : fs.listStatus(WALDIR)) {
249            ls.append("\t").append(status.toString()).append("\n");
250          }
251          LOG.debug(Objects.toString(ls));
252          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
253          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
254          LOG.info("Finished splitting out from under zombie.");
255          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
256          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
257          int count = 0;
258          for (Path logfile : logfiles) {
259            count += countWAL(logfile);
260          }
261          return count;
262        }
263      });
264      LOG.info("zombie=" + counter.get() + ", robber=" + count);
265      assertTrue(
266        "The log file could have at most 1 extra log entry, but can't have less. "
267          + "Zombie could write " + counter.get() + " and logfile had only " + count,
268        counter.get() == count || counter.get() + 1 == count);
269    } finally {
270      stop.set(true);
271      zombie.interrupt();
272      Threads.threadDumpingIsAlive(zombie);
273    }
274  }
275
276  /**
277   * This thread will keep writing to a 'wal' file even after the split process has started. It
278   * simulates a region server that was considered dead but woke up and wrote some more to the last
279   * log entry. Does its writing as an alternate user in another filesystem instance to simulate
280   * better it being a regionserver.
281   */
282  private class ZombieLastLogWriterRegionServer extends Thread {
283    final AtomicLong editsCount;
284    final AtomicBoolean stop;
285    final int numOfWriters;
286    /**
287     * Region to write edits for.
288     */
289    final String region;
290    final User user;
291
292    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
293      final String region, final int writers) throws IOException, InterruptedException {
294      super("ZombieLastLogWriterRegionServer");
295      setDaemon(true);
296      this.stop = stop;
297      this.editsCount = counter;
298      this.region = region;
299      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
300      numOfWriters = writers;
301    }
302
303    @Override
304    public void run() {
305      try {
306        doWriting();
307      } catch (IOException e) {
308        LOG.warn(getName() + " Writer exiting " + e);
309      } catch (InterruptedException e) {
310        LOG.warn(getName() + " Writer exiting " + e);
311      }
312    }
313
314    private void doWriting() throws IOException, InterruptedException {
315      this.user.runAs(new PrivilegedExceptionAction<Object>() {
316        @Override
317        public Object run() throws Exception {
318          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
319          // index we supply here.
320          int walToKeepOpen = numOfWriters - 1;
321          // The below method writes numOfWriters files each with ENTRIES entries for a total of
322          // numOfWriters * ENTRIES added per column family in the region.
323          Writer writer = null;
324          try {
325            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
326          } catch (IOException e1) {
327            throw new RuntimeException("Failed", e1);
328          }
329          // Update counter so has all edits written so far.
330          editsCount.addAndGet(numOfWriters * ENTRIES);
331          loop(writer);
332          // If we've been interruped, then things should have shifted out from under us.
333          // closing should error
334          try {
335            writer.close();
336            fail("Writing closing after parsing should give an error.");
337          } catch (IOException exception) {
338            LOG.debug("ignoring error when closing final writer.", exception);
339          }
340          return null;
341        }
342      });
343    }
344
345    private void loop(final Writer writer) {
346      byte[] regionBytes = Bytes.toBytes(this.region);
347      while (!stop.get()) {
348        try {
349          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
350            Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
351          long count = editsCount.incrementAndGet();
352          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
353          try {
354            Thread.sleep(1);
355          } catch (InterruptedException e) {
356            //
357          }
358        } catch (IOException ex) {
359          LOG.error(getName() + " ex " + ex.toString());
360          if (ex instanceof RemoteException) {
361            LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing "
362              + (editsCount.get() + 1));
363          } else {
364            LOG.error(getName() + " failed to write....at " + editsCount.get());
365            fail("Failed to write " + editsCount.get());
366          }
367          break;
368        } catch (Throwable t) {
369          LOG.error(getName() + " HOW? " + t);
370          LOG.debug("exception details", t);
371          break;
372        }
373      }
374      LOG.info(getName() + " Writer exiting");
375    }
376  }
377
378  // If another worker is assigned to split a WAl and last worker is still running, both should not
379  // impact each other's progress
380  @Test
381  public void testTwoWorkerSplittingSameWAL() throws IOException, InterruptedException {
382    int numWriter = 1, entries = 10;
383    generateWALs(numWriter, entries, -1, 0);
384    FileStatus logfile = fs.listStatus(WALDIR)[0];
385    FileSystem spiedFs = Mockito.spy(fs);
386    RegionServerServices zombieRSServices = Mockito.mock(RegionServerServices.class);
387    RegionServerServices newWorkerRSServices = Mockito.mock(RegionServerServices.class);
388    Mockito.when(zombieRSServices.getServerName())
389      .thenReturn(ServerName.valueOf("zombie-rs.abc.com,1234,1234567890"));
390    Mockito.when(newWorkerRSServices.getServerName())
391      .thenReturn(ServerName.valueOf("worker-rs.abc.com,1234,1234569870"));
392    Thread zombieWorker = new SplitWALWorker(logfile, spiedFs, zombieRSServices);
393    Thread newWorker = new SplitWALWorker(logfile, spiedFs, newWorkerRSServices);
394    zombieWorker.start();
395    newWorker.start();
396    newWorker.join();
397    zombieWorker.join();
398
399    for (String region : REGIONS) {
400      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
401      assertEquals("wrong number of split files for region", numWriter, logfiles.length);
402
403      int count = 0;
404      for (Path lf : logfiles) {
405        count += countWAL(lf);
406      }
407      assertEquals("wrong number of edits for region " + region, entries, count);
408    }
409  }
410
411  private class SplitWALWorker extends Thread implements LastSequenceId {
412    final FileStatus logfile;
413    final FileSystem fs;
414    final RegionServerServices rsServices;
415
416    public SplitWALWorker(FileStatus logfile, FileSystem fs, RegionServerServices rsServices) {
417      super(rsServices.getServerName().toShortString());
418      setDaemon(true);
419      this.fs = fs;
420      this.logfile = logfile;
421      this.rsServices = rsServices;
422    }
423
424    @Override
425    public void run() {
426      try {
427        boolean ret =
428          WALSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, null, this, null, wals, rsServices);
429        assertTrue("Both splitting should pass", ret);
430      } catch (IOException e) {
431        LOG.warn(getName() + " Worker exiting " + e);
432      }
433    }
434
435    @Override
436    public ClusterStatusProtos.RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
437      return ClusterStatusProtos.RegionStoreSequenceIds.newBuilder()
438        .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build();
439    }
440  }
441
442  /**
443   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
444   */
445  @Test
446  public void testRecoveredEditsPathForMeta() throws IOException {
447    Path p = createRecoveredEditsPathForRegion();
448    String parentOfParent = p.getParent().getParent().getName();
449    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
450  }
451
452  /**
453   * Test old recovered edits file doesn't break WALSplitter. This is useful in upgrading old
454   * instances.
455   */
456  @Test
457  public void testOldRecoveredEditsFileSidelined() throws IOException {
458    Path p = createRecoveredEditsPathForRegion();
459    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
460    Path regiondir = new Path(tdir, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
461    fs.mkdirs(regiondir);
462    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
463    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
464    fs.createNewFile(parent); // create a recovered.edits file
465    String parentOfParent = p.getParent().getParent().getName();
466    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
467    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
468  }
469
470  private Path createRecoveredEditsPathForRegion() throws IOException {
471    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
472    Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
473      FILENAME_BEING_SPLIT, TMPDIRNAME, conf, "");
474    return p;
475  }
476
477  @Test
478  public void testHasRecoveredEdits() throws IOException {
479    Path p = createRecoveredEditsPathForRegion();
480    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
481    String renamedEdit = p.getName().split("-")[0];
482    fs.createNewFile(new Path(p.getParent(), renamedEdit));
483    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
484  }
485
486  private void useDifferentDFSClient() throws IOException {
487    // make fs act as a different client now
488    // initialize will create a new DFSClient with a new client ID
489    fs.initialize(fs.getUri(), conf);
490  }
491
492  @Test
493  public void testSplitPreservesEdits() throws IOException {
494    final String REGION = "region__1";
495    REGIONS.clear();
496    REGIONS.add(REGION);
497
498    generateWALs(1, 10, -1, 0);
499    useDifferentDFSClient();
500    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
501    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
502    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
503    assertEquals(1, splitLog.length);
504
505    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
506  }
507
508  @Test
509  public void testSplitRemovesRegionEventsEdits() throws IOException {
510    final String REGION = "region__1";
511    REGIONS.clear();
512    REGIONS.add(REGION);
513
514    generateWALs(1, 10, -1, 100);
515    useDifferentDFSClient();
516    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
517    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
518    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
519    assertEquals(1, splitLog.length);
520
521    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
522    // split log should only have the test edits
523    assertEquals(10, countWAL(splitLog[0]));
524  }
525
526  @Test
527  public void testSplitLeavesCompactionEventsEdits() throws IOException {
528    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
529    REGIONS.clear();
530    REGIONS.add(hri.getEncodedName());
531    Path regionDir =
532      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
533    LOG.info("Creating region directory: " + regionDir);
534    assertTrue(fs.mkdirs(regionDir));
535
536    Writer writer = generateWALs(1, 10, 0, 10);
537    String[] compactInputs = new String[] { "file1", "file2", "file3" };
538    String compactOutput = "file4";
539    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
540    writer.close();
541
542    useDifferentDFSClient();
543    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
544
545    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
546    // original log should have 10 test edits, 10 region markers, 1 compaction marker
547    assertEquals(21, countWAL(originalLog));
548
549    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
550    assertEquals(1, splitLog.length);
551
552    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
553    // split log should have 10 test edits plus 1 compaction marker
554    assertEquals(11, countWAL(splitLog[0]));
555  }
556
557  /**
558   * Tests that WalSplitter ignores replication marker edits.
559   */
560  @Test
561  public void testSplitRemovesReplicationMarkerEdits() throws IOException {
562    RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
563    Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
564    generateReplicationMarkerEdits(path, regionInfo);
565    useDifferentDFSClient();
566    List<FileStatus> logFiles =
567      SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null);
568    assertEquals(1, logFiles.size());
569    assertEquals(path, logFiles.get(0).getPath());
570    List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
571    // Make sure that WALSplitter doesn't fail.
572    assertEquals(0, splitPaths.size());
573  }
574
575  private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException {
576    long timestamp = EnvironmentEdgeManager.currentTime();
577    fs.mkdirs(WALDIR);
578    try (Writer writer = wals.createWALWriter(fs, path)) {
579      WALProtos.ReplicationMarkerDescriptor.Builder builder =
580        WALProtos.ReplicationMarkerDescriptor.newBuilder();
581      builder.setWalName("wal-name");
582      builder.setRegionServerName("rs-name");
583      builder.setOffset(0L);
584      WALProtos.ReplicationMarkerDescriptor desc = builder.build();
585      appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(),
586        getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1);
587    }
588  }
589
590  /**
591   * @param expectedEntries -1 to not assert
592   * @return the count across all regions
593   */
594  private int splitAndCount(final int expectedFiles, final int expectedEntries) throws IOException {
595    useDifferentDFSClient();
596    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
597    int result = 0;
598    for (String region : REGIONS) {
599      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
600      assertEquals(expectedFiles, logfiles.length);
601      int count = 0;
602      for (Path logfile : logfiles) {
603        count += countWAL(logfile);
604      }
605      if (-1 != expectedEntries) {
606        assertEquals(expectedEntries, count);
607      }
608      result += count;
609    }
610    return result;
611  }
612
613  @Test
614  public void testEmptyLogFiles() throws IOException {
615    testEmptyLogFiles(true);
616  }
617
618  @Test
619  public void testEmptyOpenLogFiles() throws IOException {
620    testEmptyLogFiles(false);
621  }
622
623  private void testEmptyLogFiles(final boolean close) throws IOException {
624    // we won't create the hlog dir until getWAL got called, so
625    // make dir here when testing empty log file
626    fs.mkdirs(WALDIR);
627    injectEmptyFile(".empty", close);
628    generateWALs(Integer.MAX_VALUE);
629    injectEmptyFile("empty", close);
630    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
631  }
632
633  @Test
634  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
635    // generate logs but leave wal.dat.5 open.
636    generateWALs(5);
637    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
638  }
639
640  @Test
641  public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
642    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
643    generateWALs(Integer.MAX_VALUE);
644    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.APPEND_GARBAGE, true);
645    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
646  }
647
648  @Test
649  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
650    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
651    generateWALs(Integer.MAX_VALUE);
652    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE,
653      true);
654    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
655  }
656
657  @Test
658  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
659    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
660    generateWALs(Integer.MAX_VALUE);
661    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"), Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE,
662      false);
663    // the entries in the original logs are alternating regions
664    // considering the sequence file header, the middle corruption should
665    // affect at least half of the entries
666    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
667    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
668    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
669    assertTrue("The file up to the corrupted area hasn't been parsed",
670      REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
671  }
672
673  @Test
674  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
675    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
676    List<FaultyProtobufWALStreamReader.FailureType> failureTypes =
677      Arrays.asList(FaultyProtobufWALStreamReader.FailureType.values()).stream()
678        .filter(x -> x != FaultyProtobufWALStreamReader.FailureType.NONE)
679        .collect(Collectors.toList());
680    for (FaultyProtobufWALStreamReader.FailureType failureType : failureTypes) {
681      final Set<String> walDirContents = splitCorruptWALs(failureType);
682      final Set<String> archivedLogs = new HashSet<>();
683      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
684      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
685        archived.append("\n\t").append(log.toString());
686        archivedLogs.add(log.getPath().getName());
687      }
688      LOG.debug(archived.toString());
689      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
690        walDirContents);
691    }
692  }
693
694  /**
695   * @return set of wal names present prior to split attempt.
696   * @throws IOException if the split process fails
697   */
698  private Set<String> splitCorruptWALs(final FaultyProtobufWALStreamReader.FailureType failureType)
699    throws IOException {
700    String backupClass = conf.get(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
701    InstrumentedLogWriter.activateFailure = false;
702
703    try {
704      conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, FaultyProtobufWALStreamReader.class,
705        WALStreamReader.class);
706      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
707      // Clean up from previous tests or previous loop
708      try {
709        wals.shutdown();
710      } catch (IOException exception) {
711        // since we're splitting out from under the factory, we should expect some closing failures.
712        LOG.debug("Ignoring problem closing WALFactory.", exception);
713      }
714      wals.close();
715      try {
716        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
717          fs.delete(log.getPath(), true);
718        }
719      } catch (FileNotFoundException exception) {
720        LOG.debug("no previous CORRUPTDIR to clean.");
721      }
722      // change to the faulty reader
723      wals = new WALFactory(conf, name.getMethodName());
724      generateWALs(-1);
725      // Our reader will render all of these files corrupt.
726      final Set<String> walDirContents = new HashSet<>();
727      for (FileStatus status : fs.listStatus(WALDIR)) {
728        walDirContents.add(status.getPath().getName());
729      }
730      useDifferentDFSClient();
731      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
732      return walDirContents;
733    } finally {
734      if (backupClass != null) {
735        conf.set(WALFactory.WAL_STREAM_READER_CLASS_IMPL, backupClass);
736      } else {
737        conf.unset(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
738      }
739    }
740  }
741
742  @Test(expected = IOException.class)
743  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
744    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
745    splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
746  }
747
748  @Test
749  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
750    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
751    try {
752      splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
753    } catch (IOException e) {
754      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
755    }
756    assertEquals("if skip.errors is false all files should remain in place", NUM_WRITERS,
757      fs.listStatus(WALDIR).length);
758  }
759
760  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
761    final int expectedCount) throws IOException {
762    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
763
764    final String REGION = "region__1";
765    REGIONS.clear();
766    REGIONS.add(REGION);
767
768    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
769    generateWALs(1, entryCount, -1, 0);
770    corruptWAL(c1, corruption, true);
771
772    useDifferentDFSClient();
773    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
774
775    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
776    assertEquals(1, splitLog.length);
777
778    int actualCount = 0;
779    try (WALStreamReader in = wals.createStreamReader(fs, splitLog[0])) {
780      while (in.next() != null) {
781        ++actualCount;
782      }
783    }
784    assertEquals(expectedCount, actualCount);
785
786    // should not have stored the EOF files as corrupt
787    FileStatus[] archivedLogs =
788      fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0];
789    assertEquals(0, archivedLogs.length);
790
791  }
792
793  @Test
794  public void testEOFisIgnored() throws IOException {
795    int entryCount = 10;
796    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
797  }
798
799  @Test
800  public void testCorruptWALTrailer() throws IOException {
801    int entryCount = 10;
802    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
803  }
804
805  @Test
806  public void testLogsGetArchivedAfterSplit() throws IOException {
807    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
808    generateWALs(-1);
809    useDifferentDFSClient();
810    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
811    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
812    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
813  }
814
815  @Test
816  public void testSplit() throws IOException {
817    generateWALs(-1);
818    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
819  }
820
821  @Test
822  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException {
823    generateWALs(-1);
824    useDifferentDFSClient();
825    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
826    FileStatus[] statuses = null;
827    try {
828      statuses = fs.listStatus(WALDIR);
829      if (statuses != null) {
830        fail("Files left in log dir: " + Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
831      }
832    } catch (FileNotFoundException e) {
833      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
834    }
835  }
836
837  @Test(expected = IOException.class)
838  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
839    // leave 5th log open so we could append the "trap"
840    Writer writer = generateWALs(4);
841    useDifferentDFSClient();
842
843    String region = "break";
844    Path regiondir = new Path(TABLEDIR, region);
845    fs.mkdirs(regiondir);
846
847    InstrumentedLogWriter.activateFailure = false;
848    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes("r" + 999), FAMILY,
849      QUALIFIER, VALUE, 0);
850    writer.close();
851
852    try {
853      InstrumentedLogWriter.activateFailure = true;
854      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
855    } catch (IOException e) {
856      assertTrue(e.getMessage()
857        .contains("This exception is instrumented and should only be thrown for testing"));
858      throw e;
859    } finally {
860      InstrumentedLogWriter.activateFailure = false;
861    }
862  }
863
864  @Test
865  public void testSplitDeletedRegion() throws IOException {
866    REGIONS.clear();
867    String region = "region_that_splits";
868    REGIONS.add(region);
869
870    generateWALs(1);
871    useDifferentDFSClient();
872
873    Path regiondir = new Path(TABLEDIR, region);
874    fs.delete(regiondir, true);
875    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
876    assertFalse(fs.exists(regiondir));
877  }
878
879  @Test
880  public void testIOEOnOutputThread() throws Exception {
881    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
882
883    generateWALs(-1);
884    useDifferentDFSClient();
885    FileStatus[] logfiles = fs.listStatus(WALDIR);
886    assertTrue("There should be some log file", logfiles != null && logfiles.length > 0);
887    // wals with no entries (like the one we don't use in the factory)
888    // won't cause a failure since nothing will ever be written.
889    // pick the largest one since it's most likely to have entries.
890    int largestLogFile = 0;
891    long largestSize = 0;
892    for (int i = 0; i < logfiles.length; i++) {
893      if (logfiles[i].getLen() > largestSize) {
894        largestLogFile = i;
895        largestSize = logfiles[i].getLen();
896      }
897    }
898    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
899    // Set up a splitter that will throw an IOE on the output side
900    WALSplitter logSplitter =
901      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
902        @Override
903        protected Writer createWriter(Path logfile) throws IOException {
904          Writer mockWriter = Mockito.mock(Writer.class);
905          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
906            .append(Mockito.<Entry> any());
907          return mockWriter;
908        }
909      };
910    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
911    // the thread dumping in a background thread so it does not hold up the test.
912    final AtomicBoolean stop = new AtomicBoolean(false);
913    final Thread someOldThread = new Thread("Some-old-thread") {
914      @Override
915      public void run() {
916        while (!stop.get())
917          Threads.sleep(10);
918      }
919    };
920    someOldThread.setDaemon(true);
921    someOldThread.start();
922    final Thread t = new Thread("Background-thread-dumper") {
923      @Override
924      public void run() {
925        try {
926          Threads.threadDumpingIsAlive(someOldThread);
927        } catch (InterruptedException e) {
928          e.printStackTrace();
929        }
930      }
931    };
932    t.setDaemon(true);
933    t.start();
934    try {
935      logSplitter.splitWAL(logfiles[largestLogFile], null);
936      fail("Didn't throw!");
937    } catch (IOException ioe) {
938      assertTrue(ioe.toString().contains("Injected"));
939    } finally {
940      // Setting this to true will turn off the background thread dumper.
941      stop.set(true);
942    }
943  }
944
945  /**
946   * @param spiedFs should be instrumented for failure.
947   */
948  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
949    generateWALs(-1);
950    useDifferentDFSClient();
951
952    try {
953      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
954      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
955      assertFalse(fs.exists(WALDIR));
956    } catch (IOException e) {
957      fail("There shouldn't be any exception but: " + e.toString());
958    }
959  }
960
961  // Test for HBASE-3412
962  @Test
963  public void testMovedWALDuringRecovery() throws Exception {
964    // This partial mock will throw LEE for every file simulating
965    // files that were moved
966    FileSystem spiedFs = Mockito.spy(fs);
967    // The "File does not exist" part is very important,
968    // that's how it comes out of HDFS
969    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).when(spiedFs)
970      .append(Mockito.<Path> any());
971    retryOverHdfsProblem(spiedFs);
972  }
973
974  @Test
975  public void testRetryOpenDuringRecovery() throws Exception {
976    FileSystem spiedFs = Mockito.spy(fs);
977    // The "Cannot obtain block length", "Could not obtain the last block",
978    // and "Blocklist for [^ ]* has changed.*" part is very important,
979    // that's how it comes out of HDFS. If HDFS changes the exception
980    // message, this test needs to be adjusted accordingly.
981    //
982    // When DFSClient tries to open a file, HDFS needs to locate
983    // the last block of the file and get its length. However, if the
984    // last block is under recovery, HDFS may have problem to obtain
985    // the block length, in which case, retry may help.
986    Mockito.doAnswer(new Answer<FSDataInputStream>() {
987      private final String[] errors = new String[] { "Cannot obtain block length",
988        "Could not obtain the last block", "Blocklist for " + OLDLOGDIR + " has changed" };
989      private int count = 0;
990
991      @Override
992      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
993        if (count < 3) {
994          throw new IOException(errors[count++]);
995        }
996        return (FSDataInputStream) invocation.callRealMethod();
997      }
998    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
999    retryOverHdfsProblem(spiedFs);
1000  }
1001
1002  @Test
1003  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
1004    generateWALs(1, 10, -1);
1005    FileStatus logfile = fs.listStatus(WALDIR)[0];
1006    useDifferentDFSClient();
1007
1008    final AtomicInteger count = new AtomicInteger();
1009
1010    CancelableProgressable localReporter = new CancelableProgressable() {
1011      @Override
1012      public boolean progress() {
1013        count.getAndIncrement();
1014        return false;
1015      }
1016    };
1017
1018    FileSystem spiedFs = Mockito.spy(fs);
1019    Mockito.doAnswer(new Answer<FSDataInputStream>() {
1020      @Override
1021      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
1022        Thread.sleep(1500); // Sleep a while and wait report status invoked
1023        return (FSDataInputStream) invocation.callRealMethod();
1024      }
1025    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
1026
1027    try {
1028      conf.setInt("hbase.splitlog.report.period", 1000);
1029      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
1030        Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
1031      assertFalse("Log splitting should failed", ret);
1032      assertTrue(count.get() > 0);
1033    } catch (IOException e) {
1034      fail("There shouldn't be any exception but: " + e.toString());
1035    } finally {
1036      // reset it back to its default value
1037      conf.setInt("hbase.splitlog.report.period", 59000);
1038    }
1039  }
1040
1041  /**
1042   * Test log split process with fake data and lots of edits to trigger threading issues.
1043   */
1044  @Test
1045  public void testThreading() throws Exception {
1046    doTestThreading(20000, 128 * 1024 * 1024, 0);
1047  }
1048
1049  /**
1050   * Test blocking behavior of the log split process if writers are writing slower than the reader
1051   * is reading.
1052   */
1053  @Test
1054  public void testThreadingSlowWriterSmallBuffer() throws Exception {
1055    // The logic of this test has conflict with the limit writers split logic, skip this test for
1056    // TestWALSplitBoundedLogWriterCreation
1057    assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
1058    doTestThreading(200, 1024, 50);
1059  }
1060
1061  /**
1062   * Sets up a log splitter with a mock reader and writer. The mock reader generates a specified
1063   * number of edits spread across 5 regions. The mock writer optionally sleeps for each edit it is
1064   * fed. After the split is complete, verifies that the statistics show the correct number of edits
1065   * output into each region.
1066   * @param numFakeEdits   number of fake edits to push through pipeline
1067   * @param bufferSize     size of in-memory buffer
1068   * @param writerSlowness writer threads will sleep this many ms per edit
1069   */
1070  private void doTestThreading(final int numFakeEdits, final int bufferSize,
1071    final int writerSlowness) throws Exception {
1072
1073    Configuration localConf = new Configuration(conf);
1074    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
1075
1076    // Create a fake log file (we'll override the reader to produce a stream of edits)
1077    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
1078    FSDataOutputStream out = fs.create(logPath);
1079    out.close();
1080
1081    // Make region dirs for our destination regions so the output doesn't get skipped
1082    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
1083    makeRegionDirs(regions);
1084
1085    // Create a splitter that reads and writes the data without touching disk
1086    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
1087      /* Produce a mock writer that doesn't write anywhere */
1088      @Override
1089      protected Writer createWriter(Path logfile) throws IOException {
1090        Writer mockWriter = Mockito.mock(Writer.class);
1091        Mockito.doAnswer(new Answer<Void>() {
1092          int expectedIndex = 0;
1093
1094          @Override
1095          public Void answer(InvocationOnMock invocation) {
1096            if (writerSlowness > 0) {
1097              try {
1098                Thread.sleep(writerSlowness);
1099              } catch (InterruptedException ie) {
1100                Thread.currentThread().interrupt();
1101              }
1102            }
1103            Entry entry = (Entry) invocation.getArgument(0);
1104            WALEdit edit = entry.getEdit();
1105            List<Cell> cells = edit.getCells();
1106            assertEquals(1, cells.size());
1107            Cell cell = cells.get(0);
1108
1109            // Check that the edits come in the right order.
1110            assertEquals(expectedIndex,
1111              Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
1112            expectedIndex++;
1113            return null;
1114          }
1115        }).when(mockWriter).append(Mockito.<Entry> any());
1116        return mockWriter;
1117      }
1118
1119      /* Produce a mock reader that generates fake entries */
1120      @Override
1121      protected WALStreamReader getReader(FileStatus file, boolean skipErrors,
1122        CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
1123        WALStreamReader mockReader = Mockito.mock(WALStreamReader.class);
1124        Mockito.doAnswer(new Answer<Entry>() {
1125          int index = 0;
1126
1127          @Override
1128          public Entry answer(InvocationOnMock invocation) throws Throwable {
1129            if (index >= numFakeEdits) {
1130              return null;
1131            }
1132
1133            // Generate r0 through r4 in round robin fashion
1134            int regionIdx = index % regions.size();
1135            byte region[] = new byte[] { (byte) 'r', (byte) (0x30 + regionIdx) };
1136
1137            Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes(index / regions.size()),
1138              FAMILY, QUALIFIER, VALUE, index);
1139            index++;
1140            return ret;
1141          }
1142        }).when(mockReader).next();
1143        return mockReader;
1144      }
1145    };
1146
1147    logSplitter.splitWAL(fs.getFileStatus(logPath), null);
1148
1149    // Verify number of written edits per region
1150    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
1151    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
1152      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
1153      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
1154    }
1155    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
1156  }
1157
1158  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
1159  @Test
1160  public void testSplitLogFileDeletedRegionDir() throws IOException {
1161    LOG.info("testSplitLogFileDeletedRegionDir");
1162    final String REGION = "region__1";
1163    REGIONS.clear();
1164    REGIONS.add(REGION);
1165
1166    generateWALs(1, 10, -1);
1167    useDifferentDFSClient();
1168
1169    Path regiondir = new Path(TABLEDIR, REGION);
1170    LOG.info("Region directory is" + regiondir);
1171    fs.delete(regiondir, true);
1172    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1173    assertFalse(fs.exists(regiondir));
1174  }
1175
1176  @Test
1177  public void testSplitLogFileEmpty() throws IOException {
1178    LOG.info("testSplitLogFileEmpty");
1179    // we won't create the hlog dir until getWAL got called, so
1180    // make dir here when testing empty log file
1181    fs.mkdirs(WALDIR);
1182    injectEmptyFile(".empty", true);
1183    useDifferentDFSClient();
1184
1185    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1186    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
1187    assertFalse(fs.exists(tdir));
1188
1189    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
1190  }
1191
1192  @Test
1193  public void testSplitLogFileMultipleRegions() throws IOException {
1194    LOG.info("testSplitLogFileMultipleRegions");
1195    generateWALs(1, 10, -1);
1196    splitAndCount(1, 10);
1197  }
1198
1199  @Test
1200  public void testSplitLogFileFirstLineCorruptionLog() throws IOException {
1201    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
1202    generateWALs(1, 10, -1);
1203    FileStatus logfile = fs.listStatus(WALDIR)[0];
1204
1205    corruptWAL(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
1206
1207    useDifferentDFSClient();
1208    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1209
1210    final Path corruptDir =
1211      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
1212    assertEquals(1, fs.listStatus(corruptDir).length);
1213  }
1214
1215  /**
1216   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
1217   */
1218  @Test
1219  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
1220    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
1221    // Generate wals for our destination region
1222    String regionName = "r0";
1223    final Path regiondir = new Path(TABLEDIR, regionName);
1224    REGIONS.clear();
1225    REGIONS.add(regionName);
1226    generateWALs(-1);
1227
1228    wals.getWAL(null);
1229    FileStatus[] logfiles = fs.listStatus(WALDIR);
1230    assertTrue("There should be some log file", logfiles != null && logfiles.length > 0);
1231
1232    WALSplitter logSplitter =
1233      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
1234        @Override
1235        protected Writer createWriter(Path logfile) throws IOException {
1236          Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
1237          // After creating writer, simulate region's
1238          // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
1239          // region and delete them, excluding files with '.temp' suffix.
1240          NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
1241          if (files != null && !files.isEmpty()) {
1242            for (Path file : files) {
1243              if (!this.walFS.delete(file, false)) {
1244                LOG.error("Failed delete of " + file);
1245              } else {
1246                LOG.debug("Deleted recovered.edits file=" + file);
1247              }
1248            }
1249          }
1250          return writer;
1251        }
1252      };
1253    try {
1254      logSplitter.splitWAL(logfiles[0], null);
1255    } catch (IOException e) {
1256      LOG.info(e.toString(), e);
1257      fail("Throws IOException when spliting "
1258        + "log, it is most likely because writing file does not "
1259        + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
1260    }
1261    if (fs.exists(CORRUPTDIR)) {
1262      if (fs.listStatus(CORRUPTDIR).length > 0) {
1263        fail("There are some corrupt logs, "
1264          + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
1265      }
1266    }
1267  }
1268
1269  @Test
1270  public void testRecoveredEditsStoragePolicy() throws IOException {
1271    conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD");
1272    try {
1273      Path path = createRecoveredEditsPathForRegion();
1274      assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName());
1275    } finally {
1276      conf.unset(HConstants.WAL_STORAGE_POLICY);
1277    }
1278  }
1279
1280  /**
1281   * See HBASE-27644, typically we should not have empty WALEdit but we should be able to process
1282   * it, instead of losing data after it.
1283   */
1284  @Test
1285  public void testEmptyWALEdit() throws IOException {
1286    final String region = "region__5";
1287    REGIONS.clear();
1288    REGIONS.add(region);
1289    makeRegionDirs(REGIONS);
1290    fs.mkdirs(WALDIR);
1291    Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5);
1292    generateEmptyEditWAL(path, Bytes.toBytes(region));
1293    useDifferentDFSClient();
1294
1295    Path regiondir = new Path(TABLEDIR, region);
1296    fs.mkdirs(regiondir);
1297    List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
1298    // Make sure that WALSplitter generate the split file
1299    assertEquals(1, splitPaths.size());
1300
1301    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
1302    assertEquals(11, countWAL(originalLog));
1303    // we will skip the empty WAL when splitting
1304    assertEquals(10, countWAL(splitPaths.get(0)));
1305  }
1306
1307  private void generateEmptyEditWAL(Path path, byte[] region) throws IOException {
1308    fs.mkdirs(WALDIR);
1309    try (Writer writer = wals.createWALWriter(fs, path)) {
1310      long seq = 0;
1311      appendEmptyEntry(writer, TABLE_NAME, region, seq++);
1312      for (int i = 0; i < 10; i++) {
1313        appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++);
1314      }
1315    }
1316  }
1317
1318  private Writer generateWALs(int leaveOpen) throws IOException {
1319    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
1320  }
1321
1322  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
1323    return generateWALs(writers, entries, leaveOpen, 7);
1324  }
1325
1326  private void makeRegionDirs(List<String> regions) throws IOException {
1327    for (String region : regions) {
1328      LOG.debug("Creating dir for region " + region);
1329      fs.mkdirs(new Path(TABLEDIR, region));
1330    }
1331  }
1332
1333  /**
1334   * @param leaveOpen index to leave un-closed. -1 to close all.
1335   * @return the writer that's still open, or null if all were closed.
1336   */
1337  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
1338    throws IOException {
1339    makeRegionDirs(REGIONS);
1340    fs.mkdirs(WALDIR);
1341    Writer[] ws = new Writer[writers];
1342    int seq = 0;
1343    int numRegionEventsAdded = 0;
1344    for (int i = 0; i < writers; i++) {
1345      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
1346      for (int j = 0; j < entries; j++) {
1347        int prefix = 0;
1348        for (String region : REGIONS) {
1349          String row_key = region + prefix++ + i + j;
1350          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
1351            QUALIFIER, VALUE, seq++);
1352
1353          if (numRegionEventsAdded < regionEvents) {
1354            numRegionEventsAdded++;
1355            appendRegionEvent(ws[i], region);
1356          }
1357        }
1358      }
1359      if (i != leaveOpen) {
1360        ws[i].close();
1361        LOG.info("Closing writer " + i);
1362      }
1363    }
1364    if (leaveOpen < 0 || leaveOpen >= writers) {
1365      return null;
1366    }
1367    return ws[leaveOpen];
1368  }
1369
1370  private Path[] getLogForRegion(TableName table, String region) throws IOException {
1371    Path tdir = CommonFSUtils.getWALTableDir(conf, table);
1372    @SuppressWarnings("deprecation")
1373    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(
1374      HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region))));
1375    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
1376      @Override
1377      public boolean accept(Path p) {
1378        if (WALSplitUtil.isSequenceIdFile(p)) {
1379          return false;
1380        }
1381        return true;
1382      }
1383    });
1384    Path[] paths = new Path[files.length];
1385    for (int i = 0; i < files.length; i++) {
1386      paths[i] = files[i].getPath();
1387    }
1388    return paths;
1389  }
1390
1391  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
1392    FSDataOutputStream out;
1393    int fileSize = (int) fs.listStatus(path)[0].getLen();
1394
1395    FSDataInputStream in = fs.open(path);
1396    byte[] corrupted_bytes = new byte[fileSize];
1397    in.readFully(0, corrupted_bytes, 0, fileSize);
1398    in.close();
1399
1400    switch (corruption) {
1401      case APPEND_GARBAGE:
1402        fs.delete(path, false);
1403        out = fs.create(path);
1404        out.write(corrupted_bytes);
1405        out.write(Bytes.toBytes("-----"));
1406        closeOrFlush(close, out);
1407        break;
1408
1409      case INSERT_GARBAGE_ON_FIRST_LINE:
1410        fs.delete(path, false);
1411        out = fs.create(path);
1412        out.write(0);
1413        out.write(corrupted_bytes);
1414        closeOrFlush(close, out);
1415        break;
1416
1417      case INSERT_GARBAGE_IN_THE_MIDDLE:
1418        fs.delete(path, false);
1419        out = fs.create(path);
1420        int middle = (int) Math.floor(corrupted_bytes.length / 2);
1421        out.write(corrupted_bytes, 0, middle);
1422        out.write(0);
1423        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
1424        closeOrFlush(close, out);
1425        break;
1426
1427      case TRUNCATE:
1428        fs.delete(path, false);
1429        out = fs.create(path);
1430        out.write(corrupted_bytes, 0, fileSize
1431          - (32 + AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
1432        closeOrFlush(close, out);
1433        break;
1434
1435      case TRUNCATE_TRAILER:
1436        fs.delete(path, false);
1437        out = fs.create(path);
1438        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
1439        closeOrFlush(close, out);
1440        break;
1441    }
1442  }
1443
1444  private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException {
1445    if (close) {
1446      out.close();
1447    } else {
1448      Method syncMethod = null;
1449      try {
1450        syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
1451      } catch (NoSuchMethodException e) {
1452        try {
1453          syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
1454        } catch (NoSuchMethodException ex) {
1455          throw new IOException(
1456            "This version of Hadoop supports " + "neither Syncable.sync() nor Syncable.hflush().");
1457        }
1458      }
1459      try {
1460        syncMethod.invoke(out, new Object[] {});
1461      } catch (Exception e) {
1462        throw new IOException(e);
1463      }
1464      // Not in 0out.hflush();
1465    }
1466  }
1467
1468  private int countWAL(Path log) throws IOException {
1469    int count = 0;
1470    try (WALStreamReader in = wals.createStreamReader(fs, log)) {
1471      while (in.next() != null) {
1472        count++;
1473      }
1474    }
1475    return count;
1476  }
1477
1478  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
1479    String output) throws IOException {
1480    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
1481    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
1482      .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
1483      .setRegionName(ByteString.copyFrom(hri.getRegionName()))
1484      .setFamilyName(ByteString.copyFrom(FAMILY))
1485      .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
1486      .addAllCompactionInput(Arrays.asList(inputs)).addCompactionOutput(output);
1487
1488    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
1489    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
1490      EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
1491    w.append(new Entry(key, edit));
1492    w.sync(false);
1493  }
1494
1495  private static void appendRegionEvent(Writer w, String region) throws IOException {
1496    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
1497      WALProtos.RegionEventDescriptor.EventType.REGION_OPEN, TABLE_NAME.toBytes(),
1498      Bytes.toBytes(region), Bytes.toBytes(String.valueOf(region.hashCode())), 1,
1499      ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>> of());
1500    final long time = EnvironmentEdgeManager.currentTime();
1501    final WALKeyImpl walKey =
1502      new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time, HConstants.DEFAULT_CLUSTER_ID);
1503    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
1504    w.append(new Entry(walKey, we));
1505    w.sync(false);
1506  }
1507
1508  private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
1509    byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
1510    LOG.info(Thread.currentThread().getName() + " append");
1511    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1512    LOG.info(Thread.currentThread().getName() + " sync");
1513    writer.sync(false);
1514    return seq;
1515  }
1516
1517  private static Entry createTestEntry(TableName table, byte[] region, byte[] row, byte[] family,
1518    byte[] qualifier, byte[] value, long seq) {
1519    long time = System.nanoTime();
1520
1521    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
1522    WALEdit edit = new WALEdit();
1523    edit.add(cell);
1524    return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
1525  }
1526
1527  private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq)
1528    throws IOException {
1529    LOG.info(Thread.currentThread().getName() + " append");
1530    writer.append(createEmptyEntry(table, region, seq));
1531    LOG.info(Thread.currentThread().getName() + " sync");
1532    writer.sync(false);
1533    return seq;
1534  }
1535
1536  private static Entry createEmptyEntry(TableName table, byte[] region, long seq) {
1537    long time = System.nanoTime();
1538    return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID),
1539      new WALEdit());
1540  }
1541
1542  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
1543    Writer writer =
1544      WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
1545    if (closeFile) {
1546      writer.close();
1547    }
1548  }
1549
1550  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
1551    try (WALStreamReader in1 = wals.createStreamReader(fs, p1);
1552      WALStreamReader in2 = wals.createStreamReader(fs, p2)) {
1553      Entry entry1;
1554      Entry entry2;
1555      while ((entry1 = in1.next()) != null) {
1556        entry2 = in2.next();
1557        if (
1558          (entry1.getKey().compareTo(entry2.getKey()) != 0)
1559            || (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
1560        ) {
1561          return false;
1562        }
1563      }
1564    }
1565    return true;
1566  }
1567}