001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
021import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026import static org.mockito.Mockito.when;
027import java.io.IOException;
028import java.security.PrivilegedExceptionAction;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FSDataInputStream;
036import org.apache.hadoop.fs.FSDataOutputStream;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HBaseConfiguration;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
049import org.apache.hadoop.hbase.client.Get;
050import org.apache.hadoop.hbase.client.Put;
051import org.apache.hadoop.hbase.client.RegionInfo;
052import org.apache.hadoop.hbase.client.RegionInfoBuilder;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.Scan;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
057import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
058import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
059import org.apache.hadoop.hbase.regionserver.HRegion;
060import org.apache.hadoop.hbase.regionserver.RegionScanner;
061import org.apache.hadoop.hbase.regionserver.RegionServerServices;
062import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
063import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
064import org.apache.hadoop.hbase.security.User;
065import org.apache.hadoop.hbase.testclassification.MediumTests;
066import org.apache.hadoop.hbase.testclassification.RegionServerTests;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.CommonFSUtils;
069import org.apache.hadoop.hbase.util.EnvironmentEdge;
070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
071import org.apache.hadoop.hbase.util.FSTableDescriptors;
072import org.apache.hadoop.hbase.util.Pair;
073import org.junit.After;
074import org.junit.AfterClass;
075import org.junit.Before;
076import org.junit.BeforeClass;
077import org.junit.ClassRule;
078import org.junit.Rule;
079import org.junit.Test;
080import org.junit.experimental.categories.Category;
081import org.junit.rules.TestName;
082import org.mockito.Mockito;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086@Category({ RegionServerTests.class, MediumTests.class })
087public class TestWALSplitToHFile {
088  @ClassRule
089  public static final HBaseClassTestRule CLASS_RULE =
090      HBaseClassTestRule.forClass(TestWALSplitToHFile.class);
091
092  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
093  static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
094  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
095  private Path rootDir = null;
096  private String logName;
097  private Path oldLogDir;
098  private Path logDir;
099  private FileSystem fs;
100  private Configuration conf;
101  private WALFactory wals;
102
103  private static final byte[] ROW = Bytes.toBytes("row");
104  private static final byte[] QUALIFIER = Bytes.toBytes("q");
105  private static final byte[] VALUE1 = Bytes.toBytes("value1");
106  private static final byte[] VALUE2 = Bytes.toBytes("value2");
107  private static final int countPerFamily = 10;
108
109  @Rule
110  public final TestName TEST_NAME = new TestName();
111
112  @BeforeClass
113  public static void setUpBeforeClass() throws Exception {
114    Configuration conf = UTIL.getConfiguration();
115    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
116    UTIL.startMiniCluster(3);
117    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
118    LOG.info("hbase.rootdir=" + hbaseRootDir);
119    CommonFSUtils.setRootDir(conf, hbaseRootDir);
120  }
121
122  @AfterClass
123  public static void tearDownAfterClass() throws Exception {
124    UTIL.shutdownMiniCluster();
125  }
126
127  @Before
128  public void setUp() throws Exception {
129    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
130    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
131    this.fs = UTIL.getDFSCluster().getFileSystem();
132    this.rootDir = CommonFSUtils.getRootDir(this.conf);
133    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
134    String serverName =
135        ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
136            .toString();
137    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
138    this.logDir = new Path(this.rootDir, logName);
139    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
140      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
141    }
142    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
143  }
144
145  @After
146  public void tearDown() throws Exception {
147    this.wals.close();
148    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
149  }
150
151  /*
152   * @param p Directory to cleanup
153   */
154  private void deleteDir(final Path p) throws IOException {
155    if (this.fs.exists(p)) {
156      if (!this.fs.delete(p, true)) {
157        throw new IOException("Failed remove of " + p);
158      }
159    }
160  }
161
162  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
163    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
164    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
165    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
166    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
167    TableDescriptor td = builder.build();
168    UTIL.getAdmin().createTable(td);
169    return td;
170  }
171
172  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
173    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
174    wal.init();
175    return wal;
176  }
177
178  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
179    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
180    wal.init();
181    return wal;
182  }
183
184  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
185    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
186    final TableDescriptor td = createBasic3FamilyTD(tableName);
187    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
188    final Path tableDir = CommonFSUtils.getTableDir(this.rootDir, tableName);
189    deleteDir(tableDir);
190    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
191    HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
192    HBaseTestingUtility.closeRegionAndWAL(region);
193    return new Pair<>(td, ri);
194  }
195
196  private void writeData(TableDescriptor td, HRegion region) throws IOException {
197    final long timestamp = this.ee.currentTime();
198    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
199      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
200    }
201  }
202
203  @Test
204  public void testDifferentRootDirAndWALRootDir() throws Exception {
205    // Change wal root dir and reset the configuration
206    Path walRootDir = UTIL.createWALRootDir();
207    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
208
209    FileSystem walFs = CommonFSUtils.getWALFileSystem(this.conf);
210    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
211    String serverName =
212        ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
213            .toString();
214    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
215    this.logDir = new Path(walRootDir, logName);
216    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
217
218    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
219    TableDescriptor td = pair.getFirst();
220    RegionInfo ri = pair.getSecond();
221
222    WAL wal = createWAL(walFs, walRootDir, logName);
223    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
224    writeData(td, region);
225
226    // Now close the region without flush
227    region.close(true);
228    wal.shutdown();
229    // split the log
230    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
231
232    WAL wal2 = createWAL(walFs, walRootDir, logName);
233    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
234    Result result2 = region2.get(new Get(ROW));
235    assertEquals(td.getColumnFamilies().length, result2.size());
236    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
237      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
238    }
239  }
240
241  @Test
242  public void testCorruptRecoveredHFile() throws Exception {
243    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
244    TableDescriptor td = pair.getFirst();
245    RegionInfo ri = pair.getSecond();
246
247    WAL wal = createWAL(this.conf, rootDir, logName);
248    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
249    writeData(td, region);
250
251    // Now close the region without flush
252    region.close(true);
253    wal.shutdown();
254    // split the log
255    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
256
257    // Write a corrupt recovered hfile
258    Path regionDir =
259        new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
260    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
261      FileStatus[] files =
262          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
263      assertNotNull(files);
264      assertTrue(files.length > 0);
265      writeCorruptRecoveredHFile(files[0].getPath());
266    }
267
268    // Failed to reopen the region
269    WAL wal2 = createWAL(this.conf, rootDir, logName);
270    try {
271      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
272      fail("Should fail to open region");
273    } catch (CorruptHFileException che) {
274      // Expected
275    }
276
277    // Set skip errors to true and reopen the region
278    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
279    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
280    Result result2 = region2.get(new Get(ROW));
281    assertEquals(td.getColumnFamilies().length, result2.size());
282    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
283      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
284      // Assert the corrupt file was skipped and still exist
285      FileStatus[] files =
286          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
287      assertNotNull(files);
288      assertEquals(1, files.length);
289      assertTrue(files[0].getPath().getName().contains("corrupt"));
290    }
291  }
292
293  @Test
294  public void testPutWithSameTimestamp() throws Exception {
295    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
296    TableDescriptor td = pair.getFirst();
297    RegionInfo ri = pair.getSecond();
298
299    WAL wal = createWAL(this.conf, rootDir, logName);
300    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
301    final long timestamp = this.ee.currentTime();
302    // Write data and flush
303    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
304      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
305    }
306    region.flush(true);
307
308    // Write data with same timestamp and do not flush
309    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
310      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
311    }
312    // Now close the region without flush
313    region.close(true);
314    wal.shutdown();
315    // split the log
316    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
317
318    // reopen the region
319    WAL wal2 = createWAL(this.conf, rootDir, logName);
320    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
321    Result result2 = region2.get(new Get(ROW));
322    assertEquals(td.getColumnFamilies().length, result2.size());
323    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
324      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
325    }
326  }
327
328  @Test
329  public void testRecoverSequenceId() throws Exception {
330    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
331    TableDescriptor td = pair.getFirst();
332    RegionInfo ri = pair.getSecond();
333
334    WAL wal = createWAL(this.conf, rootDir, logName);
335    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
336    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
337    // Write data and do not flush
338    for (int i = 0; i < countPerFamily; i++) {
339      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
340        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
341        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
342        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
343        List<Cell> cells = result.listCells();
344        assertEquals(1, cells.size());
345        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
346          cells.get(0).getSequenceId());
347      }
348    }
349
350    // Now close the region without flush
351    region.close(true);
352    wal.shutdown();
353    // split the log
354    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
355
356    // reopen the region
357    WAL wal2 = createWAL(this.conf, rootDir, logName);
358    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
359    // assert the seqid was recovered
360    for (int i = 0; i < countPerFamily; i++) {
361      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
362        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
363        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
364        List<Cell> cells = result.listCells();
365        assertEquals(1, cells.size());
366        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()),
367          cells.get(0).getSequenceId());
368      }
369    }
370  }
371
372  /**
373   * Test writing edits into an HRegion, closing it, splitting logs, opening
374   * Region again.  Verify seqids.
375   */
376  @Test
377  public void testWrittenViaHRegion()
378      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
379    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
380    TableDescriptor td = pair.getFirst();
381    RegionInfo ri = pair.getSecond();
382
383    // Write countPerFamily edits into the three families.  Do a flush on one
384    // of the families during the load of edits so its seqid is not same as
385    // others to test we do right thing when different seqids.
386    WAL wal = createWAL(this.conf, rootDir, logName);
387    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
388    long seqid = region.getOpenSeqNum();
389    boolean first = true;
390    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
391      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
392      if (first) {
393        // If first, so we have at least one family w/ different seqid to rest.
394        region.flush(true);
395        first = false;
396      }
397    }
398    // Now assert edits made it in.
399    final Get g = new Get(ROW);
400    Result result = region.get(g);
401    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
402    // Now close the region (without flush), split the log, reopen the region and assert that
403    // replay of log has the correct effect, that our seqids are calculated correctly so
404    // all edits in logs are seen as 'stale'/old.
405    region.close(true);
406    wal.shutdown();
407    try {
408      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
409    } catch (Exception e) {
410      LOG.debug("Got exception", e);
411    }
412
413    WAL wal2 = createWAL(this.conf, rootDir, logName);
414    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
415    long seqid2 = region2.getOpenSeqNum();
416    assertTrue(seqid + result.size() < seqid2);
417    final Result result1b = region2.get(g);
418    assertEquals(result.size(), result1b.size());
419
420    // Next test.  Add more edits, then 'crash' this region by stealing its wal
421    // out from under it and assert that replay of the log adds the edits back
422    // correctly when region is opened again.
423    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
424      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
425    }
426    // Get count of edits.
427    final Result result2 = region2.get(g);
428    assertEquals(2 * result.size(), result2.size());
429    wal2.sync();
430    final Configuration newConf = HBaseConfiguration.create(this.conf);
431    User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString());
432    user.runAs(new PrivilegedExceptionAction<Object>() {
433      @Override
434      public Object run() throws Exception {
435        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
436        FileSystem newFS = FileSystem.get(newConf);
437        // Make a new wal for new region open.
438        WAL wal3 = createWAL(newConf, rootDir, logName);
439        Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
440        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
441        long seqid3 = region3.initialize();
442        Result result3 = region3.get(g);
443        // Assert that count of cells is same as before crash.
444        assertEquals(result2.size(), result3.size());
445
446        // I can't close wal1.  Its been appropriated when we split.
447        region3.close();
448        wal3.close();
449        return null;
450      }
451    });
452  }
453
454  /**
455   * Test that we recover correctly when there is a failure in between the
456   * flushes. i.e. Some stores got flushed but others did not.
457   * Unfortunately, there is no easy hook to flush at a store level. The way
458   * we get around this is by flushing at the region level, and then deleting
459   * the recently flushed store file for one of the Stores. This would put us
460   * back in the situation where all but that store got flushed and the region
461   * died.
462   * We restart Region again, and verify that the edits were replayed.
463   */
464  @Test
465  public void testAfterPartialFlush()
466      throws IOException, SecurityException, IllegalArgumentException {
467    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
468    TableDescriptor td = pair.getFirst();
469    RegionInfo ri = pair.getSecond();
470
471    // Write countPerFamily edits into the three families.  Do a flush on one
472    // of the families during the load of edits so its seqid is not same as
473    // others to test we do right thing when different seqids.
474    WAL wal = createWAL(this.conf, rootDir, logName);
475    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
476    long seqid = region.getOpenSeqNum();
477    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
478      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
479    }
480
481    // Now assert edits made it in.
482    final Get g = new Get(ROW);
483    Result result = region.get(g);
484    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
485
486    // Let us flush the region
487    region.flush(true);
488    region.close(true);
489    wal.shutdown();
490
491    // delete the store files in the second column family to simulate a failure
492    // in between the flushcache();
493    // we have 3 families. killing the middle one ensures that taking the maximum
494    // will make us fail.
495    int cf_count = 0;
496    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
497      cf_count++;
498      if (cf_count == 2) {
499        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
500      }
501    }
502
503    // Let us try to split and recover
504    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
505    WAL wal2 = createWAL(this.conf, rootDir, logName);
506    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
507    long seqid2 = region2.getOpenSeqNum();
508    assertTrue(seqid + result.size() < seqid2);
509
510    final Result result1b = region2.get(g);
511    assertEquals(result.size(), result1b.size());
512  }
513
514  /**
515   * Test that we could recover the data correctly after aborting flush. In the
516   * test, first we abort flush after writing some data, then writing more data
517   * and flush again, at last verify the data.
518   */
519  @Test
520  public void testAfterAbortingFlush() throws IOException {
521    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
522    TableDescriptor td = pair.getFirst();
523    RegionInfo ri = pair.getSecond();
524
525    // Write countPerFamily edits into the three families. Do a flush on one
526    // of the families during the load of edits so its seqid is not same as
527    // others to test we do right thing when different seqids.
528    WAL wal = createWAL(this.conf, rootDir, logName);
529    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
530    Mockito.doReturn(false).when(rsServices).isAborted();
531    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
532    when(rsServices.getConfiguration()).thenReturn(conf);
533    Configuration customConf = new Configuration(this.conf);
534    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
535        AbstractTestWALReplay.CustomStoreFlusher.class.getName());
536    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
537    int writtenRowCount = 10;
538    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
539    for (int i = 0; i < writtenRowCount; i++) {
540      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
541      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
542          Bytes.toBytes("val"));
543      region.put(put);
544    }
545
546    // Now assert edits made it in.
547    RegionScanner scanner = region.getScanner(new Scan());
548    assertEquals(writtenRowCount, getScannedCount(scanner));
549
550    // Let us flush the region
551    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
552    try {
553      region.flush(true);
554      fail("Injected exception hasn't been thrown");
555    } catch (IOException e) {
556      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
557      // simulated to abort server
558      Mockito.doReturn(true).when(rsServices).isAborted();
559      region.setClosing(false); // region normally does not accept writes after
560      // DroppedSnapshotException. We mock around it for this test.
561    }
562    // writing more data
563    int moreRow = 10;
564    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
565      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
566      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
567          Bytes.toBytes("val"));
568      region.put(put);
569    }
570    writtenRowCount += moreRow;
571    // call flush again
572    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
573    try {
574      region.flush(true);
575    } catch (IOException t) {
576      LOG.info(
577          "Expected exception when flushing region because server is stopped," + t.getMessage());
578    }
579
580    region.close(true);
581    wal.shutdown();
582
583    // Let us try to split and recover
584    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
585    WAL wal2 = createWAL(this.conf, rootDir, logName);
586    Mockito.doReturn(false).when(rsServices).isAborted();
587    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
588    scanner = region2.getScanner(new Scan());
589    assertEquals(writtenRowCount, getScannedCount(scanner));
590  }
591
592  private int getScannedCount(RegionScanner scanner) throws IOException {
593    int scannedCount = 0;
594    List<Cell> results = new ArrayList<>();
595    while (true) {
596      boolean existMore = scanner.next(results);
597      if (!results.isEmpty()) {
598        scannedCount++;
599      }
600      if (!existMore) {
601        break;
602      }
603      results.clear();
604    }
605    return scannedCount;
606  }
607
608  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
609    // Read the recovered hfile
610    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
611    FSDataInputStream in = fs.open(recoveredHFile);
612    byte[] fileContent = new byte[fileSize];
613    in.readFully(0, fileContent, 0, fileSize);
614    in.close();
615
616    // Write a corrupt hfile by append garbage
617    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
618    FSDataOutputStream out;
619    out = fs.create(path);
620    out.write(fileContent);
621    out.write(Bytes.toBytes("-----"));
622    out.close();
623  }
624}