018package org.apache.hadoop.hbase.wal;
020import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
021import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026import static org.mockito.Mockito.when;
028import java.io.IOException;
029import java.security.PrivilegedExceptionAction;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FSDataInputStream;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.Result;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
058import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
059import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
060import org.apache.hadoop.hbase.regionserver.HRegion;
061import org.apache.hadoop.hbase.regionserver.RegionScanner;
062import org.apache.hadoop.hbase.regionserver.RegionServerServices;
063import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
064import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
065import org.apache.hadoop.hbase.security.User;
066import org.apache.hadoop.hbase.testclassification.MediumTests;
067import org.apache.hadoop.hbase.testclassification.RegionServerTests;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.CommonFSUtils;
070import org.apache.hadoop.hbase.util.EnvironmentEdge;
071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
072import org.apache.hadoop.hbase.util.FSTableDescriptors;
073import org.apache.hadoop.hbase.util.Pair;
074import org.junit.After;
075import org.junit.AfterClass;
076import org.junit.Before;
077import org.junit.BeforeClass;
078import org.junit.ClassRule;
079import org.junit.Rule;
080import org.junit.Test;
081import org.junit.experimental.categories.Category;
082import org.junit.rules.TestName;
083import org.mockito.Mockito;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
087@Category({ RegionServerTests.class, MediumTests.class })
088public class TestWALSplitToHFile {
089  @ClassRule
090  public static final HBaseClassTestRule CLASS_RULE =
091    HBaseClassTestRule.forClass(TestWALSplitToHFile.class);
093  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
094  static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
095  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
096  private Path rootDir = null;
097  private String logName;
098  private Path oldLogDir;
099  private Path logDir;
100  private FileSystem fs;
101  private Configuration conf;
102  private WALFactory wals;
104  private static final byte[] ROW = Bytes.toBytes("row");
105  private static final byte[] QUALIFIER = Bytes.toBytes("q");
106  private static final byte[] VALUE1 = Bytes.toBytes("value1");
107  private static final byte[] VALUE2 = Bytes.toBytes("value2");
108  private static final int countPerFamily = 10;
110  @Rule
111  public final TestName TEST_NAME = new TestName();
113  @BeforeClass
114  public static void setUpBeforeClass() throws Exception {
115    Configuration conf = UTIL.getConfiguration();
116    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
117    UTIL.startMiniCluster(3);
118    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
119    LOG.info("hbase.rootdir=" + hbaseRootDir);
120    CommonFSUtils.setRootDir(conf, hbaseRootDir);
121  }
123  @AfterClass
124  public static void tearDownAfterClass() throws Exception {
125    UTIL.shutdownMiniCluster();
126  }
128  @Before
129  public void setUp() throws Exception {
130    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
131    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
132    this.fs = UTIL.getDFSCluster().getFileSystem();
133    this.rootDir = CommonFSUtils.getRootDir(this.conf);
134    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
135    String serverName = ServerName
136      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
137      .toString();
138    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
139    this.logDir = new Path(this.rootDir, logName);
140    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
141      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
142    }
143    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
144  }
146  @After
147  public void tearDown() throws Exception {
148    this.wals.close();
149    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
150  }
152  /*
153   * @param p Directory to cleanup
154   */
155  private void deleteDir(final Path p) throws IOException {
156    if (this.fs.exists(p)) {
157      if (!this.fs.delete(p, true)) {
158        throw new IOException("Failed remove of " + p);
159      }
160    }
161  }
163  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
164    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
165    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
166    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
167    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
168    TableDescriptor td = builder.build();
169    UTIL.getAdmin().createTable(td);
170    return td;
171  }
173  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
174    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
175    wal.init();
176    return wal;
177  }
179  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
180    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
181    wal.init();
182    return wal;
183  }
185  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
186    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
187    final TableDescriptor td = createBasic3FamilyTD(tableName);
188    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
189    final Path tableDir = CommonFSUtils.getTableDir(this.rootDir, tableName);
190    deleteDir(tableDir);
191    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
192    HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
193    HBaseTestingUtility.closeRegionAndWAL(region);
194    return new Pair<>(td, ri);
195  }
197  private void writeData(TableDescriptor td, HRegion region) throws IOException {
198    final long timestamp = this.ee.currentTime();
199    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
200      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
201    }
202  }
204  @Test
205  public void testDifferentRootDirAndWALRootDir() throws Exception {
206    // Change wal root dir and reset the configuration
207    Path walRootDir = UTIL.createWALRootDir();
208    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
210    FileSystem walFs = CommonFSUtils.getWALFileSystem(this.conf);
211    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
212    String serverName = ServerName
213      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
214      .toString();
215    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
216    this.logDir = new Path(walRootDir, logName);
217    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
219    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
220    TableDescriptor td = pair.getFirst();
221    RegionInfo ri = pair.getSecond();
223    WAL wal = createWAL(walFs, walRootDir, logName);
224    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
225    writeData(td, region);
227    // Now close the region without flush
228    region.close(true);
229    wal.shutdown();
230    // split the log
231    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
233    WAL wal2 = createWAL(walFs, walRootDir, logName);
234    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
235    Result result2 = region2.get(new Get(ROW));
236    assertEquals(td.getColumnFamilies().length, result2.size());
237    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
238      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
239    }
240  }
242  @Test
243  public void testCorruptRecoveredHFile() throws Exception {
244    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
245    TableDescriptor td = pair.getFirst();
246    RegionInfo ri = pair.getSecond();
248    WAL wal = createWAL(this.conf, rootDir, logName);
249    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
250    writeData(td, region);
252    // Now close the region without flush
253    region.close(true);
254    wal.shutdown();
255    // split the log
256    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
258    // Write a corrupt recovered hfile
259    Path regionDir =
260      new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
261    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
262      FileStatus[] files =
263        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
264      assertNotNull(files);
265      assertTrue(files.length > 0);
266      writeCorruptRecoveredHFile(files[0].getPath());
267    }
269    // Failed to reopen the region
270    WAL wal2 = createWAL(this.conf, rootDir, logName);
271    try {
272      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
273      fail("Should fail to open region");
274    } catch (CorruptHFileException che) {
275      // Expected
276    }
278    // Set skip errors to true and reopen the region
279    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
280    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
281    Result result2 = region2.get(new Get(ROW));
282    assertEquals(td.getColumnFamilies().length, result2.size());
283    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
284      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
285      // Assert the corrupt file was skipped and still exist
286      FileStatus[] files =
287        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
288      assertNotNull(files);
289      assertEquals(1, files.length);
290      assertTrue(files[0].getPath().getName().contains("corrupt"));
291    }
292  }
294  @Test
295  public void testPutWithSameTimestamp() throws Exception {
296    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
297    TableDescriptor td = pair.getFirst();
298    RegionInfo ri = pair.getSecond();
300    WAL wal = createWAL(this.conf, rootDir, logName);
301    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
302    final long timestamp = this.ee.currentTime();
303    // Write data and flush
304    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
305      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
306    }
307    region.flush(true);
309    // Write data with same timestamp and do not flush
310    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
311      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
312    }
313    // Now close the region without flush
314    region.close(true);
315    wal.shutdown();
316    // split the log
317    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
319    // reopen the region
320    WAL wal2 = createWAL(this.conf, rootDir, logName);
321    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
322    Result result2 = region2.get(new Get(ROW));
323    assertEquals(td.getColumnFamilies().length, result2.size());
324    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
325      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
326    }
327  }
329  @Test
330  public void testRecoverSequenceId() throws Exception {
331    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
332    TableDescriptor td = pair.getFirst();
333    RegionInfo ri = pair.getSecond();
335    WAL wal = createWAL(this.conf, rootDir, logName);
336    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
337    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
338    // Write data and do not flush
339    for (int i = 0; i < countPerFamily; i++) {
340      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
341        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
342        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
343        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
344        List<Cell> cells = result.listCells();
345        assertEquals(1, cells.size());
346        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
347          cells.get(0).getSequenceId());
348      }
349    }
351    // Now close the region without flush
352    region.close(true);
353    wal.shutdown();
354    // split the log
355    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
357    // reopen the region
358    WAL wal2 = createWAL(this.conf, rootDir, logName);
359    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
360    // assert the seqid was recovered
361    for (int i = 0; i < countPerFamily; i++) {
362      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
363        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
364        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
365        List<Cell> cells = result.listCells();
366        assertEquals(1, cells.size());
367        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()),
368          cells.get(0).getSequenceId());
369      }
370    }
371  }
373  /**
374   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
375   * seqids.
376   */
377  @Test
378  public void testWrittenViaHRegion()
379    throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
380    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
381    TableDescriptor td = pair.getFirst();
382    RegionInfo ri = pair.getSecond();
384    // Write countPerFamily edits into the three families. Do a flush on one
385    // of the families during the load of edits so its seqid is not same as
386    // others to test we do right thing when different seqids.
387    WAL wal = createWAL(this.conf, rootDir, logName);
388    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
389    long seqid = region.getOpenSeqNum();
390    boolean first = true;
391    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
392      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
393      if (first) {
394        // If first, so we have at least one family w/ different seqid to rest.
395        region.flush(true);
396        first = false;
397      }
398    }
399    // Now assert edits made it in.
400    final Get g = new Get(ROW);
401    Result result = region.get(g);
402    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
403    // Now close the region (without flush), split the log, reopen the region and assert that
404    // replay of log has the correct effect, that our seqids are calculated correctly so
405    // all edits in logs are seen as 'stale'/old.
406    region.close(true);
407    wal.shutdown();
408    try {
409      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
410    } catch (Exception e) {
411      LOG.debug("Got exception", e);
412    }
414    WAL wal2 = createWAL(this.conf, rootDir, logName);
415    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
416    long seqid2 = region2.getOpenSeqNum();
417    assertTrue(seqid + result.size() < seqid2);
418    final Result result1b = region2.get(g);
419    assertEquals(result.size(), result1b.size());
421    // Next test. Add more edits, then 'crash' this region by stealing its wal
422    // out from under it and assert that replay of the log adds the edits back
423    // correctly when region is opened again.
424    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
425      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
426    }
427    // Get count of edits.
428    final Result result2 = region2.get(g);
429    assertEquals(2 * result.size(), result2.size());
430    wal2.sync();
431    final Configuration newConf = HBaseConfiguration.create(this.conf);
432    User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString());
433    user.runAs(new PrivilegedExceptionAction<Object>() {
434      @Override
435      public Object run() throws Exception {
436        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
437        FileSystem newFS = FileSystem.get(newConf);
438        // Make a new wal for new region open.
439        WAL wal3 = createWAL(newConf, rootDir, logName);
440        Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
441        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
442        long seqid3 = region3.initialize();
443        Result result3 = region3.get(g);
444        // Assert that count of cells is same as before crash.
445        assertEquals(result2.size(), result3.size());
447        // I can't close wal1. Its been appropriated when we split.
448        region3.close();
449        wal3.close();
450        return null;
451      }
452    });
453  }
455  /**
456   * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores
457   * got flushed but others did not. Unfortunately, there is no easy hook to flush at a store level.
458   * The way we get around this is by flushing at the region level, and then deleting the recently
459   * flushed store file for one of the Stores. This would put us back in the situation where all but
460   * that store got flushed and the region died. We restart Region again, and verify that the edits
461   * were replayed.
462   */
463  @Test
464  public void testAfterPartialFlush()
465    throws IOException, SecurityException, IllegalArgumentException {
466    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
467    TableDescriptor td = pair.getFirst();
468    RegionInfo ri = pair.getSecond();
470    // Write countPerFamily edits into the three families. Do a flush on one
471    // of the families during the load of edits so its seqid is not same as
472    // others to test we do right thing when different seqids.
473    WAL wal = createWAL(this.conf, rootDir, logName);
474    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
475    long seqid = region.getOpenSeqNum();
476    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
477      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
478    }
480    // Now assert edits made it in.
481    final Get g = new Get(ROW);
482    Result result = region.get(g);
483    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
485    // Let us flush the region
486    region.flush(true);
487    region.close(true);
488    wal.shutdown();
490    // delete the store files in the second column family to simulate a failure
491    // in between the flushcache();
492    // we have 3 families. killing the middle one ensures that taking the maximum
493    // will make us fail.
494    int cf_count = 0;
495    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
496      cf_count++;
497      if (cf_count == 2) {
498        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
499      }
500    }
502    // Let us try to split and recover
503    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
504    WAL wal2 = createWAL(this.conf, rootDir, logName);
505    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
506    long seqid2 = region2.getOpenSeqNum();
507    assertTrue(seqid + result.size() < seqid2);
509    final Result result1b = region2.get(g);
510    assertEquals(result.size(), result1b.size());
511  }
513  /**
514   * Test that we could recover the data correctly after aborting flush. In the test, first we abort
515   * flush after writing some data, then writing more data and flush again, at last verify the data.
516   */
517  @Test
518  public void testAfterAbortingFlush() throws IOException {
519    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
520    TableDescriptor td = pair.getFirst();
521    RegionInfo ri = pair.getSecond();
523    // Write countPerFamily edits into the three families. Do a flush on one
524    // of the families during the load of edits so its seqid is not same as
525    // others to test we do right thing when different seqids.
526    WAL wal = createWAL(this.conf, rootDir, logName);
527    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
528    Mockito.doReturn(false).when(rsServices).isAborted();
529    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
530    when(rsServices.getConfiguration()).thenReturn(conf);
531    Configuration customConf = new Configuration(this.conf);
532    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
533      AbstractTestWALReplay.CustomStoreFlusher.class.getName());
534    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
535    int writtenRowCount = 10;
536    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
537    for (int i = 0; i < writtenRowCount; i++) {
538      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
539      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
540        Bytes.toBytes("val"));
541      region.put(put);
542    }
544    // Now assert edits made it in.
545    RegionScanner scanner = region.getScanner(new Scan());
546    assertEquals(writtenRowCount, getScannedCount(scanner));
548    // Let us flush the region
549    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
550    try {
551      region.flush(true);
552      fail("Injected exception hasn't been thrown");
553    } catch (IOException e) {
554      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
555      // simulated to abort server
556      Mockito.doReturn(true).when(rsServices).isAborted();
557      region.setClosing(false); // region normally does not accept writes after
558      // DroppedSnapshotException. We mock around it for this test.
559    }
560    // writing more data
561    int moreRow = 10;
562    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
563      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
564      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
565        Bytes.toBytes("val"));
566      region.put(put);
567    }
568    writtenRowCount += moreRow;
569    // call flush again
570    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
571    try {
572      region.flush(true);
573    } catch (IOException t) {
574      LOG.info(
575        "Expected exception when flushing region because server is stopped," + t.getMessage());
576    }
578    region.close(true);
579    wal.shutdown();
581    // Let us try to split and recover
582    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
583    WAL wal2 = createWAL(this.conf, rootDir, logName);
584    Mockito.doReturn(false).when(rsServices).isAborted();
585    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
586    scanner = region2.getScanner(new Scan());
587    assertEquals(writtenRowCount, getScannedCount(scanner));
588  }
590  private int getScannedCount(RegionScanner scanner) throws IOException {
591    int scannedCount = 0;
592    List<Cell> results = new ArrayList<>();
593    while (true) {
594      boolean existMore = scanner.next(results);
595      if (!results.isEmpty()) {
596        scannedCount++;
597      }
598      if (!existMore) {
599        break;
600      }
601      results.clear();
602    }
603    return scannedCount;
604  }
606  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
607    // Read the recovered hfile
608    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
609    FSDataInputStream in = fs.open(recoveredHFile);
610    byte[] fileContent = new byte[fileSize];
611    in.readFully(0, fileContent, 0, fileSize);
612    in.close();
614    // Write a corrupt hfile by append garbage
615    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
616    FSDataOutputStream out;
617    out = fs.create(path);
618    out.write(fileContent);
619    out.write(Bytes.toBytes("-----"));
620    out.close();
621  }