/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test replay of edits out of a WAL split. Subclasses supply the concrete WAL implementation via
 * {@link #createWAL(Configuration, Path, String)}.
 */
public abstract class AbstractTestWALReplay {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path hbaseRootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  @Rule
  public final TestName currentTest = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // The below config is supported by 0.20-append and CDH3b2
    conf.setInt("dfs.client.block.recovery.retries", 2);
    TEST_UTIL.startMiniCluster(3);
    Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    CommonFSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
    this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName =
      ServerName.valueOf(currentTest.getMethodName() + "-manual", 16010,
        EnvironmentEdgeManager.currentTime())
        .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.hbaseRootDir, logName);
    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
    }
    this.wals = new WALFactory(conf, currentTest.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
  }

  /**
   * @param p Directory to clean up
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

  /**
   * Test that a row deleted after a region with multiple column families moves between region
   * servers stays deleted once the origin region server is aborted and its WAL is replayed.
   */
  @Test
  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
    final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
    byte[] family1 = Bytes.toBytes("cf1");
    byte[] family2 = Bytes.toBytes("cf2");
    byte[] qualifier = Bytes.toBytes("q");
    byte[] value = Bytes.toBytes("testV");
    byte[][] families = { family1, family2 };
    TEST_UTIL.createTable(tableName, families);
    Table htable = TEST_UTIL.getConnection().getTable(tableName);
    Put put = new Put(Bytes.toBytes("r1"));
    put.addColumn(family1, qualifier, value);
    htable.put(put);
    ResultScanner resultScanner = htable.getScanner(new Scan());
    int count = 0;
    while (resultScanner.next() != null) {
      count++;
    }
    resultScanner.close();
    assertEquals(1, count);

    SingleProcessHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
    List<HRegion> regions = hbaseCluster.getRegions(tableName);
    assertEquals(1, regions.size());

    // move region to another regionserver
    Region destRegion = regions.get(0);
    int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName());
    assertTrue("Please start more than 1 regionserver",
      hbaseCluster.getRegionServerThreads().size() > 1);
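    // pick any server index other than the one currently hosting the region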
    int destServerNum = 0;
    while (destServerNum == originServerNum) {
      destServerNum++;
    }
    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
    // move region to destination regionserver
    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName());

    // delete the row
    Delete del = new Delete(Bytes.toBytes("r1"));
    htable.delete(del);
    resultScanner = htable.getScanner(new Scan());
    count = 0;
    while (resultScanner.next() != null) {
      count++;
    }
    resultScanner.close();
    assertEquals(0, count);

    // flush region and make major compaction
    HRegion region =
      (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
    region.flush(true);
    // wait to complete major compaction
    for (HStore store : region.getStores()) {
      store.triggerMajorCompaction();
    }
    region.compact(true);

    // move region to origin regionserver
    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName());
    // abort the origin regionserver
    originServer.abort("testing");

    // see what we get
    Result result = htable.get(new Get(Bytes.toBytes("r1")));
    if (result != null) {
      assertTrue("Row is deleted, but we get " + result.toString(),
        (result == null) || result.isEmpty());
    }
    resultScanner.close();
  }

  /**
   * Tests for hbase-2727.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a>
   */
  @Test
  public void test2727() throws Exception {
    // Test being able to have > 1 set of edits in the recovered.edits directory.
    // Ensure edits are replayed properly.
    final TableName tableName = TableName.valueOf("test2727");

    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    TableDescriptor tableDescriptor = createBasic3FamilyHTD(tableName);
    Region region2 =
      HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, tableDescriptor);
    HBaseTestingUtil.closeRegionAndWAL(region2);
    final byte[] rowName = tableName.getName();

    WAL wal1 = createWAL(this.conf, hbaseRootDir, logName);
    // Add 1k to each family.
    final int countPerFamily = 1000;

    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : tableDescriptor.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal1,
        mvcc, scopes);
    }
    wal1.shutdown();
    runWALSplit(this.conf);

    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    // Add 1k to each family.
    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal2,
        mvcc, scopes);
    }
    wal2.shutdown();
    runWALSplit(this.conf);

    WAL wal3 = createWAL(this.conf, hbaseRootDir, logName);
    try {
      HRegion region =
        HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, tableDescriptor, wal3);
      long seqid = region.getOpenSeqNum();
      // The region opens with sequenceId 1. On open it replays the 6k edits and then increments
      // the sequenceId by 1, so its sequence number ends up at 6k + 1.
      assertTrue(seqid > mvcc.getWritePoint());
      assertEquals(seqid - 1, mvcc.getWritePoint());
      LOG.debug(
        "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint());

      // TODO: Scan all.
      region.close();
    } finally {
      wal3.close();
    }
  }

  /**
   * Test case of HRegion that is only made out of bulk loaded files. Assert that we don't 'crash'.
   */
  @Test
  public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
    deleteDir(basedir);
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    Region region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region2);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);

    byte[] family = htd.getColumnFamilies()[0].getName();
    Path f = new Path(basedir, "hfile");
    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
      Bytes.toBytes("z"), 10);
    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
    hfs.add(Pair.newPair(family, f.toString()));
    region.bulkLoadHFiles(hfs, true, null);

    // Add an edit so there is something in the WAL
    byte[] row = tableName.getName();
    region.put((new Put(row)).addColumn(family, family, family));
    wal.sync();
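    // 10 rows from the bulk-loaded hfile plus the single Put above.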
    final int rowsInsertedCount = 11;

    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // Now 'crash' the region by stealing its wal
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);

        HRegion region2 =
          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
        long seqid2 = region2.getOpenSeqNum();
        assertTrue(seqid2 > -1);
        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));

        // We can't close wal1. It's been appropriated when we split.
        region2.close();
        wal2.close();
        return null;
      }
    });
  }

  /**
   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
   * files) and an edit in the memstore.
   * <p/>
   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries from
   * being replayed"
   */
  @Test
  public void testCompactedBulkLoadedFiles() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
    deleteDir(basedir);
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region2);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);

    // Add an edit so there is something in the WAL
    byte[] row = tableName.getName();
    byte[] family = htd.getColumnFamilies()[0].getName();
    region.put((new Put(row)).addColumn(family, family, family));
    wal.sync();

    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
    for (int i = 0; i < 3; i++) {
      Path f = new Path(basedir, "hfile" + i);
      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
        Bytes.toBytes(i + "50"), 10);
      hfs.add(Pair.newPair(family, f.toString()));
    }
    region.bulkLoadHFiles(hfs, true, null);
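    // Three bulk-loaded hfiles of 10 rows each, plus the single Put above.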
    final int rowsInsertedCount = 31;
    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // major compact to turn all the bulk loaded files into one normal file
    region.compact(true);
    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // Now 'crash' the region by stealing its wal
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);

        HRegion region2 =
          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
        long seqid2 = region2.getOpenSeqNum();
        assertTrue(seqid2 > -1);
        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));

        // We can't close wal1. It's been appropriated when we split.
        region2.close();
        wal2.close();
        return null;
      }
    });
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
   * seqids.
   */
  @Test
  public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region3);
    // Write countPerFamily edits into the three families. Do a flush on one of the families
    // during the load of edits so its seqid is not the same as the others, to test that we do
    // the right thing when seqids differ.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // Flush on the first family so at least one family has a different seqid from the rest.
        region.flush(true);
        first = false;
      }
    }
    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect, that our seqids are calculated correctly so
    // all edits in logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
    long seqid2 = region2.getOpenSeqNum();
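    // The new open sequence id must have advanced past the original open seqid plus all the
    // edits written above.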
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when region is opened again.
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for new region open.
        WAL wal3 = createWAL(newConf, hbaseRootDir, logName);
        final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
          @Override
          protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) {
            super.restoreEdit(s, cell, memstoreSizing);
            countOfRestoredEdits.incrementAndGet();
          }
        };
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that count of cells is same as before crash.
        assertEquals(result2.size(), result3.size());
        assertEquals(htd.getColumnFamilies().length * countPerFamily, countOfRestoredEdits.get());

        // We can't close wal1. It's been appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the flushes, i.e. some
   * stores were flushed but others were not.
   * <p/>
   * Unfortunately, there is no easy hook to flush at a store level. The way we get around this is
   * by flushing at the region level, and then deleting the recently flushed store file for one of
   * the Stores. This would put us back in the situation where all but that store got flushed and
   * the region died.
   * <p/>
   * We restart Region again, and verify that the edits were replayed.
   */
  @Test
  public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region3);
    // Write countPerFamily edits into the three families.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // delete the store files in the second column family to simulate a failure
    // in between the flushcache();
    // we have 3 families. killing the middle one ensures that taking the maximum
    // will make us fail.
    int cf_count = 0;
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
      }
    }

    // Let us try to split and recover
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
  // Only throws exception if throwExceptionWhenFlushing is set true.
  public static class CustomStoreFlusher extends DefaultStoreFlusher {
    // Switch controlling whether flushSnapshot throws an exception
    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);

    public CustomStoreFlusher(Configuration conf, HStore store) {
      super(conf, store);
    }

    @Override
    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
      MonitoredTask status, ThroughputController throughputController,
      FlushLifeCycleTracker tracker) throws IOException {
      if (throwExceptionWhenFlushing.get()) {
        throw new IOException("Simulated exception by tests");
      }
      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
    }
  }

  /**
   * Test that we can recover the data correctly after aborting a flush. In the test, we first
   * abort a flush after writing some data, then write more data and flush again, and finally
   * verify the data.
   */
  @Test
  public void testReplayEditsAfterAbortingFlush() throws IOException {
    final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region3);
    // Open the region with a mocked RegionServerServices so we can flip its aborted state.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
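    // Plug in a store flusher that we can force to throw, so the flush below can be aborted.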
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
      CustomStoreFlusher.class.getName());
    HRegion region =
      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList((htd.getColumnFamilies()));
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region
    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // simulate a server abort
      Mockito.doReturn(true).when(rsServices).isAborted();
      region.setClosing(false); // region normally does not accept writes after
      // DroppedSnapshotException. We mock around it for this test.
    }
    // writing more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // call flush again
    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info(
        "Expected exception when flushing region because server is stopped: " + t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 =
      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  /**
   * Create an HRegion with the result of a WAL split and test we only see the good edits.
   */
  @Test
  public void testReplayEditsWrittenIntoWAL() throws Exception {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region2);
    final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    final byte[] rowName = tableName.getName();
    final byte[] regionName = hri.getEncodedNameAsBytes();

    // Add 1k to each family.
    final int countPerFamily = 1000;
    Set<byte[]> familyNames = new HashSet<>();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, mvcc, scopes);
      familyNames.add(hcd.getName());
    }

    // Add a cache flush, shouldn't have any effect
    wal.startCacheFlush(regionName, familyNames);
    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);

    // Add an edit to another family, should be skipped.
    WALEdit edit = new WALEdit();
    long now = ee.currentTime();
    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName));
    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
      edit);

    // Delete the c family to verify deletes make it over.
    edit = new WALEdit();
    now = ee.currentTime();
    edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
      edit);

    // Sync.
    wal.sync();
    // Make a new conf and a new fs for the splitter to run on so we can take
    // over old wal.
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime");
    user.runAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        runWALSplit(newConf);
        FileSystem newFS = FileSystem.get(newConf);
        // 100k seems to make for about 4 flushes during HRegion#initialize.
        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
        // Make a new wal for new region.
        WAL newWal = createWAL(newConf, hbaseRootDir, logName);
        final AtomicInteger flushcount = new AtomicInteger(0);
        try {
          final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
            @Override
            protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
              final Collection<HStore> storesToFlush, MonitoredTask status,
              boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
              LOG.info("InternalFlushCache Invoked");
              FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
                Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
              flushcount.incrementAndGet();
              return fs;
            }
          };
          // The seq id this region has opened up with
          long seqid = region.initialize();

          // The mvcc write point from inserting the edits above.
          long writePoint = mvcc.getWritePoint();

          // We flushed during init.
          assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
          assertTrue((seqid - 1) == writePoint);

          Get get = new Get(rowName);
          Result result = region.get(get);
          // Make sure we only see the good edits
          assertEquals(countPerFamily * (htd.getColumnFamilies().length - 1), result.size());
          region.close();
        } finally {
          newWal.close();
        }
        return null;
      }
    });
  }

  @Test
  // the following test is for HBASE-6065
  public void testSequentialEditLogSeqNum() throws IOException {
    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final TableDescriptor htd = createBasic1FamilyHTD(tableName);

    // Mock the WAL
    MockWAL wal = createMockWAL();

    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Let us flush the region
    // But this time completeCacheFlush is not yet done
    region.flush(true);
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
    }
    long latestSeqNumber = region.getReadPoint(null);
    // get the current seq no
    wal.doCompleteCacheFlush = true;
    // allow complete cache flush with the previous seq number got after first
    // set of edits.
    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    wal.shutdown();
    FileStatus[] listStatus = wal.getFiles();
    assertNotNull(listStatus);
    assertTrue(listStatus.length > 0);
    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
      wals, null);
    FileStatus[] listStatus1 =
      this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName),
        new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() {
          @Override
          public boolean accept(Path p) {
            return !WALSplitUtil.isSequenceIdFile(p);
          }
        });
    int editCount = 0;
    for (FileStatus fileStatus : listStatus1) {
      editCount = Integer.parseInt(fileStatus.getPath().getName());
    }
    // The sequence number should be the same
    assertEquals(
      "The sequence number of the recovered.edits file and the current edit seq should be same",
      latestSeqNumber, editCount);
  }

  /**
   * testcase for https://issues.apache.org/jira/browse/HBASE-15252
   */
  @Test
  public void testDatalossWhenInputError() throws Exception {
    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
    HRegion region1 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
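    // Remember the WAL region dir so we can locate the recovered.edits file after the split.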
    Path regionDir = region1.getWALRegionDir();
    HBaseTestingUtil.closeRegionAndWAL(region1);

    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }
    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect.
    region.close(true);
    wal.shutdown();

    runWALSplit(this.conf);

    // here we let the DFSInputStream throw an IOException just after the WALHeader.
    Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
    FSDataInputStream stream = fs.open(editFile);
    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
    Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
      conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
        AbstractFSWALProvider.Reader.class);
    AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance();
    reader.init(this.fs, editFile, conf, stream);
    final long headerLength = stream.getPos();
    reader.close();
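    // Spy on the filesystem so that opening the recovered-edits file returns a stream whose
    // underlying DFSInputStream fails once it reads past the WAL header, simulating an input
    // error in the middle of replay.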
    FileSystem spyFs = spy(this.fs);
    doAnswer(new Answer<FSDataInputStream>() {

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
        Field field = FilterInputStream.class.getDeclaredField("in");
        field.setAccessible(true);
        final DFSInputStream in = (DFSInputStream) field.get(stream);
        DFSInputStream spyIn = spy(in);
        doAnswer(new Answer<Integer>() {

          private long pos;

          @Override
          public Integer answer(InvocationOnMock invocation) throws Throwable {
            if (pos >= headerLength) {
              throw new IOException("read over limit");
            }
            int b = (Integer) invocation.callRealMethod();
            if (b > 0) {
              pos += b;
            }
            return b;
          }
        }).when(spyIn).read(any(byte[].class), anyInt(), anyInt());
        doAnswer(new Answer<Void>() {

          @Override
          public Void answer(InvocationOnMock invocation) throws Throwable {
            invocation.callRealMethod();
            in.close();
            return null;
          }
        }).when(spyIn).close();
        field.set(stream, spyIn);
        return stream;
      }
    }).when(spyFs).open(eq(editFile));

    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2;
    try {
      // log replay should fail due to the IOException, otherwise we may lose data.
      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
      assertEquals(result.size(), region2.get(g).size());
    } catch (IOException e) {
      assertEquals("read over limit", e.getMessage());
    }
    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
    assertEquals(result.size(), region2.get(g).size());
  }

  /**
   * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
   */
  private void testNameConflictWhenSplit(boolean largeFirst)
    throws IOException, StreamLacksCapabilityException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    HRegion region = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region);
    final byte[] family = htd.getColumnFamilies()[0].getName();
    final byte[] rowName = tableName.getName();
    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);

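    // Both WAL files end with the same entry (entry2), so splitting them can produce
    // recovered.edits output with conflicting names; the split must cope with the conflict
    // whichever file is processed first.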
    Path largeFile = new Path(logDir, "wal-1");
    Path smallFile = new Path(logDir, "wal-2");
    writeWALFile(largeFile, Arrays.asList(entry1, entry2));
    writeWALFile(smallFile, Arrays.asList(entry2));
    FileStatus first, second;
    if (largeFirst) {
      first = fs.getFileStatus(largeFile);
      second = fs.getFileStatus(smallFile);
    } else {
      first = fs.getFileStatus(smallFile);
      second = fs.getFileStatus(largeFile);
    }
    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
    assertEquals(2, region.get(new Get(rowName)).size());
  }

  @Test
  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
    testNameConflictWhenSplit(true);
  }

  @Test
  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
    testNameConflictWhenSplit(false);
  }

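  // WAL that suppresses completeCacheFlush until doCompleteCacheFlush is set; lets
  // testSequentialEditLogSeqNum control when the flush is marked complete in the WAL.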
  static class MockWAL extends FSHLog {
    boolean doCompleteCacheFlush = false;

    public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
      throws IOException {
      super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
    }

    @Override
    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
      if (!doCompleteCacheFlush) {
        return;
      }
      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
    }
  }

  private TableDescriptor createBasic1FamilyHTD(final TableName tableName) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
    return builder.build();
  }

  private MockWAL createMockWAL() throws IOException {
    MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
    wal.init();
    // Set down maximum recovery so the dfsclient doesn't linger retrying something
    // long gone.
    HBaseTestingUtil.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
    return wal;
  }

  // Flusher used in this test. Actually runs the flush of its region inline when a flush is
  // requested.
  static class TestFlusher implements FlushRequester {
    private HRegion r;

    @Override
    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
      try {
        r.flush(false);
        return true;
      } catch (IOException e) {
        throw new RuntimeException("Exception flushing", e);
      }
    }

    @Override
    public boolean requestFlush(HRegion region, List<byte[]> families,
      FlushLifeCycleTracker tracker) {
      return true;
    }

    @Override
    public boolean requestDelayedFlush(HRegion region, long when) {
      return true;
    }

    @Override
    public void registerFlushRequestListener(FlushRequestListener listener) {

    }

    @Override
    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
      return false;
    }

    @Override
    public void setGlobalMemStoreLimit(long globalMemStoreSize) {

    }
  }

  private WALKeyImpl createWALKey(final TableName tableName, final RegionInfo hri,
    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
  }

  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
    int index) {
    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
    return edit;
  }

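  // Builds an FSWALEntry, stamped with the next region sequence id from the given mvcc, for
  // writing test WAL files directly.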
  private FSWALEntry createFSWALEntry(TableDescriptor htd, RegionInfo hri, long sequence,
    byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
    int index, NavigableMap<byte[], Integer> scopes) throws IOException {
    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
      createWALEdit(rowName, family, ee, index), hri, true, null);
    entry.stampRegionSequenceId(mvcc.begin());
    return entry;
  }

  private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName,
    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
    throws IOException {
    for (int j = 0; j < count; j++) {
      wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
        createWALEdit(rowName, family, ee, j));
    }
    wal.sync();
  }

  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
    EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
    List<Put> puts = new ArrayList<>();
    for (int j = 0; j < count; j++) {
      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
      Put p = new Put(rowName);
      p.addColumn(family, qualifier, ee.currentTime(), rowName);
      r.put(p);
      puts.add(p);
    }
    return puts;
  }

  /**
   * Creates an HRI around an HTD that has <code>tableName</code> and three column families named
   * 'a', 'b', and 'c'.
   * @param tableName Name of table to use when we create HTableDescriptor.
   */
  private RegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
    return RegionInfoBuilder.newBuilder(tableName).build();
  }

  /**
   * Run the split. Verify that only a single split file is made.
   * @return The single split file made
   */
  private Path runWALSplit(final Configuration c) throws IOException {
    List<Path> splits =
      WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
    // Split should generate only 1 file since there's only 1 region
    assertEquals("splits=" + splits, 1, splits.size());
    // Make sure the file exists
    assertTrue(fs.exists(splits.get(0)));
    LOG.info("Split file=" + splits.get(0));
    return splits.get(0);
  }

  private TableDescriptor createBasic3FamilyHTD(final TableName tableName) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("b")));
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("c")));
    return builder.build();
  }

  private void writeWALFile(Path file, List<FSWALEntry> entries)
    throws IOException, StreamLacksCapabilityException {
    fs.mkdirs(file.getParent());
    ProtobufLogWriter writer = new ProtobufLogWriter();
    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
      StreamSlowMonitor.create(conf, "testMonitor"));
    for (FSWALEntry entry : entries) {
      writer.append(entry);
    }
    writer.sync(false);
    writer.close();
  }

  protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
    throws IOException;
}