001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
085
086@Tag(RegionServerTests.TAG)
087@Tag(MediumTests.TAG)
088public class TestWALSplitToHFile {
089
090  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
091  static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
092  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
093  private Path rootDir = null;
094  private String logName;
095  private Path oldLogDir;
096  private Path logDir;
097  private FileSystem fs;
098  private Configuration conf;
099  private WALFactory wals;
100
101  private static final byte[] ROW = Bytes.toBytes("row");
102  private static final byte[] QUALIFIER = Bytes.toBytes("q");
103  private static final byte[] VALUE1 = Bytes.toBytes("value1");
104  private static final byte[] VALUE2 = Bytes.toBytes("value2");
105  private static final int countPerFamily = 10;
106
107  private String testMethodName;
108
109  @BeforeAll
110  public static void setUpBeforeClass() throws Exception {
111    Configuration conf = UTIL.getConfiguration();
112    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
113    UTIL.startMiniCluster(3);
114    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
115    LOG.info("hbase.rootdir=" + hbaseRootDir);
116    CommonFSUtils.setRootDir(conf, hbaseRootDir);
117  }
118
  /** Shuts down the mini cluster started in {@link #setUpBeforeClass()}. */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }
123
  /**
   * Per-test setup: derives a fresh configuration, computes the WAL/old-WAL directory layout
   * under a synthetic server name, wipes the root dir and creates a new WALFactory.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) throws Exception {
    testMethodName = testInfo.getTestMethod().get().getName();
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
    // Start strict so replay errors surface; individual tests flip this when needed.
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
    this.fs = UTIL.getDFSCluster().getFileSystem();
    this.rootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    // Fabricate a server name so each test method gets its own WAL directory.
    String serverName = ServerName
      .valueOf(testMethodName + "-manual", 16010, EnvironmentEdgeManager.currentTime()).toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.rootDir, logName);
    // Begin from a clean root dir so leftovers from earlier tests cannot interfere.
    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
    }
    this.wals = new WALFactory(conf, testMethodName);
  }
141
  /** Per-test cleanup: closes the WALFactory and removes the test's root directory. */
  @AfterEach
  public void tearDown() throws Exception {
    this.wals.close();
    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
  }
147
148  /*
149   * @param p Directory to cleanup
150   */
151  private void deleteDir(final Path p) throws IOException {
152    if (this.fs.exists(p)) {
153      if (!this.fs.delete(p, true)) {
154        throw new IOException("Failed remove of " + p);
155      }
156    }
157  }
158
159  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
160    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
161    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
162    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
163    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
164    TableDescriptor td = builder.build();
165    UTIL.getAdmin().createTable(td);
166    return td;
167  }
168
169  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
170    FileSystem fs = hbaseRootDir.getFileSystem(c);
171    fs.mkdirs(new Path(hbaseRootDir, logName));
172    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, c);
173    wal.init();
174    return wal;
175  }
176
177  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
178    fs.mkdirs(new Path(hbaseRootDir, logName));
179    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
180    wal.init();
181    return wal;
182  }
183
  /**
   * Creates the test table (named after the running test method), writes its descriptor into the
   * table directory, and pre-creates the single region, which is immediately closed so tests can
   * reopen it with a WAL of their choosing.
   * @return the table descriptor and region info of the created region
   */
  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
    final TableName tableName = TableName.valueOf(testMethodName);
    final TableDescriptor td = createBasic3FamilyTD(tableName);
    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
    final Path tableDir = CommonFSUtils.getTableDir(this.rootDir, tableName);
    // Clean slate: the descriptor write below requires the directory to be re-creatable.
    deleteDir(tableDir);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
    HRegion region = HBaseTestingUtil.createRegionAndWAL(ri, rootDir, this.conf, td);
    HBaseTestingUtil.closeRegionAndWAL(region);
    return new Pair<>(td, ri);
  }
