001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.anyInt;
026import static org.mockito.ArgumentMatchers.eq;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.spy;
029import static org.mockito.Mockito.when;
030
031import java.io.FilterInputStream;
032import java.io.IOException;
033import java.lang.reflect.Field;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collection;
038import java.util.HashSet;
039import java.util.List;
040import java.util.NavigableMap;
041import java.util.Set;
042import java.util.TreeMap;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.atomic.AtomicInteger;
045import java.util.function.Consumer;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.FSDataInputStream;
048import org.apache.hadoop.fs.FileStatus;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.fs.PathFilter;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HBaseTestingUtility;
055import org.apache.hadoop.hbase.HColumnDescriptor;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionInfo;
058import org.apache.hadoop.hbase.HTableDescriptor;
059import org.apache.hadoop.hbase.KeyValue;
060import org.apache.hadoop.hbase.MiniHBaseCluster;
061import org.apache.hadoop.hbase.ServerName;
062import org.apache.hadoop.hbase.TableName;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.Result;
067import org.apache.hadoop.hbase.client.ResultScanner;
068import org.apache.hadoop.hbase.client.Scan;
069import org.apache.hadoop.hbase.client.Table;
070import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
071import org.apache.hadoop.hbase.monitoring.MonitoredTask;
072import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
073import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
074import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
075import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
076import org.apache.hadoop.hbase.regionserver.FlushRequester;
077import org.apache.hadoop.hbase.regionserver.HRegion;
078import org.apache.hadoop.hbase.regionserver.HRegionServer;
079import org.apache.hadoop.hbase.regionserver.HStore;
080import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
081import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
082import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
083import org.apache.hadoop.hbase.regionserver.Region;
084import org.apache.hadoop.hbase.regionserver.RegionScanner;
085import org.apache.hadoop.hbase.regionserver.RegionServerServices;
086import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
087import org.apache.hadoop.hbase.security.User;
088import org.apache.hadoop.hbase.util.Bytes;
089import org.apache.hadoop.hbase.util.CommonFSUtils;
090import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
091import org.apache.hadoop.hbase.util.EnvironmentEdge;
092import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
093import org.apache.hadoop.hbase.util.HFileTestUtil;
094import org.apache.hadoop.hbase.util.Pair;
095import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
096import org.apache.hadoop.hbase.wal.WAL;
097import org.apache.hadoop.hbase.wal.WALEdit;
098import org.apache.hadoop.hbase.wal.WALFactory;
099import org.apache.hadoop.hbase.wal.WALKeyImpl;
100import org.apache.hadoop.hbase.wal.WALSplitUtil;
101import org.apache.hadoop.hbase.wal.WALSplitter;
102import org.apache.hadoop.hdfs.DFSInputStream;
103import org.junit.After;
104import org.junit.AfterClass;
105import org.junit.Before;
106import org.junit.BeforeClass;
107import org.junit.Rule;
108import org.junit.Test;
109import org.junit.rules.TestName;
110import org.mockito.Mockito;
111import org.mockito.invocation.InvocationOnMock;
112import org.mockito.stubbing.Answer;
113import org.slf4j.Logger;
114import org.slf4j.LoggerFactory;
115
116/**
117 * Test replay of edits out of a WAL split.
118 */
119public abstract class AbstractTestWALReplay {
120  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
121  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
122  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
123  private Path hbaseRootDir = null;
124  private String logName;
125  private Path oldLogDir;
126  private Path logDir;
127  private FileSystem fs;
128  private Configuration conf;
129  private WALFactory wals;
130
131  @Rule
132  public final TestName currentTest = new TestName();
133
134  @BeforeClass
135  public static void setUpBeforeClass() throws Exception {
136    Configuration conf = TEST_UTIL.getConfiguration();
    // The below config is supported by 0.20-append and CDH3b2.
138    conf.setInt("dfs.client.block.recovery.retries", 2);
139    TEST_UTIL.startMiniCluster(3);
140    Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
141    LOG.info("hbase.rootdir=" + hbaseRootDir);
142    CommonFSUtils.setRootDir(conf, hbaseRootDir);
143  }
144
145  @AfterClass
146  public static void tearDownAfterClass() throws Exception {
147    TEST_UTIL.shutdownMiniCluster();
148  }
149
150  @Before
151  public void setUp() throws Exception {
152    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
153    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
154    this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf);
155    this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
156    String serverName = ServerName
157      .valueOf(currentTest.getMethodName() + "-manual", 16010, ee.currentTime()).toString();
158    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
159    this.logDir = new Path(this.hbaseRootDir, logName);
160    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
161      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
162    }
163    this.wals = new WALFactory(conf, currentTest.getMethodName());
164  }
165
166  @After
167  public void tearDown() throws Exception {
168    this.wals.close();
169    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
170  }
171
172  /*
   * @param p Directory to clean up
174   */
175  private void deleteDir(final Path p) throws IOException {
176    if (this.fs.exists(p)) {
177      if (!this.fs.delete(p, true)) {
178        throw new IOException("Failed remove of " + p);
179      }
180    }
181  }
182
  /**
   * Test that edits are replayed correctly when a region with multiple column families is moved
   * between region servers and the server that was carrying it aborts.
   */
186  @Test
187  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
188    final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
189    byte[] family1 = Bytes.toBytes("cf1");
190    byte[] family2 = Bytes.toBytes("cf2");
191    byte[] qualifier = Bytes.toBytes("q");
192    byte[] value = Bytes.toBytes("testV");
193    byte[][] familys = { family1, family2 };
194    TEST_UTIL.createTable(tableName, familys);
195    Table htable = TEST_UTIL.getConnection().getTable(tableName);
196    Put put = new Put(Bytes.toBytes("r1"));
197    put.addColumn(family1, qualifier, value);
198    htable.put(put);
199    ResultScanner resultScanner = htable.getScanner(new Scan());
200    int count = 0;
201    while (resultScanner.next() != null) {
202      count++;
203    }
204    resultScanner.close();
205    assertEquals(1, count);
206
207    MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
208    List<HRegion> regions = hbaseCluster.getRegions(tableName);
209    assertEquals(1, regions.size());
210
211    // move region to another regionserver
212    Region destRegion = regions.get(0);
213    int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName());
214    assertTrue("Please start more than 1 regionserver",
215      hbaseCluster.getRegionServerThreads().size() > 1);
216    int destServerNum = 0;
217    while (destServerNum == originServerNum) {
218      destServerNum++;
219    }
220    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
221    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
222    // move region to destination regionserver
223    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName());
224
225    // delete the row
226    Delete del = new Delete(Bytes.toBytes("r1"));
227    htable.delete(del);
228    resultScanner = htable.getScanner(new Scan());
229    count = 0;
230    while (resultScanner.next() != null) {
231      count++;
232    }
233    resultScanner.close();
234    assertEquals(0, count);
235
    // Flush the region and run a major compaction.
237    HRegion region =
238      (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
239    region.flush(true);
    // Trigger a major compaction on every store, then run the compaction.
241    for (HStore store : region.getStores()) {
242      store.triggerMajorCompaction();
243    }
244    region.compact(true);
245
246    // move region to origin regionserver
247    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName());
248    // abort the origin regionserver
249    originServer.abort("testing");
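    // The aborted server's WAL is split and replayed when the region reopens; the stale put must
    // not resurrect the row that was deleted, flushed, and major-compacted above.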
250
251    // see what we get
    Result result = htable.get(new Get(Bytes.toBytes("r1")));
    // Get never returns null; the replayed edits must not resurrect the deleted row.
    assertTrue("Row is deleted, but we get " + result, result.isEmpty());
257    resultScanner.close();
258  }
259
260  /**
   * Tests for HBASE-2727.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a>
263   */
264  @Test
265  public void test2727() throws Exception {
266    // Test being able to have > 1 set of edits in the recovered.edits directory.
267    // Ensure edits are replayed properly.
268    final TableName tableName = TableName.valueOf("test2727");
269
270    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
271    HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
272    Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
273    deleteDir(basedir);
274
275    HTableDescriptor htd = createBasic3FamilyHTD(tableName);
276    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
277    HBaseTestingUtility.closeRegionAndWAL(region2);
278    final byte[] rowName = tableName.getName();
279
280    WAL wal1 = createWAL(this.conf, hbaseRootDir, logName);
281    // Add 1k to each family.
282    final int countPerFamily = 1000;
283
284    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
285    for (byte[] fam : htd.getFamiliesKeys()) {
286      scopes.put(fam, 0);
287    }
288    for (HColumnDescriptor hcd : htd.getFamilies()) {
289      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal1, htd, mvcc,
290        scopes);
291    }
292    wal1.shutdown();
293    runWALSplit(this.conf);
294
295    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
296    // Add 1k to each family.
297    for (HColumnDescriptor hcd : htd.getFamilies()) {
298      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal2, htd, mvcc,
299        scopes);
300    }
301    wal2.shutdown();
302    runWALSplit(this.conf);
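    // At this point the region has two sets of recovered.edits, one from each split WAL.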
303
304    WAL wal3 = createWAL(this.conf, hbaseRootDir, logName);
305    try {
306      HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
307      long seqid = region.getOpenSeqNum();
      // The region opens with sequenceId 1. With 6k edits, its sequence number reaches 6k + 1.
      // When opened, this region applies the 6k edits and then increments the sequenceId by 1.
310      assertTrue(seqid > mvcc.getWritePoint());
311      assertEquals(seqid - 1, mvcc.getWritePoint());
312      LOG.debug(
313        "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint());
314
315      // TODO: Scan all.
316      region.close();
317    } finally {
318      wal3.close();
319    }
320  }
321
322  /**
   * Test case of an HRegion that is made up of bulk-loaded files only. Assert that we don't
   * 'crash'.
325   */
326  @Test
327  public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException,
328    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
329    final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
330    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
331    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
332    deleteDir(basedir);
333    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
334    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
335    HBaseTestingUtility.closeRegionAndWAL(region2);
336    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
337    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
338
339    byte[] family = htd.getFamilies().iterator().next().getName();
340    Path f = new Path(basedir, "hfile");
341    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
342      Bytes.toBytes("z"), 10);
343    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
344    hfs.add(Pair.newPair(family, f.toString()));
345    region.bulkLoadHFiles(hfs, true, null);
346
    // Add an edit so there is something in the WAL.
348    byte[] row = tableName.getName();
349    region.put((new Put(row)).addColumn(family, family, family));
350    wal.sync();
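    // 10 rows come from the bulk-loaded hfile, plus the single Put above.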
351    final int rowsInsertedCount = 11;
352
353    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
354
355    // Now 'crash' the region by stealing its wal
356    final Configuration newConf = HBaseConfiguration.create(this.conf);
357    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
359      @Override
360      public Object run() throws Exception {
361        runWALSplit(newConf);
362        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
363
364        HRegion region2 =
365          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
366        long seqid2 = region2.getOpenSeqNum();
367        assertTrue(seqid2 > -1);
368        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
369
        // We can't close the original WAL. It has been appropriated by the split.
371        region2.close();
372        wal2.close();
373        return null;
374      }
375    });
376  }
377
378  /**
379   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
380   * files) and an edit in the memstore. This is for HBASE-10958 "[dataloss] Bulk loading with
   * seqids can prevent some log entries from being replayed".
382   */
383  @Test
384  public void testCompactedBulkLoadedFiles() throws IOException, SecurityException,
385    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
386    final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles");
387    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
388    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
389    deleteDir(basedir);
390    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
391    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
392    HBaseTestingUtility.closeRegionAndWAL(region2);
393    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
394    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
395
    // Add an edit so there is something in the WAL.
397    byte[] row = tableName.getName();
398    byte[] family = htd.getFamilies().iterator().next().getName();
399    region.put((new Put(row)).addColumn(family, family, family));
400    wal.sync();
401
402    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
403    for (int i = 0; i < 3; i++) {
404      Path f = new Path(basedir, "hfile" + i);
405      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
406        Bytes.toBytes(i + "50"), 10);
407      hfs.add(Pair.newPair(family, f.toString()));
408    }
409    region.bulkLoadHFiles(hfs, true, null);
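    // 3 bulk-loaded hfiles with 10 rows each, plus the single Put above.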
410    final int rowsInsertedCount = 31;
411    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
412
413    // major compact to turn all the bulk loaded files into one normal file
414    region.compact(true);
415    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
416
417    // Now 'crash' the region by stealing its wal
418    final Configuration newConf = HBaseConfiguration.create(this.conf);
419    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
421      @Override
422      public Object run() throws Exception {
423        runWALSplit(newConf);
424        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
425
426        HRegion region2 =
427          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
428        long seqid2 = region2.getOpenSeqNum();
429        assertTrue(seqid2 > -1);
430        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
431
        // We can't close the original WAL. It has been appropriated by the split.
433        region2.close();
434        wal2.close();
435        return null;
436      }
437    });
438  }
439
440  /**
441   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
   * seqids.
443   */
444  @Test
445  public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException,
446    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
447    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
448    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
449    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
450    deleteDir(basedir);
451    final byte[] rowName = tableName.getName();
452    final int countPerFamily = 10;
453    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
454    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
455    HBaseTestingUtility.closeRegionAndWAL(region3);
456    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not the same as
    // the others, to test that we do the right thing when seqids differ.
459    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
460    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
461    long seqid = region.getOpenSeqNum();
462    boolean first = true;
463    for (HColumnDescriptor hcd : htd.getFamilies()) {
464      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
465      if (first) {
        // If first, flush so we have at least one family with a seqid different from the rest.
467        region.flush(true);
468        first = false;
469      }
470    }
471    // Now assert edits made it in.
472    final Get g = new Get(rowName);
473    Result result = region.get(g);
474    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
475    // Now close the region (without flush), split the log, reopen the region and assert that
476    // replay of log has the correct effect, that our seqids are calculated correctly so
477    // all edits in logs are seen as 'stale'/old.
478    region.close(true);
479    wal.shutdown();
480    runWALSplit(this.conf);
481    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
482    HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
483    long seqid2 = region2.getOpenSeqNum();
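    // The reopened region's sequence id must have advanced past all of the replayed edits.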
484    assertTrue(seqid + result.size() < seqid2);
485    final Result result1b = region2.get(g);
486    assertEquals(result.size(), result1b.size());
487
488    // Next test. Add more edits, then 'crash' this region by stealing its wal
489    // out from under it and assert that replay of the log adds the edits back
490    // correctly when region is opened again.
491    for (HColumnDescriptor hcd : htd.getFamilies()) {
492      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
493    }
494    // Get count of edits.
495    final Result result2 = region2.get(g);
496    assertEquals(2 * result.size(), result2.size());
497    wal2.sync();
498    final Configuration newConf = HBaseConfiguration.create(this.conf);
499    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
500    user.runAs(new PrivilegedExceptionAction<Object>() {
501      @Override
502      public Object run() throws Exception {
503        runWALSplit(newConf);
504        FileSystem newFS = FileSystem.get(newConf);
505        // Make a new wal for new region open.
506        WAL wal3 = createWAL(newConf, hbaseRootDir, logName);
507        final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
508        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
509          @Override
510          protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) {
511            super.restoreEdit(s, cell, memstoreSizing);
512            countOfRestoredEdits.incrementAndGet();
513          }
514        };
515        long seqid3 = region3.initialize();
516        Result result3 = region3.get(g);
517        // Assert that count of cells is same as before crash.
518        assertEquals(result2.size(), result3.size());
519        assertEquals(htd.getFamilies().size() * countPerFamily, countOfRestoredEdits.get());
520
        // We can't close the original WAL. It has been appropriated by the split.
522        region3.close();
523        wal3.close();
524        return null;
525      }
526    });
527  }
528
529  /**
530   * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores
531   * got flushed but others did not. Unfortunately, there is no easy hook to flush at a store level.
532   * The way we get around this is by flushing at the region level, and then deleting the recently
533   * flushed store file for one of the Stores. This would put us back in the situation where all but
534   * that store got flushed and the region died. We restart Region again, and verify that the edits
   * were replayed.
536   */
537  @Test
538  public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException,
539    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
540    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
541    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
542    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
543    deleteDir(basedir);
544    final byte[] rowName = tableName.getName();
545    final int countPerFamily = 10;
546    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
547    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
548    HBaseTestingUtility.closeRegionAndWAL(region3);
    // Write countPerFamily edits into the three families.
552    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
553    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
554    long seqid = region.getOpenSeqNum();
555    for (HColumnDescriptor hcd : htd.getFamilies()) {
556      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
557    }
558
559    // Now assert edits made it in.
560    final Get g = new Get(rowName);
561    Result result = region.get(g);
562    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
563
564    // Let us flush the region
565    region.flush(true);
566    region.close(true);
567    wal.shutdown();
568
569    // delete the store files in the second column family to simulate a failure
570    // in between the flushcache();
571    // we have 3 families. killing the middle one ensures that taking the maximum
572    // will make us fail.
573    int cf_count = 0;
574    for (HColumnDescriptor hcd : htd.getFamilies()) {
575      cf_count++;
576      if (cf_count == 2) {
577        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
578      }
579    }
580
581    // Let us try to split and recover
582    runWALSplit(this.conf);
583    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
584    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
585    long seqid2 = region2.getOpenSeqNum();
586    assertTrue(seqid + result.size() < seqid2);
587
588    final Result result1b = region2.get(g);
589    assertEquals(result.size(), result1b.size());
590  }
591
592  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
  // It only throws an exception when throwExceptionWhenFlushing is set to true.
594  public static class CustomStoreFlusher extends DefaultStoreFlusher {
    // Switch between throwing and not throwing an exception during flush.
596    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
597
598    public CustomStoreFlusher(Configuration conf, HStore store) {
599      super(conf, store);
600    }
601
602    @Override
603    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
604      MonitoredTask status, ThroughputController throughputController,
605      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
606      if (throwExceptionWhenFlushing.get()) {
607        throw new IOException("Simulated exception by tests");
608      }
609      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
610        writerCreationTracker);
611    }
  }
613
614  /**
   * Test that we recover the data correctly after aborting a flush. In the test, we first abort a
   * flush after writing some data, then write more data and flush again, and finally verify the
   * data.
618   */
619  @Test
620  public void testReplayEditsAfterAbortingFlush() throws IOException {
621    final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush");
622    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
623    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
624    deleteDir(basedir);
625    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
626    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
627    HBaseTestingUtility.closeRegionAndWAL(region3);
    // Write some edits spread across the three families.
631    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
632    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
633    Mockito.doReturn(false).when(rsServices).isAborted();
634    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
635    when(rsServices.getConfiguration()).thenReturn(conf);
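    // Plug in CustomStoreFlusher so the flush can be made to fail on demand.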
636    Configuration customConf = new Configuration(this.conf);
637    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
638      CustomStoreFlusher.class.getName());
639    HRegion region =
640      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
641    int writtenRowCount = 10;
642    List<HColumnDescriptor> families = new ArrayList<>(htd.getFamilies());
643    for (int i = 0; i < writtenRowCount; i++) {
644      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
645      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
646        Bytes.toBytes("val"));
647      region.put(put);
648    }
649
650    // Now assert edits made it in.
651    RegionScanner scanner = region.getScanner(new Scan());
652    assertEquals(writtenRowCount, getScannedCount(scanner));
653
654    // Let us flush the region
655    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
656    try {
657      region.flush(true);
658      fail("Injected exception hasn't been thrown");
659    } catch (IOException e) {
660      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
661      // simulated to abort server
662      Mockito.doReturn(true).when(rsServices).isAborted();
663      region.setClosing(false); // region normally does not accept writes after
664      // DroppedSnapshotException. We mock around it for this test.
665    }
666    // writing more data
667    int moreRow = 10;
668    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
669      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
670      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
671        Bytes.toBytes("val"));
672      region.put(put);
673    }
674    writtenRowCount += moreRow;
675    // call flush again
676    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
677    try {
678      region.flush(true);
679    } catch (IOException t) {
680      LOG.info(
        "Expected exception when flushing region because server is stopped: " + t.getMessage());
682    }
683
684    region.close(true);
685    wal.shutdown();
686
687    // Let us try to split and recover
688    runWALSplit(this.conf);
689    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
690    Mockito.doReturn(false).when(rsServices).isAborted();
691    HRegion region2 =
692      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
693    scanner = region2.getScanner(new Scan());
694    assertEquals(writtenRowCount, getScannedCount(scanner));
695  }
696
697  private int getScannedCount(RegionScanner scanner) throws IOException {
698    int scannedCount = 0;
699    List<Cell> results = new ArrayList<>();
700    while (true) {
701      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
705    }
706    return scannedCount;
707  }
708
709  /**
   * Create an HRegion with the result of a WAL split and test that we only see the good edits.
711   */
712  @Test
713  public void testReplayEditsWrittenIntoWAL() throws Exception {
714    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
715    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
716    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
717    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
718    deleteDir(basedir);
719
720    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
721    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
722    HBaseTestingUtility.closeRegionAndWAL(region2);
723    final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
724    final byte[] rowName = tableName.getName();
725    final byte[] regionName = hri.getEncodedNameAsBytes();
726
727    // Add 1k to each family.
728    final int countPerFamily = 1000;
729    Set<byte[]> familyNames = new HashSet<>();
730    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
731    for (byte[] fam : htd.getFamiliesKeys()) {
732      scopes.put(fam, 0);
733    }
734    for (HColumnDescriptor hcd : htd.getFamilies()) {
735      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, htd, mvcc,
736        scopes);
737      familyNames.add(hcd.getName());
738    }
739
740    // Add a cache flush, shouldn't have any effect
741    wal.startCacheFlush(regionName, familyNames);
742    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);
743
744    // Add an edit to another family, should be skipped.
745    WALEdit edit = new WALEdit();
746    long now = ee.currentTime();
747    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName));
748    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
749      edit);
750
751    // Delete the c family to verify deletes make it over.
752    edit = new WALEdit();
753    now = ee.currentTime();
754    edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
755    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
756      edit);
757
758    // Sync.
759    wal.sync();
760    // Make a new conf and a new fs for the splitter to run on so we can take
761    // over old wal.
762    final Configuration newConf = HBaseConfiguration.create(this.conf);
763    User user = HBaseTestingUtility.getDifferentUser(newConf, ".replay.wal.secondtime");
764    user.runAs(new PrivilegedExceptionAction<Void>() {
765      @Override
766      public Void run() throws Exception {
767        runWALSplit(newConf);
768        FileSystem newFS = FileSystem.get(newConf);
769        // 100k seems to make for about 4 flushes during HRegion#initialize.
770        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
771        // Make a new wal for new region.
772        WAL newWal = createWAL(newConf, hbaseRootDir, logName);
773        final AtomicInteger flushcount = new AtomicInteger(0);
774        try {
775          final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
776            @Override
777            protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
778              final Collection<HStore> storesToFlush, MonitoredTask status,
779              boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
780              LOG.info("InternalFlushCache Invoked");
781              FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
782                Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
783              flushcount.incrementAndGet();
784              return fs;
785            }
786          };
787          // The seq id this region has opened up with
788          long seqid = region.initialize();
789
          // The mvcc write point from inserting the data.
791          long writePoint = mvcc.getWritePoint();
792
793          // We flushed during init.
794          assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
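          // The open sequence id is one beyond the write point of the last replayed edit.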
795          assertTrue((seqid - 1) == writePoint);
796
797          Get get = new Get(rowName);
798          Result result = region.get(get);
799          // Make sure we only see the good edits
800          assertEquals(countPerFamily * (htd.getFamilies().size() - 1), result.size());
801          region.close();
802        } finally {
803          newWal.close();
804        }
805        return null;
806      }
807    });
808  }
809
810  @Test
811  // the following test is for HBASE-6065
812  public void testSequentialEditLogSeqNum() throws IOException {
813    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
814    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
815    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
816    deleteDir(basedir);
817    final byte[] rowName = tableName.getName();
818    final int countPerFamily = 10;
819    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
820
821    // Mock the WAL
822    MockWAL wal = createMockWAL();
823
824    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
825    for (HColumnDescriptor hcd : htd.getFamilies()) {
826      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
827    }
828
829    // Let us flush the region
    // But this time completeCacheFlush is not done yet.
831    region.flush(true);
832    for (HColumnDescriptor hcd : htd.getFamilies()) {
833      addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
834    }
    // Get the current sequence number.
    long latestSeqNumber = region.getReadPoint(null);
837    wal.doCompleteCacheFlush = true;
838    // allow complete cache flush with the previous seq number got after first
839    // set of edits.
840    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
841    wal.shutdown();
842    FileStatus[] listStatus = wal.getFiles();
843    assertNotNull(listStatus);
844    assertTrue(listStatus.length > 0);
845    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
846      wals, null);
847    FileStatus[] listStatus1 =
848      this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName),
849        new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() {
850          @Override
851          public boolean accept(Path p) {
852            return !WALSplitUtil.isSequenceIdFile(p);
853          }
854        });
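    // The recovered.edits file name encodes an edit sequence id; parse it for the check below.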
855    int editCount = 0;
856    for (FileStatus fileStatus : listStatus1) {
857      editCount = Integer.parseInt(fileStatus.getPath().getName());
858    }
    // The sequence numbers should be the same.
    assertEquals(
      "The sequence number of the recovered.edits file and the current edit seq should be the same",
      latestSeqNumber, editCount);
863  }
864
865  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-15252.
867   */
868  @Test
869  public void testDatalossWhenInputError() throws Exception {
870    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
871    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
872    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
873    deleteDir(basedir);
874    final byte[] rowName = tableName.getName();
875    final int countPerFamily = 10;
876    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
877    HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
878    Path regionDir = region1.getWALRegionDir();
879    HBaseTestingUtility.closeRegionAndWAL(region1);
880
881    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
882    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
883    for (HColumnDescriptor hcd : htd.getFamilies()) {
884      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
885    }
886    // Now assert edits made it in.
887    final Get g = new Get(rowName);
888    Result result = region.get(g);
889    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
890    // Now close the region (without flush), split the log, reopen the region and assert that
891    // replay of log has the correct effect.
892    region.close(true);
893    wal.shutdown();
894
895    runWALSplit(this.conf);
896
897    // here we let the DFSInputStream throw an IOException just after the WALHeader.
898    Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
899    FSDataInputStream stream = fs.open(editFile);
900    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
901    Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
902      conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
903        AbstractFSWALProvider.Reader.class);
904    AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance();
905    reader.init(this.fs, editFile, conf, stream);
906    final long headerLength = stream.getPos();
907    reader.close();
908    FileSystem spyFs = spy(this.fs);
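    // Wrap the file system so that opening the edit file returns a stream whose underlying
    // DFSInputStream fails on any read past the WAL header, simulating an input error mid-replay.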
909    doAnswer(new Answer<FSDataInputStream>() {
910
911      @Override
912      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
913        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
914        Field field = FilterInputStream.class.getDeclaredField("in");
915        field.setAccessible(true);
916        final DFSInputStream in = (DFSInputStream) field.get(stream);
917        DFSInputStream spyIn = spy(in);
918        doAnswer(new Answer<Integer>() {
919
920          private long pos;
921
922          @Override
923          public Integer answer(InvocationOnMock invocation) throws Throwable {
924            if (pos >= headerLength) {
925              throw new IOException("read over limit");
926            }
927            int b = (Integer) invocation.callRealMethod();
928            if (b > 0) {
929              pos += b;
930            }
931            return b;
932          }
933        }).when(spyIn).read(any(byte[].class), anyInt(), anyInt());
934        doAnswer(new Answer<Void>() {
935
936          @Override
937          public Void answer(InvocationOnMock invocation) throws Throwable {
938            invocation.callRealMethod();
939            in.close();
940            return null;
941          }
942        }).when(spyIn).close();
943        field.set(stream, spyIn);
944        return stream;
945      }
946    }).when(spyFs).open(eq(editFile));
947
948    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
949    HRegion region2;
950    try {
951      // log replay should fail due to the IOException, otherwise we may lose data.
952      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
953      assertEquals(result.size(), region2.get(g).size());
954    } catch (IOException e) {
955      assertEquals("read over limit", e.getMessage());
956    }
957    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
958    assertEquals(result.size(), region2.get(g).size());
959  }
960
961  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-14949.
963   */
964  private void testNameConflictWhenSplit(boolean largeFirst)
965    throws IOException, StreamLacksCapabilityException {
966    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
967    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
968    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
969    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
970    deleteDir(basedir);
971
972    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
973    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
974    for (byte[] fam : htd.getFamiliesKeys()) {
975      scopes.put(fam, 0);
976    }
977    HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
978    HBaseTestingUtility.closeRegionAndWAL(region);
979    final byte[] family = htd.getColumnFamilies()[0].getName();
980    final byte[] rowName = tableName.getName();
981    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
982    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);
983
984    Path largeFile = new Path(logDir, "wal-1");
985    Path smallFile = new Path(logDir, "wal-2");
    writeWALFile(largeFile, Arrays.asList(entry1, entry2));
    writeWALFile(smallFile, Arrays.asList(entry2));
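    // Both files contain the edit with sequence id 2, so splitting them can produce
    // recovered.edits files with conflicting names (HBASE-14949); both edits must still be
    // visible after replay.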
988    FileStatus first, second;
989    if (largeFirst) {
990      first = fs.getFileStatus(largeFile);
991      second = fs.getFileStatus(smallFile);
992    } else {
993      first = fs.getFileStatus(smallFile);
994      second = fs.getFileStatus(largeFile);
995    }
996    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
997    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
998    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
999    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
1000    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
1001    assertEquals(2, region.get(new Get(rowName)).size());
1002  }
1003
1004  @Test
1005  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
1006    testNameConflictWhenSplit(true);
1007  }
1008
1009  @Test
1010  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
1011    testNameConflictWhenSplit(false);
1012  }
1013
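  // A WAL whose completeCacheFlush is a no-op until doCompleteCacheFlush is set, so tests can
  // control when a cache flush is recorded as complete.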
1014  static class MockWAL extends FSHLog {
1015    boolean doCompleteCacheFlush = false;
1016
1017    public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
1018      throws IOException {
1019      super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
1020    }
1021
1022    @Override
1023    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
1024      if (!doCompleteCacheFlush) {
1025        return;
1026      }
1027      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
1028    }
1029  }
1030
1031  private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
1032    HTableDescriptor htd = new HTableDescriptor(tableName);
1033    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1034    htd.addFamily(a);
1035    return htd;
1036  }
1037
1038  private MockWAL createMockWAL() throws IOException {
1039    MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
1040    wal.init();
    // Set down the maximum recovery count so the dfsclient doesn't linger retrying something
1042    // long gone.
1043    HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1044    return wal;
1045  }
1046
  // Flusher used in tests. Runs the flush inline on the configured region when a flush is
  // requested.
1049  static class TestFlusher implements FlushRequester {
1050    private HRegion r;
1051
1052    @Override
1053    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
1054      try {
1055        r.flush(false);
1056        return true;
1057      } catch (IOException e) {
1058        throw new RuntimeException("Exception flushing", e);
1059      }
1060    }
1061
1062    @Override
1063    public boolean requestFlush(HRegion region, List<byte[]> families,
1064      FlushLifeCycleTracker tracker) {
1065      return true;
1066    }
1067
1068    @Override
1069    public boolean requestDelayedFlush(HRegion region, long when) {
1070      return true;
1071    }
1072
1073    @Override
1074    public void registerFlushRequestListener(FlushRequestListener listener) {
1075
1076    }
1077
1078    @Override
1079    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
1080      return false;
1081    }
1082
1083    @Override
1084    public void setGlobalMemStoreLimit(long globalMemStoreSize) {
1085
1086    }
1087  }
1088
1089  private WALKeyImpl createWALKey(final TableName tableName, final HRegionInfo hri,
1090    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
1091    return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
1092  }
1093
1094  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
1095    int index) {
1096    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
1097    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
1098    WALEdit edit = new WALEdit();
1099    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
1100    return edit;
1101  }
1102
1103  private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
1104    byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
1105    int index, NavigableMap<byte[], Integer> scopes) throws IOException {
1106    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
1107      createWALEdit(rowName, family, ee, index), hri, true, null);
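    // Stamp the entry with a real region sequence id from mvcc before it is written.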
1108    entry.stampRegionSequenceId(mvcc.begin());
1109    return entry;
1110  }
1111
1112  private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
1113    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
1114    final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc,
1115    NavigableMap<byte[], Integer> scopes) throws IOException {
1116    for (int j = 0; j < count; j++) {
1117      wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
1118        createWALEdit(rowName, family, ee, j));
1119    }
1120    wal.sync();
1121  }
1122
1123  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
1124    EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
1125    List<Put> puts = new ArrayList<>();
1126    for (int j = 0; j < count; j++) {
1127      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
1128      Put p = new Put(rowName);
1129      p.addColumn(family, qualifier, ee.currentTime(), rowName);
1130      r.put(p);
1131      puts.add(p);
1132    }
1133    return puts;
1134  }
1135
1136  /*
1137   * Creates an HRI around an HTD that has <code>tableName</code> and three column families named
1138   * 'a','b', and 'c'.
1139   * @param tableName Name of table to use when we create HTableDescriptor.
1140   */
1141  private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
1142    return new HRegionInfo(tableName, null, null, false);
1143  }
1144
1145  /*
   * Run the split. Verify that only a single split file is made.
   * @return The single split file made.
1147   */
1148  private Path runWALSplit(final Configuration c) throws IOException {
1149    List<Path> splits =
1150      WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
1151    // Split should generate only 1 file since there's only 1 region
1152    assertEquals("splits=" + splits, 1, splits.size());
1153    // Make sure the file exists
1154    assertTrue(fs.exists(splits.get(0)));
1155    LOG.info("Split file=" + splits.get(0));
1156    return splits.get(0);
1157  }
1158
1159  private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1160    HTableDescriptor htd = new HTableDescriptor(tableName);
1161    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1162    htd.addFamily(a);
1163    HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
1164    htd.addFamily(b);
1165    HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
1166    htd.addFamily(c);
1167    return htd;
1168  }
1169
  private void writeWALFile(Path file, List<FSWALEntry> entries)
1171    throws IOException, StreamLacksCapabilityException {
1172    fs.mkdirs(file.getParent());
1173    ProtobufLogWriter writer = new ProtobufLogWriter();
1174    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
1175      StreamSlowMonitor.create(conf, "testMonitor"));
1176    for (FSWALEntry entry : entries) {
1177      writer.append(entry);
1178    }
1179    writer.sync(false);
1180    writer.close();
1181  }
1182
1183  protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
1184    throws IOException;
1185}