001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
021import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.security.PrivilegedExceptionAction;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FSDataInputStream;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.ExtendedCell;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ClientInternalHelper;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
052import org.apache.hadoop.hbase.client.Get;
053import org.apache.hadoop.hbase.client.Put;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.RegionInfoBuilder;
056import org.apache.hadoop.hbase.client.Result;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.TableDescriptor;
059import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
060import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
061import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
062import org.apache.hadoop.hbase.regionserver.HRegion;
063import org.apache.hadoop.hbase.regionserver.RegionScanner;
064import org.apache.hadoop.hbase.regionserver.RegionServerServices;
065import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
066import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
067import org.apache.hadoop.hbase.security.User;
068import org.apache.hadoop.hbase.testclassification.MediumTests;
069import org.apache.hadoop.hbase.testclassification.RegionServerTests;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.CommonFSUtils;
072import org.apache.hadoop.hbase.util.EnvironmentEdge;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
074import org.apache.hadoop.hbase.util.FSTableDescriptors;
075import org.apache.hadoop.hbase.util.Pair;
076import org.junit.After;
077import org.junit.AfterClass;
078import org.junit.Before;
079import org.junit.BeforeClass;
080import org.junit.ClassRule;
081import org.junit.Rule;
082import org.junit.Test;
083import org.junit.experimental.categories.Category;
084import org.junit.rules.TestName;
085import org.mockito.Mockito;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089@Category({ RegionServerTests.class, MediumTests.class })
090public class TestWALSplitToHFile {
091  @ClassRule
092  public static final HBaseClassTestRule CLASS_RULE =
093    HBaseClassTestRule.forClass(TestWALSplitToHFile.class);
094
095  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
096  static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
097  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
098  private Path rootDir = null;
099  private String logName;
100  private Path oldLogDir;
101  private Path logDir;
102  private FileSystem fs;
103  private Configuration conf;
104  private WALFactory wals;
105
106  private static final byte[] ROW = Bytes.toBytes("row");
107  private static final byte[] QUALIFIER = Bytes.toBytes("q");
108  private static final byte[] VALUE1 = Bytes.toBytes("value1");
109  private static final byte[] VALUE2 = Bytes.toBytes("value2");
110  private static final int countPerFamily = 10;
111
112  @Rule
113  public final TestName TEST_NAME = new TestName();
114
115  @BeforeClass
116  public static void setUpBeforeClass() throws Exception {
117    Configuration conf = UTIL.getConfiguration();
118    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
119    UTIL.startMiniCluster(3);
120    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
121    LOG.info("hbase.rootdir=" + hbaseRootDir);
122    CommonFSUtils.setRootDir(conf, hbaseRootDir);
123  }
124
125  @AfterClass
126  public static void tearDownAfterClass() throws Exception {
127    UTIL.shutdownMiniCluster();
128  }
129
130  @Before
131  public void setUp() throws Exception {
132    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
133    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
134    this.fs = UTIL.getDFSCluster().getFileSystem();
135    this.rootDir = CommonFSUtils.getRootDir(this.conf);
136    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
137    String serverName = ServerName
138      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
139      .toString();
140    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
141    this.logDir = new Path(this.rootDir, logName);
142    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
143      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
144    }
145    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
146  }
147
148  @After
149  public void tearDown() throws Exception {
150    this.wals.close();
151    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
152  }
153
154  /*
155   * @param p Directory to cleanup
156   */
157  private void deleteDir(final Path p) throws IOException {
158    if (this.fs.exists(p)) {
159      if (!this.fs.delete(p, true)) {
160        throw new IOException("Failed remove of " + p);
161      }
162    }
163  }
164
165  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
166    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
167    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
168    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
169    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
170    TableDescriptor td = builder.build();
171    UTIL.getAdmin().createTable(td);
172    return td;
173  }
174
175  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
176    FileSystem fs = hbaseRootDir.getFileSystem(c);
177    fs.mkdirs(new Path(hbaseRootDir, logName));
178    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, c);
179    wal.init();
180    return wal;
181  }
182
183  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
184    fs.mkdirs(new Path(hbaseRootDir, logName));
185    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
186    wal.init();
187    return wal;
188  }
189
190  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
191    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
192    final TableDescriptor td = createBasic3FamilyTD(tableName);
193    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
194    final Path tableDir = CommonFSUtils.getTableDir(this.rootDir, tableName);
195    deleteDir(tableDir);
196    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
197    HRegion region = HBaseTestingUtil.createRegionAndWAL(ri, rootDir, this.conf, td);
198    HBaseTestingUtil.closeRegionAndWAL(region);
199    return new Pair<>(td, ri);
200  }
201
202  private void writeData(TableDescriptor td, HRegion region) throws IOException {
203    final long timestamp = this.ee.currentTime();
204    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
205      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
206    }
207  }
208
209  @Test
210  public void testDifferentRootDirAndWALRootDir() throws Exception {
211    // Change wal root dir and reset the configuration
212    Path walRootDir = UTIL.createWALRootDir();
213    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
214
215    FileSystem walFs = CommonFSUtils.getWALFileSystem(this.conf);
216    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
217    String serverName = ServerName
218      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
219      .toString();
220    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
221    this.logDir = new Path(walRootDir, logName);
222    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
223
224    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
225    TableDescriptor td = pair.getFirst();
226    RegionInfo ri = pair.getSecond();
227
228    WAL wal = createWAL(walFs, walRootDir, logName);
229    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
230    writeData(td, region);
231
232    // Now close the region without flush
233    region.close(true);
234    wal.shutdown();
235    // split the log
236    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
237
238    WAL wal2 = createWAL(walFs, walRootDir, logName);
239    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
240    Result result2 = region2.get(new Get(ROW));
241    assertEquals(td.getColumnFamilies().length, result2.size());
242    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
243      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
244    }
245  }
246
247  @Test
248  public void testCorruptRecoveredHFile() throws Exception {
249    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
250    TableDescriptor td = pair.getFirst();
251    RegionInfo ri = pair.getSecond();
252
253    WAL wal = createWAL(this.conf, rootDir, logName);
254    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
255    writeData(td, region);
256
257    // Now close the region without flush
258    region.close(true);
259    wal.shutdown();
260    // split the log
261    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
262
263    // Write a corrupt recovered hfile
264    Path regionDir =
265      new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
266    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
267      FileStatus[] files =
268        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
269      assertNotNull(files);
270      assertTrue(files.length > 0);
271      writeCorruptRecoveredHFile(files[0].getPath());
272    }
273
274    // Failed to reopen the region
275    WAL wal2 = createWAL(this.conf, rootDir, logName);
276    try {
277      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
278      fail("Should fail to open region");
279    } catch (CorruptHFileException che) {
280      // Expected
281    }
282
283    // Set skip errors to true and reopen the region
284    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
285    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
286    Result result2 = region2.get(new Get(ROW));
287    assertEquals(td.getColumnFamilies().length, result2.size());
288    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
289      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
290      // Assert the corrupt file was skipped and still exist
291      FileStatus[] files =
292        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
293      assertNotNull(files);
294      assertEquals(1, files.length);
295      assertTrue(files[0].getPath().getName().contains("corrupt"));
296    }
297  }
298
299  @Test
300  public void testPutWithSameTimestamp() throws Exception {
301    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
302    TableDescriptor td = pair.getFirst();
303    RegionInfo ri = pair.getSecond();
304
305    WAL wal = createWAL(this.conf, rootDir, logName);
306    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
307    final long timestamp = this.ee.currentTime();
308    // Write data and flush
309    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
310      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
311    }
312    region.flush(true);
313
314    // Write data with same timestamp and do not flush
315    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
316      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
317    }
318    // Now close the region without flush
319    region.close(true);
320    wal.shutdown();
321    // split the log
322    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
323
324    // reopen the region
325    WAL wal2 = createWAL(this.conf, rootDir, logName);
326    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
327    Result result2 = region2.get(new Get(ROW));
328    assertEquals(td.getColumnFamilies().length, result2.size());
329    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
330      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
331    }
332  }
333
334  @Test
335  public void testRecoverSequenceId() throws Exception {
336    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
337    TableDescriptor td = pair.getFirst();
338    RegionInfo ri = pair.getSecond();
339
340    WAL wal = createWAL(this.conf, rootDir, logName);
341    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
342    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
343    // Write data and do not flush
344    for (int i = 0; i < countPerFamily; i++) {
345      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
346        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
347        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
348        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
349        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
350        assertEquals(1, cells.length);
351        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
352          cells[0].getSequenceId());
353      }
354    }
355
356    // Now close the region without flush
357    region.close(true);
358    wal.shutdown();
359    // split the log
360    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
361
362    // reopen the region
363    WAL wal2 = createWAL(this.conf, rootDir, logName);
364    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
365    // assert the seqid was recovered
366    for (int i = 0; i < countPerFamily; i++) {
367      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
368        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
369        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
370        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
371        assertEquals(1, cells.length);
372        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()), cells[0].getSequenceId());
373      }
374    }
375  }
376
377  /**
378   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
379   * seqids.
380   */
381  @Test
382  public void testWrittenViaHRegion()
383    throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
384    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
385    TableDescriptor td = pair.getFirst();
386    RegionInfo ri = pair.getSecond();
387
388    // Write countPerFamily edits into the three families. Do a flush on one
389    // of the families during the load of edits so its seqid is not same as
390    // others to test we do right thing when different seqids.
391    WAL wal = createWAL(this.conf, rootDir, logName);
392    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
393    long seqid = region.getOpenSeqNum();
394    boolean first = true;
395    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
396      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
397      if (first) {
398        // If first, so we have at least one family w/ different seqid to rest.
399        region.flush(true);
400        first = false;
401      }
402    }
403    // Now assert edits made it in.
404    final Get g = new Get(ROW);
405    Result result = region.get(g);
406    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
407    // Now close the region (without flush), split the log, reopen the region and assert that
408    // replay of log has the correct effect, that our seqids are calculated correctly so
409    // all edits in logs are seen as 'stale'/old.
410    region.close(true);
411    wal.shutdown();
412    try {
413      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
414    } catch (Exception e) {
415      LOG.debug("Got exception", e);
416    }
417
418    WAL wal2 = createWAL(this.conf, rootDir, logName);
419    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
420    long seqid2 = region2.getOpenSeqNum();
421    assertTrue(seqid + result.size() < seqid2);
422    final Result result1b = region2.get(g);
423    assertEquals(result.size(), result1b.size());
424
425    // Next test. Add more edits, then 'crash' this region by stealing its wal
426    // out from under it and assert that replay of the log adds the edits back
427    // correctly when region is opened again.
428    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
429      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
430    }
431    // Get count of edits.
432    final Result result2 = region2.get(g);
433    assertEquals(2 * result.size(), result2.size());
434    wal2.sync();
435    final Configuration newConf = HBaseConfiguration.create(this.conf);
436    User user = HBaseTestingUtil.getDifferentUser(newConf, td.getTableName().getNameAsString());
437    user.runAs(new PrivilegedExceptionAction<Object>() {
438      @Override
439      public Object run() throws Exception {
440        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
441        FileSystem newFS = FileSystem.get(newConf);
442        // Make a new wal for new region open.
443        WAL wal3 = createWAL(newConf, rootDir, logName);
444        Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
445        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
446        long seqid3 = region3.initialize();
447        Result result3 = region3.get(g);
448        // Assert that count of cells is same as before crash.
449        assertEquals(result2.size(), result3.size());
450
451        // I can't close wal1. Its been appropriated when we split.
452        region3.close();
453        wal3.close();
454        return null;
455      }
456    });
457  }
458
459  /**
460   * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores
461   * got flushed but others did not. Unfortunately, there is no easy hook to flush at a store level.
462   * The way we get around this is by flushing at the region level, and then deleting the recently
463   * flushed store file for one of the Stores. This would put us back in the situation where all but
464   * that store got flushed and the region died. We restart Region again, and verify that the edits
465   * were replayed.
466   */
467  @Test
468  public void testAfterPartialFlush()
469    throws IOException, SecurityException, IllegalArgumentException {
470    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
471    TableDescriptor td = pair.getFirst();
472    RegionInfo ri = pair.getSecond();
473
474    // Write countPerFamily edits into the three families. Do a flush on one
475    // of the families during the load of edits so its seqid is not same as
476    // others to test we do right thing when different seqids.
477    WAL wal = createWAL(this.conf, rootDir, logName);
478    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
479    long seqid = region.getOpenSeqNum();
480    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
481      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
482    }
483
484    // Now assert edits made it in.
485    final Get g = new Get(ROW);
486    Result result = region.get(g);
487    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
488
489    // Let us flush the region
490    region.flush(true);
491    region.close(true);
492    wal.shutdown();
493
494    // delete the store files in the second column family to simulate a failure
495    // in between the flushcache();
496    // we have 3 families. killing the middle one ensures that taking the maximum
497    // will make us fail.
498    int cf_count = 0;
499    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
500      cf_count++;
501      if (cf_count == 2) {
502        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
503      }
504    }
505
506    // Let us try to split and recover
507    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
508    WAL wal2 = createWAL(this.conf, rootDir, logName);
509    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
510    long seqid2 = region2.getOpenSeqNum();
511    assertTrue(seqid + result.size() < seqid2);
512
513    final Result result1b = region2.get(g);
514    assertEquals(result.size(), result1b.size());
515  }
516
517  /**
518   * Test that we could recover the data correctly after aborting flush. In the test, first we abort
519   * flush after writing some data, then writing more data and flush again, at last verify the data.
520   */
521  @Test
522  public void testAfterAbortingFlush() throws IOException {
523    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
524    TableDescriptor td = pair.getFirst();
525    RegionInfo ri = pair.getSecond();
526
527    // Write countPerFamily edits into the three families. Do a flush on one
528    // of the families during the load of edits so its seqid is not same as
529    // others to test we do right thing when different seqids.
530    WAL wal = createWAL(this.conf, rootDir, logName);
531    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
532    Mockito.doReturn(false).when(rsServices).isAborted();
533    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
534    when(rsServices.getConfiguration()).thenReturn(conf);
535    Configuration customConf = new Configuration(this.conf);
536    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
537      AbstractTestWALReplay.CustomStoreFlusher.class.getName());
538    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
539    int writtenRowCount = 10;
540    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
541    for (int i = 0; i < writtenRowCount; i++) {
542      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
543      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
544        Bytes.toBytes("val"));
545      region.put(put);
546    }
547
548    // Now assert edits made it in.
549    RegionScanner scanner = region.getScanner(new Scan());
550    assertEquals(writtenRowCount, getScannedCount(scanner));
551
552    // Let us flush the region
553    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
554    try {
555      region.flush(true);
556      fail("Injected exception hasn't been thrown");
557    } catch (IOException e) {
558      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
559      // simulated to abort server
560      Mockito.doReturn(true).when(rsServices).isAborted();
561      region.setClosing(false); // region normally does not accept writes after
562      // DroppedSnapshotException. We mock around it for this test.
563    }
564    // writing more data
565    int moreRow = 10;
566    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
567      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
568      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
569        Bytes.toBytes("val"));
570      region.put(put);
571    }
572    writtenRowCount += moreRow;
573    // call flush again
574    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
575    try {
576      region.flush(true);
577    } catch (IOException t) {
578      LOG.info(
579        "Expected exception when flushing region because server is stopped," + t.getMessage());
580    }
581
582    region.close(true);
583    wal.shutdown();
584
585    // Let us try to split and recover
586    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
587    WAL wal2 = createWAL(this.conf, rootDir, logName);
588    Mockito.doReturn(false).when(rsServices).isAborted();
589    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
590    scanner = region2.getScanner(new Scan());
591    assertEquals(writtenRowCount, getScannedCount(scanner));
592  }
593
594  private int getScannedCount(RegionScanner scanner) throws IOException {
595    int scannedCount = 0;
596    List<Cell> results = new ArrayList<>();
597    while (true) {
598      boolean existMore = scanner.next(results);
599      if (!results.isEmpty()) {
600        scannedCount++;
601      }
602      if (!existMore) {
603        break;
604      }
605      results.clear();
606    }
607    return scannedCount;
608  }
609
610  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
611    // Read the recovered hfile
612    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
613    FSDataInputStream in = fs.open(recoveredHFile);
614    byte[] fileContent = new byte[fileSize];
615    in.readFully(0, fileContent, 0, fileSize);
616    in.close();
617
618    // Write a corrupt hfile by append garbage
619    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
620    FSDataOutputStream out;
621    out = fs.create(path);
622    out.write(fileContent);
623    out.write(Bytes.toBytes("-----"));
624    out.close();
625  }
626}