195
196  private void writeData(TableDescriptor td, HRegion region) throws IOException {
197    final long timestamp = this.ee.currentTime();
198    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
199      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
200    }
201  }
202
  /**
   * Verifies WAL split to hfiles works when the WAL root dir differs from the HBase root dir:
   * unflushed edits written to a WAL under the separate WAL root must be recovered when the region
   * is reopened.
   */
  @Test
  public void testDifferentRootDirAndWALRootDir() throws Exception {
    // Change wal root dir and reset the configuration
    Path walRootDir = UTIL.createWALRootDir();
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());

    // Recompute the WAL directory layout relative to the separate WAL root.
    FileSystem walFs = CommonFSUtils.getWALFileSystem(this.conf);
    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(testMethodName + "-manual", 16010, EnvironmentEdgeManager.currentTime()).toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(walRootDir, logName);
    this.wals = new WALFactory(conf, testMethodName);

    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write one cell per family; WAL lives under walRootDir, region data under rootDir.
    WAL wal = createWAL(walFs, walRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // Reopen: replay of the recovered hfiles must restore the cell in every family.
    WAL wal2 = createWAL(walFs, walRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }
239
240  @Test
241  public void testCorruptRecoveredHFile() throws Exception {
242    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
243    TableDescriptor td = pair.getFirst();
244    RegionInfo ri = pair.getSecond();
245
246    WAL wal = createWAL(this.conf, rootDir, logName);
247    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
248    writeData(td, region);
249
250    // Now close the region without flush
251    region.close(true);
252    wal.shutdown();
253    // split the log
254    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
255
256    // Write a corrupt recovered hfile
257    Path regionDir =
258      new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
259    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
260      FileStatus[] files =
261        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
262      assertNotNull(files);
263      assertTrue(files.length > 0);
264      writeCorruptRecoveredHFile(files[0].getPath());
265    }
266
267    // Failed to reopen the region
268    WAL wal2 = createWAL(this.conf, rootDir, logName);
269    try {
270      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
271      fail("Should fail to open region");
272    } catch (CorruptHFileException che) {
273      // Expected
274    }
275
276    // Set skip errors to true and reopen the region
277    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
278    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
279    Result result2 = region2.get(new Get(ROW));
280    assertEquals(td.getColumnFamilies().length, result2.size());
281    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
282      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
283      // Assert the corrupt file was skipped and still exist
284      FileStatus[] files =
285        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
286      assertNotNull(files);
287      assertEquals(1, files.length);
288      assertTrue(files[0].getPath().getName().contains("corrupt"));
289    }
290  }
291
  /**
   * Verifies replay correctness when a flushed cell and an unflushed cell share the same
   * row/qualifier/timestamp: after split and reopen, the later VALUE2 edit must be the one
   * returned, not the flushed VALUE1.
   */
  @Test
  public void testPutWithSameTimestamp() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    final long timestamp = this.ee.currentTime();
    // Write data and flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
    region.flush(true);

    // Write data with same timestamp and do not flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
    }
    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    // The replayed VALUE2 must shadow the flushed VALUE1 despite the identical timestamp.
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }
326
  /**
   * Verifies that cell sequence ids survive WAL split and replay: the sequence id observed for
   * each cell at write time must equal the one observed after the region is reopened.
   */
  @Test
  public void testRecoverSequenceId() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    // row index -> (family name -> sequence id observed at write time)
    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
    // Write data and do not flush
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
        assertEquals(1, cells.length);
        // Record the pre-crash sequence id for the post-recovery comparison below.
        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
          cells[0].getSequenceId());
      }
    }

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    // assert the seqid was recovered
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
        assertEquals(1, cells.length);
        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()), cells[0].getSequenceId());
      }
    }
  }
369
370  /**
371   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
372   * seqids.
373   */
374  @Test
375  public void testWrittenViaHRegion()
376    throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
377    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
378    TableDescriptor td = pair.getFirst();
379    RegionInfo ri = pair.getSecond();
380
381    // Write countPerFamily edits into the three families. Do a flush on one
382    // of the families during the load of edits so its seqid is not same as
383    // others to test we do right thing when different seqids.
384    WAL wal = createWAL(this.conf, rootDir, logName);
385    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
386    long seqid = region.getOpenSeqNum();
387    boolean first = true;
388    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
389      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
390      if (first) {
391        // If first, so we have at least one family w/ different seqid to rest.
392        region.flush(true);
393        first = false;
394      }
395    }
396    // Now assert edits made it in.
397    final Get g = new Get(ROW);
398    Result result = region.get(g);
399    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
400    // Now close the region (without flush), split the log, reopen the region and assert that
401    // replay of log has the correct effect, that our seqids are calculated correctly so
402    // all edits in logs are seen as 'stale'/old.
403    region.close(true);
404    wal.shutdown();
405    try {
406      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
407    } catch (Exception e) {
408      LOG.debug("Got exception", e);
409    }
410
411    WAL wal2 = createWAL(this.conf, rootDir, logName);
412    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
413    long seqid2 = region2.getOpenSeqNum();
414    assertTrue(seqid + result.size() < seqid2);
415    final Result result1b = region2.get(g);
416    assertEquals(result.size(), result1b.size());
417
418    // Next test. Add more edits, then 'crash' this region by stealing its wal
419    // out from under it and assert that replay of the log adds the edits back
420    // correctly when region is opened again.
421    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
422      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
423    }
424    // Get count of edits.
425    final Result result2 = region2.get(g);
426    assertEquals(2 * result.size(), result2.size());
427    wal2.sync();
428    final Configuration newConf = HBaseConfiguration.create(this.conf);
429    User user = HBaseTestingUtil.getDifferentUser(newConf, td.getTableName().getNameAsString());
430    user.runAs(new PrivilegedExceptionAction<Object>() {
431      @Override
432      public Object run() throws Exception {
433        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
434        FileSystem newFS = FileSystem.get(newConf);
435        // Make a new wal for new region open.
436        WAL wal3 = createWAL(newConf, rootDir, logName);
437        Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
438        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
439        long seqid3 = region3.initialize();
440        Result result3 = region3.get(g);
441        // Assert that count of cells is same as before crash.
442        assertEquals(result2.size(), result3.size());
443
444        // I can't close wal1. Its been appropriated when we split.
445        region3.close();
446        wal3.close();
447        return null;
448      }
449    });
450  }
451
452  /**
453   * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores
454   * got flushed but others did not. Unfortunately, there is no easy hook to flush at a store level.
455   * The way we get around this is by flushing at the region level, and then deleting the recently
456   * flushed store file for one of the Stores. This would put us back in the situation where all but
457   * that store got flushed and the region died. We restart Region again, and verify that the edits
458   * were replayed.
459   */
460  @Test
461  public void testAfterPartialFlush()
462    throws IOException, SecurityException, IllegalArgumentException {
463    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
464    TableDescriptor td = pair.getFirst();
465    RegionInfo ri = pair.getSecond();
466
467    // Write countPerFamily edits into the three families. Do a flush on one
468    // of the families during the load of edits so its seqid is not same as
469    // others to test we do right thing when different seqids.
470    WAL wal = createWAL(this.conf, rootDir, logName);
471    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
472    long seqid = region.getOpenSeqNum();
473    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
474      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
475    }
476
477    // Now assert edits made it in.
478    final Get g = new Get(ROW);
479    Result result = region.get(g);
480    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
481
482    // Let us flush the region
483    region.flush(true);
484    region.close(true);
485    wal.shutdown();
486
487    // delete the store files in the second column family to simulate a failure
488    // in between the flushcache();
489    // we have 3 families. killing the middle one ensures that taking the maximum
490    // will make us fail.
491    int cf_count = 0;
492    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
493      cf_count++;
494      if (cf_count == 2) {
495        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
496      }
497    }
498
499    // Let us try to split and recover
500    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
501    WAL wal2 = createWAL(this.conf, rootDir, logName);
502    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
503    long seqid2 = region2.getOpenSeqNum();
504    assertTrue(seqid + result.size() < seqid2);
505
506    final Result result1b = region2.get(g);
507    assertEquals(result.size(), result1b.size());
508  }
509
510  /**
511   * Test that we could recover the data correctly after aborting flush. In the test, first we abort
512   * flush after writing some data, then writing more data and flush again, at last verify the data.
513   */
514  @Test
515  public void testAfterAbortingFlush() throws IOException {
516    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
517    TableDescriptor td = pair.getFirst();
518    RegionInfo ri = pair.getSecond();
519
520    // Write countPerFamily edits into the three families. Do a flush on one
521    // of the families during the load of edits so its seqid is not same as
522    // others to test we do right thing when different seqids.
523    WAL wal = createWAL(this.conf, rootDir, logName);
524    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
525    Mockito.doReturn(false).when(rsServices).isAborted();
526    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
527    when(rsServices.getConfiguration()).thenReturn(conf);
528    Configuration customConf = new Configuration(this.conf);
529    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
530      AbstractTestWALReplay.CustomStoreFlusher.class.getName());
531    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
532    int writtenRowCount = 10;
533    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
534    for (int i = 0; i < writtenRowCount; i++) {
535      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
536      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
537        Bytes.toBytes("val"));
538      region.put(put);
539    }
540
541    // Now assert edits made it in.
542    RegionScanner scanner = region.getScanner(new Scan());
543    assertEquals(writtenRowCount, getScannedCount(scanner));
544
545    // Let us flush the region
546    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
547    try {
548      region.flush(true);
549      fail("Injected exception hasn't been thrown");
550    } catch (IOException e) {
551      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
552      // simulated to abort server
553      Mockito.doReturn(true).when(rsServices).isAborted();
554      region.setClosing(false); // region normally does not accept writes after
555      // DroppedSnapshotException. We mock around it for this test.
556    }
557    // writing more data
558    int moreRow = 10;
559    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
560      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
561      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
562        Bytes.toBytes("val"));
563      region.put(put);
564    }
565    writtenRowCount += moreRow;
566    // call flush again
567    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
568    try {
569      region.flush(true);
570    } catch (IOException t) {
571      LOG.info(
572        "Expected exception when flushing region because server is stopped," + t.getMessage());
573    }
574
575    region.close(true);
576    wal.shutdown();
577
578    // Let us try to split and recover
579    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
580    WAL wal2 = createWAL(this.conf, rootDir, logName);
581    Mockito.doReturn(false).when(rsServices).isAborted();
582    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
583    scanner = region2.getScanner(new Scan());
584    assertEquals(writtenRowCount, getScannedCount(scanner));
585  }
586
587  private int getScannedCount(RegionScanner scanner) throws IOException {
588    int scannedCount = 0;
589    List<Cell> results = new ArrayList<>();
590    while (true) {
591      boolean existMore = scanner.next(results);
592      if (!results.isEmpty()) {
593        scannedCount++;
594      }
595      if (!existMore) {
596        break;
597      }
598      results.clear();
599    }
600    return scannedCount;
601  }
602
603  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
604    // Read the recovered hfile
605    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
606    FSDataInputStream in = fs.open(recoveredHFile);
607    byte[] fileContent = new byte[fileSize];
608    in.readFully(0, fileContent, 0, fileSize);
609    in.close();
610
611    // Write a corrupt hfile by append garbage
612    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
613    FSDataOutputStream out;
614    out = fs.create(path);
615    out.write(fileContent);
616    out.write(Bytes.toBytes("-----"));
617    out.close();
618  }
619}