001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.anyInt;
026import static org.mockito.ArgumentMatchers.eq;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.spy;
029import static org.mockito.Mockito.when;
030
031import java.io.FilterInputStream;
032import java.io.IOException;
033import java.lang.reflect.Field;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collection;
038import java.util.HashSet;
039import java.util.List;
040import java.util.NavigableMap;
041import java.util.Set;
042import java.util.TreeMap;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.atomic.AtomicInteger;
045import java.util.function.Consumer;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.FSDataInputStream;
048import org.apache.hadoop.fs.FileStatus;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.fs.PathFilter;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.ExtendedCell;
054import org.apache.hadoop.hbase.HBaseConfiguration;
055import org.apache.hadoop.hbase.HBaseTestingUtil;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.KeyValue;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
060import org.apache.hadoop.hbase.TableName;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.client.RegionInfoBuilder;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.client.ResultScanner;
070import org.apache.hadoop.hbase.client.Scan;
071import org.apache.hadoop.hbase.client.Table;
072import org.apache.hadoop.hbase.client.TableDescriptor;
073import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
074import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
075import org.apache.hadoop.hbase.monitoring.MonitoredTask;
076import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
077import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
078import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
079import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
080import org.apache.hadoop.hbase.regionserver.FlushRequester;
081import org.apache.hadoop.hbase.regionserver.HRegion;
082import org.apache.hadoop.hbase.regionserver.HRegionServer;
083import org.apache.hadoop.hbase.regionserver.HStore;
084import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
085import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
086import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
087import org.apache.hadoop.hbase.regionserver.Region;
088import org.apache.hadoop.hbase.regionserver.RegionScanner;
089import org.apache.hadoop.hbase.regionserver.RegionServerServices;
090import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
091import org.apache.hadoop.hbase.security.User;
092import org.apache.hadoop.hbase.util.Bytes;
093import org.apache.hadoop.hbase.util.CommonFSUtils;
094import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
095import org.apache.hadoop.hbase.util.EnvironmentEdge;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.HFileTestUtil;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
100import org.apache.hadoop.hbase.wal.WAL;
101import org.apache.hadoop.hbase.wal.WALEdit;
102import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
103import org.apache.hadoop.hbase.wal.WALFactory;
104import org.apache.hadoop.hbase.wal.WALKeyImpl;
105import org.apache.hadoop.hbase.wal.WALSplitUtil;
106import org.apache.hadoop.hbase.wal.WALSplitter;
107import org.apache.hadoop.hbase.wal.WALStreamReader;
108import org.apache.hadoop.hdfs.DFSInputStream;
109import org.junit.jupiter.api.AfterAll;
110import org.junit.jupiter.api.AfterEach;
111import org.junit.jupiter.api.BeforeEach;
112import org.junit.jupiter.api.Test;
113import org.junit.jupiter.api.TestInfo;
114import org.mockito.Mockito;
115import org.mockito.invocation.InvocationOnMock;
116import org.mockito.stubbing.Answer;
117import org.slf4j.Logger;
118import org.slf4j.LoggerFactory;
119
120/**
121 * Test replay of edits out of a WAL split.
122 */
123public abstract class AbstractTestWALReplay {
124  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
125  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
126  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
127  private Path hbaseRootDir = null;
128  private String logName;
129  private Path oldLogDir;
130  private Path logDir;
131  private FileSystem fs;
132  private Configuration conf;
133  private WALFactory wals;
134
135  private String currentTest;
136
137  public static void setUpBeforeClass() throws Exception {
138    Configuration conf = TEST_UTIL.getConfiguration();
139    // The below config supported by 0.20-append and CDH3b2
140    conf.setInt("dfs.client.block.recovery.retries", 2);
141    TEST_UTIL.startMiniCluster(3);
142    Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
143    LOG.info("hbase.rootdir=" + hbaseRootDir);
144    CommonFSUtils.setRootDir(conf, hbaseRootDir);
145  }
146
  /** Shuts down the mini cluster started by {@link #setUpBeforeClass()}. */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
151
152  @BeforeEach
153  public void setUp(TestInfo testInfo) throws Exception {
154    currentTest = testInfo.getTestMethod().get().getName();
155    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
156    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
157    this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf);
158    this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
159    String serverName = ServerName
160      .valueOf(currentTest + "-manual", 16010, EnvironmentEdgeManager.currentTime()).toString();
161    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
162    this.logDir = new Path(this.hbaseRootDir, logName);
163    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
164      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
165    }
166    this.wals = new WALFactory(conf, currentTest);
167  }
168
  /** Per-test teardown: closes the WAL factory, then removes the root dir from the mini DFS. */
  @AfterEach
  public void tearDown() throws Exception {
    this.wals.close();
    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
  }
174
175  /*
176   * @param p Directory to cleanup
177   */
178  private void deleteDir(final Path p) throws IOException {
179    if (this.fs.exists(p)) {
180      if (!this.fs.delete(p, true)) {
181        throw new IOException("Failed remove of " + p);
182      }
183    }
184  }
185
186  /**
187   *   */
188  @Test
189  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
190    final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
191    byte[] family1 = Bytes.toBytes("cf1");
192    byte[] family2 = Bytes.toBytes("cf2");
193    byte[] qualifier = Bytes.toBytes("q");
194    byte[] value = Bytes.toBytes("testV");
195    byte[][] familys = { family1, family2 };
196    TEST_UTIL.createTable(tableName, familys);
197    Table htable = TEST_UTIL.getConnection().getTable(tableName);
198    Put put = new Put(Bytes.toBytes("r1"));
199    put.addColumn(family1, qualifier, value);
200    htable.put(put);
201    ResultScanner resultScanner = htable.getScanner(new Scan());
202    int count = 0;
203    while (resultScanner.next() != null) {
204      count++;
205    }
206    resultScanner.close();
207    assertEquals(1, count);
208
209    SingleProcessHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
210    List<HRegion> regions = hbaseCluster.getRegions(tableName);
211    assertEquals(1, regions.size());
212
213    // move region to another regionserver
214    Region destRegion = regions.get(0);
215    int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName());
216    assertTrue(hbaseCluster.getRegionServerThreads().size() > 1,
217      "Please start more than 1 regionserver");
218    int destServerNum = 0;
219    while (destServerNum == originServerNum) {
220      destServerNum++;
221    }
222    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
223    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
224    // move region to destination regionserver
225    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName());
226
227    // delete the row
228    Delete del = new Delete(Bytes.toBytes("r1"));
229    htable.delete(del);
230    resultScanner = htable.getScanner(new Scan());
231    count = 0;
232    while (resultScanner.next() != null) {
233      count++;
234    }
235    resultScanner.close();
236    assertEquals(0, count);
237
238    // flush region and make major compaction
239    HRegion region =
240      (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
241    region.flush(true);
242    // wait to complete major compaction
243    for (HStore store : region.getStores()) {
244      store.triggerMajorCompaction();
245    }
246    region.compact(true);
247
248    // move region to origin regionserver
249    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName());
250    // abort the origin regionserver
251    originServer.abort("testing");
252
253    // see what we get
254    Result result = htable.get(new Get(Bytes.toBytes("r1")));
255    if (result != null) {
256      assertTrue((result == null) || result.isEmpty(),
257        "Row is deleted, but we get" + result.toString());
258    }
259    resultScanner.close();
260  }
261
262  /**
263   * Tests for hbase-2727.
264   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a>
265   */
266  @Test
267  public void test2727() throws Exception {
268    // Test being able to have > 1 set of edits in the recovered.edits directory.
269    // Ensure edits are replayed properly.
270    final TableName tableName = TableName.valueOf("test2727");
271
272    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
273    RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
274    Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
275    deleteDir(basedir);
276
277    TableDescriptor tableDescriptor = createBasic3FamilyHTD(tableName);
278    Region region2 =
279      HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, tableDescriptor);
280    HBaseTestingUtil.closeRegionAndWAL(region2);
281    final byte[] rowName = tableName.getName();
282
283    WAL wal1 = createWAL(this.conf, hbaseRootDir, logName);
284    // Add 1k to each family.
285    final int countPerFamily = 1000;
286
287    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
288    for (byte[] fam : tableDescriptor.getColumnFamilyNames()) {
289      scopes.put(fam, 0);
290    }
291    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
292      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal1,
293        mvcc, scopes);
294    }
295    wal1.shutdown();
296    runWALSplit(this.conf);
297
298    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
299    // Add 1k to each family.
300    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
301      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal2,
302        mvcc, scopes);
303    }
304    wal2.shutdown();
305    runWALSplit(this.conf);
306
307    WAL wal3 = createWAL(this.conf, hbaseRootDir, logName);
308    try {
309      HRegion region =
310        HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, tableDescriptor, wal3);
311      long seqid = region.getOpenSeqNum();
312      // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1.
313      // When opened, this region would apply 6k edits, and increment the sequenceId by 1
314      assertTrue(seqid > mvcc.getWritePoint());
315      assertEquals(seqid - 1, mvcc.getWritePoint());
316      LOG.debug(
317        "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint());
318
319      // TODO: Scan all.
320      region.close();
321    } finally {
322      wal3.close();
323    }
324  }
325
326  /**
327   * Test case of HRegion that is only made out of bulk loaded files. Assert that we don't 'crash'.
328   */
329  @Test
330  public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException,
331    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
332    final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
333    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
334    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
335    deleteDir(basedir);
336    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
337    Region region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
338    HBaseTestingUtil.closeRegionAndWAL(region2);
339    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
340    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
341
342    byte[] family = htd.getColumnFamilies()[0].getName();
343    Path f = new Path(basedir, "hfile");
344    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
345      Bytes.toBytes("z"), 10);
346    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
347    hfs.add(Pair.newPair(family, f.toString()));
348    region.bulkLoadHFiles(hfs, true, null);
349
350    // Add an edit so something in the WAL
351    byte[] row = tableName.getName();
352    region.put((new Put(row)).addColumn(family, family, family));
353    wal.sync();
354    final int rowsInsertedCount = 11;
355
356    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
357
358    // Now 'crash' the region by stealing its wal
359    final Configuration newConf = HBaseConfiguration.create(this.conf);
360    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
361    user.runAs(new PrivilegedExceptionAction() {
362      @Override
363      public Object run() throws Exception {
364        runWALSplit(newConf);
365        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
366
367        HRegion region2 =
368          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
369        long seqid2 = region2.getOpenSeqNum();
370        assertTrue(seqid2 > -1);
371        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
372
373        // I can't close wal1. Its been appropriated when we split.
374        region2.close();
375        wal2.close();
376        return null;
377      }
378    });
379  }
380
381  /**
382   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
383   * files) and an edit in the memstore.
384   * <p/>
385   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries from
386   * being replayed"
387   */
388  @Test
389  public void testCompactedBulkLoadedFiles() throws IOException, SecurityException,
390    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
391    final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles");
392    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
393    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
394    deleteDir(basedir);
395    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
396    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
397    HBaseTestingUtil.closeRegionAndWAL(region2);
398    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
399    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
400
401    // Add an edit so something in the WAL
402    byte[] row = tableName.getName();
403    byte[] family = htd.getColumnFamilies()[0].getName();
404    region.put((new Put(row)).addColumn(family, family, family));
405    wal.sync();
406
407    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
408    for (int i = 0; i < 3; i++) {
409      Path f = new Path(basedir, "hfile" + i);
410      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
411        Bytes.toBytes(i + "50"), 10);
412      hfs.add(Pair.newPair(family, f.toString()));
413    }
414    region.bulkLoadHFiles(hfs, true, null);
415    final int rowsInsertedCount = 31;
416    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
417
418    // major compact to turn all the bulk loaded files into one normal file
419    region.compact(true);
420    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
421
422    // Now 'crash' the region by stealing its wal
423    final Configuration newConf = HBaseConfiguration.create(this.conf);
424    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
425    user.runAs(new PrivilegedExceptionAction() {
426      @Override
427      public Object run() throws Exception {
428        runWALSplit(newConf);
429        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
430
431        HRegion region2 =
432          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
433        long seqid2 = region2.getOpenSeqNum();
434        assertTrue(seqid2 > -1);
435        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
436
437        // I can't close wal1. Its been appropriated when we split.
438        region2.close();
439        wal2.close();
440        return null;
441      }
442    });
443  }
444
445  /**
446   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
447   * seqids.
448   */
449  @Test
450  public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException,
451    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
452    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
453    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
454    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
455    deleteDir(basedir);
456    final byte[] rowName = tableName.getName();
457    final int countPerFamily = 10;
458    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
459    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
460    HBaseTestingUtil.closeRegionAndWAL(region3);
461    // Write countPerFamily edits into the three families. Do a flush on one
462    // of the families during the load of edits so its seqid is not same as
463    // others to test we do right thing when different seqids.
464    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
465    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
466    long seqid = region.getOpenSeqNum();
467    boolean first = true;
468    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
469      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
470      if (first) {
471        // If first, so we have at least one family w/ different seqid to rest.
472        region.flush(true);
473        first = false;
474      }
475    }
476    // Now assert edits made it in.
477    final Get g = new Get(rowName);
478    Result result = region.get(g);
479    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
480    // Now close the region (without flush), split the log, reopen the region and assert that
481    // replay of log has the correct effect, that our seqids are calculated correctly so
482    // all edits in logs are seen as 'stale'/old.
483    region.close(true);
484    wal.shutdown();
485    runWALSplit(this.conf);
486    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
487    HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
488    long seqid2 = region2.getOpenSeqNum();
489    assertTrue(seqid + result.size() < seqid2);
490    final Result result1b = region2.get(g);
491    assertEquals(result.size(), result1b.size());
492
493    // Next test. Add more edits, then 'crash' this region by stealing its wal
494    // out from under it and assert that replay of the log adds the edits back
495    // correctly when region is opened again.
496    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
497      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
498    }
499    // Get count of edits.
500    final Result result2 = region2.get(g);
501    assertEquals(2 * result.size(), result2.size());
502    wal2.sync();
503    final Configuration newConf = HBaseConfiguration.create(this.conf);
504    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
505    user.runAs(new PrivilegedExceptionAction<Object>() {
506      @Override
507      public Object run() throws Exception {
508        runWALSplit(newConf);
509        FileSystem newFS = FileSystem.get(newConf);
510        // Make a new wal for new region open.
511        WAL wal3 = createWAL(newConf, hbaseRootDir, logName);
512        final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
513        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
514          @Override
515          protected void restoreEdit(HStore s, ExtendedCell cell, MemStoreSizing memstoreSizing) {
516            super.restoreEdit(s, cell, memstoreSizing);
517            countOfRestoredEdits.incrementAndGet();
518          }
519        };
520        long seqid3 = region3.initialize();
521        Result result3 = region3.get(g);
522        // Assert that count of cells is same as before crash.
523        assertEquals(result2.size(), result3.size());
524        assertEquals(htd.getColumnFamilies().length * countPerFamily, countOfRestoredEdits.get());
525
526        // I can't close wal1. Its been appropriated when we split.
527        region3.close();
528        wal3.close();
529        return null;
530      }
531    });
532  }
533
534  /**
535   * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores
536   * got flushed but others did not.
537   * <p/>
538   * Unfortunately, there is no easy hook to flush at a store level. The way we get around this is
539   * by flushing at the region level, and then deleting the recently flushed store file for one of
540   * the Stores. This would put us back in the situation where all but that store got flushed and
541   * the region died.
542   * <p/>
543   * We restart Region again, and verify that the edits were replayed.
544   */
545  @Test
546  public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException,
547    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
548    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
549    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
550    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
551    deleteDir(basedir);
552    final byte[] rowName = tableName.getName();
553    final int countPerFamily = 10;
554    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
555    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
556    HBaseTestingUtil.closeRegionAndWAL(region3);
557    // Write countPerFamily edits into the three families. Do a flush on one
558    // of the families during the load of edits so its seqid is not same as
559    // others to test we do right thing when different seqids.
560    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
561    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
562    long seqid = region.getOpenSeqNum();
563    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
564      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
565    }
566
567    // Now assert edits made it in.
568    final Get g = new Get(rowName);
569    Result result = region.get(g);
570    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
571
572    // Let us flush the region
573    region.flush(true);
574    region.close(true);
575    wal.shutdown();
576
577    // delete the store files in the second column family to simulate a failure
578    // in between the flushcache();
579    // we have 3 families. killing the middle one ensures that taking the maximum
580    // will make us fail.
581    int cf_count = 0;
582    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
583      cf_count++;
584      if (cf_count == 2) {
585        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
586      }
587    }
588
589    // Let us try to split and recover
590    runWALSplit(this.conf);
591    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
592    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
593    long seqid2 = region2.getOpenSeqNum();
594    assertTrue(seqid + result.size() < seqid2);
595
596    final Result result1b = region2.get(g);
597    assertEquals(result.size(), result1b.size());
598  }
599
600  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
601  // Only throws exception if throwExceptionWhenFlushing is set true.
602  public static class CustomStoreFlusher extends DefaultStoreFlusher {
603    // Switch between throw and not throw exception in flush
604    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
605
606    public CustomStoreFlusher(Configuration conf, HStore store) {
607      super(conf, store);
608    }
609
610    @Override
611    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
612      MonitoredTask status, ThroughputController throughputController,
613      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
614      if (throwExceptionWhenFlushing.get()) {
615        throw new IOException("Simulated exception by tests");
616      }
617      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
618        writerCreationTracker);
619    }
620  }
621
622  /**
623   * Test that we could recover the data correctly after aborting flush. In the test, first we abort
624   * flush after writing some data, then writing more data and flush again, at last verify the data.
625   */
626  @Test
627  public void testReplayEditsAfterAbortingFlush() throws IOException {
628    final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush");
629    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
630    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
631    deleteDir(basedir);
632    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
633    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
634    HBaseTestingUtil.closeRegionAndWAL(region3);
635    // Write countPerFamily edits into the three families. Do a flush on one
636    // of the families during the load of edits so its seqid is not same as
637    // others to test we do right thing when different seqids.
638    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
639    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
640    Mockito.doReturn(false).when(rsServices).isAborted();
641    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
642    when(rsServices.getConfiguration()).thenReturn(conf);
643    Configuration customConf = new Configuration(this.conf);
644    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
645      CustomStoreFlusher.class.getName());
646    HRegion region =
647      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
648    int writtenRowCount = 10;
649    List<ColumnFamilyDescriptor> families = Arrays.asList((htd.getColumnFamilies()));
650    for (int i = 0; i < writtenRowCount; i++) {
651      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
652      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
653        Bytes.toBytes("val"));
654      region.put(put);
655    }
656
657    // Now assert edits made it in.
658    RegionScanner scanner = region.getScanner(new Scan());
659    assertEquals(writtenRowCount, getScannedCount(scanner));
660
661    // Let us flush the region
662    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
663    try {
664      region.flush(true);
665      fail("Injected exception hasn't been thrown");
666    } catch (IOException e) {
667      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
668      // simulated to abort server
669      Mockito.doReturn(true).when(rsServices).isAborted();
670      region.setClosing(false); // region normally does not accept writes after
671      // DroppedSnapshotException. We mock around it for this test.
672    }
673    // writing more data
674    int moreRow = 10;
675    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
676      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
677      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
678        Bytes.toBytes("val"));
679      region.put(put);
680    }
681    writtenRowCount += moreRow;
682    // call flush again
683    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
684    try {
685      region.flush(true);
686    } catch (IOException t) {
687      LOG.info(
688        "Expected exception when flushing region because server is stopped," + t.getMessage());
689    }
690
691    region.close(true);
692    wal.shutdown();
693
694    // Let us try to split and recover
695    runWALSplit(this.conf);
696    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
697    Mockito.doReturn(false).when(rsServices).isAborted();
698    HRegion region2 =
699      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
700    scanner = region2.getScanner(new Scan());
701    assertEquals(writtenRowCount, getScannedCount(scanner));
702  }
703
704  private int getScannedCount(RegionScanner scanner) throws IOException {
705    int scannedCount = 0;
706    List<Cell> results = new ArrayList<>();
707    while (true) {
708      boolean existMore = scanner.next(results);
709      if (!results.isEmpty()) {
710        scannedCount++;
711      }
712      if (!existMore) {
713        break;
714      }
715      results.clear();
716    }
717    return scannedCount;
718  }
719
720  /**
721   * Create an HRegion with the result of a WAL split and test we only see the good edits=
722   */
723  @Test
724  public void testReplayEditsWrittenIntoWAL() throws Exception {
725    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
726    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
727    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
728    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
729    deleteDir(basedir);
730
731    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
732    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
733    HBaseTestingUtil.closeRegionAndWAL(region2);
734    final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
735    final byte[] rowName = tableName.getName();
736    final byte[] regionName = hri.getEncodedNameAsBytes();
737
738    // Add 1k to each family.
739    final int countPerFamily = 1000;
740    Set<byte[]> familyNames = new HashSet<>();
741    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
742    for (byte[] fam : htd.getColumnFamilyNames()) {
743      scopes.put(fam, 0);
744    }
745    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
746      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, mvcc, scopes);
747      familyNames.add(hcd.getName());
748    }
749
750    // Add a cache flush, shouldn't have any effect
751    wal.startCacheFlush(regionName, familyNames);
752    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);
753
754    // Add an edit to another family, should be skipped.
755    WALEdit edit = new WALEdit();
756    long now = ee.currentTime();
757    WALEditInternalHelper.addExtendedCell(edit,
758      new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName));
759    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
760      edit);
761
762    // Delete the c family to verify deletes make it over.
763    edit = new WALEdit();
764    now = ee.currentTime();
765    WALEditInternalHelper.addExtendedCell(edit,
766      new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
767    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
768      edit);
769
770    // Sync.
771    wal.sync();
772    // Make a new conf and a new fs for the splitter to run on so we can take
773    // over old wal.
774    final Configuration newConf = HBaseConfiguration.create(this.conf);
775    User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime");
776    user.runAs(new PrivilegedExceptionAction<Void>() {
777      @Override
778      public Void run() throws Exception {
779        runWALSplit(newConf);
780        FileSystem newFS = FileSystem.get(newConf);
781        // 100k seems to make for about 4 flushes during HRegion#initialize.
782        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
783        // Make a new wal for new region.
784        WAL newWal = createWAL(newConf, hbaseRootDir, logName);
785        final AtomicInteger flushcount = new AtomicInteger(0);
786        try {
787          final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
788            @Override
789            protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
790              final Collection<HStore> storesToFlush, MonitoredTask status,
791              boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
792              LOG.info("InternalFlushCache Invoked");
793              FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
794                Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
795              flushcount.incrementAndGet();
796              return fs;
797            }
798          };
799          // The seq id this region has opened up with
800          long seqid = region.initialize();
801
802          // The mvcc readpoint of from inserting data.
803          long writePoint = mvcc.getWritePoint();
804
805          // We flushed during init.
806          assertTrue(flushcount.get() > 0, "Flushcount=" + flushcount.get());
807          assertTrue((seqid - 1) == writePoint);
808
809          Get get = new Get(rowName);
810          Result result = region.get(get);
811          // Make sure we only see the good edits
812          assertEquals(countPerFamily * (htd.getColumnFamilies().length - 1), result.size());
813          region.close();
814        } finally {
815          newWal.close();
816        }
817        return null;
818      }
819    });
820  }
821
  @Test
  // the following test is for HBASE-6065
  public void testSequentialEditLogSeqNum() throws IOException {
    final TableName tableName = TableName.valueOf(currentTest);
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final TableDescriptor htd = createBasic1FamilyHTD(tableName);

    // Mock the WAL
    MockWAL wal = createMockWAL();
    TEST_UTIL.createRegionDir(hri,
      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem());
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Let us flush the region
    // But this time completeflushcache is not yet done
    // (MockWAL suppresses completeCacheFlush until doCompleteCacheFlush is set below).
    region.flush(true);
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
    }
    long lastestSeqNumber = region.getReadPoint(null);
    // get the current seq no
    wal.doCompleteCacheFlush = true;
    // allow complete cache flush with the previous seq number got after first
    // set of edits.
    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    wal.shutdown();
    FileStatus[] listStatus = wal.getFiles();
    assertNotNull(listStatus);
    assertTrue(listStatus.length > 0);
    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
      wals, null);
    // List the recovered.edits output, skipping the sequence-id marker file.
    FileStatus[] listStatus1 =
      this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName),
        new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() {
          @Override
          public boolean accept(Path p) {
            return !WALSplitUtil.isSequenceIdFile(p);
          }
        });
    int editCount = 0;
    for (FileStatus fileStatus : listStatus1) {
      // The recovered.edits file name is parsed as a sequence number. NOTE(review): this
      // overwrites editCount on each iteration rather than aggregating, so it assumes a
      // single recovered.edits file here -- confirm.
      editCount = Integer.parseInt(fileStatus.getPath().getName());
    }
    // The sequence number should be same
    assertEquals(lastestSeqNumber, editCount,
      "The sequence number of the recoverd.edits and the current edit seq should be same");
  }
876
877  /**
878   * testcase for https://issues.apache.org/jira/browse/HBASE-15252
879   */
880  @Test
881  public void testDatalossWhenInputError() throws Exception {
882    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
883    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
884    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
885    deleteDir(basedir);
886    final byte[] rowName = tableName.getName();
887    final int countPerFamily = 10;
888    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
889    HRegion region1 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
890    Path regionDir = region1.getWALRegionDir();
891    HBaseTestingUtil.closeRegionAndWAL(region1);
892
893    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
894    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
895    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
896      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
897    }
898    // Now assert edits made it in.
899    final Get g = new Get(rowName);
900    Result result = region.get(g);
901    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
902    // Now close the region (without flush), split the log, reopen the region and assert that
903    // replay of log has the correct effect.
904    region.close(true);
905    wal.shutdown();
906
907    runWALSplit(this.conf);
908
909    // here we let the DFSInputStream throw an IOException just after the WALHeader.
910    Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
911    final long headerLength;
912    try (WALStreamReader reader = WALFactory.createStreamReader(fs, editFile, conf)) {
913      headerLength = reader.getPosition();
914    }
915    FileSystem spyFs = spy(this.fs);
916    doAnswer(new Answer<FSDataInputStream>() {
917
918      @Override
919      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
920        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
921        Field field = FilterInputStream.class.getDeclaredField("in");
922        field.setAccessible(true);
923        final DFSInputStream in = (DFSInputStream) field.get(stream);
924        DFSInputStream spyIn = spy(in);
925        doAnswer(new Answer<Integer>() {
926
927          private long pos;
928
929          @Override
930          public Integer answer(InvocationOnMock invocation) throws Throwable {
931            if (pos >= headerLength) {
932              throw new IOException("read over limit");
933            }
934            int b = (Integer) invocation.callRealMethod();
935            if (b > 0) {
936              pos += b;
937            }
938            return b;
939          }
940        }).when(spyIn).read(any(byte[].class), anyInt(), anyInt());
941        doAnswer(new Answer<Void>() {
942
943          @Override
944          public Void answer(InvocationOnMock invocation) throws Throwable {
945            invocation.callRealMethod();
946            in.close();
947            return null;
948          }
949        }).when(spyIn).close();
950        field.set(stream, spyIn);
951        return stream;
952      }
953    }).when(spyFs).open(eq(editFile));
954
955    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
956    HRegion region2;
957    try {
958      // log replay should fail due to the IOException, otherwise we may lose data.
959      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
960      assertEquals(result.size(), region2.get(g).size());
961    } catch (IOException e) {
962      assertEquals("read over limit", e.getMessage());
963    }
964    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
965    assertEquals(result.size(), region2.get(g).size());
966  }
967
968  /**
969   * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
970   */
971  private void testNameConflictWhenSplit(boolean largeFirst)
972    throws IOException, StreamLacksCapabilityException {
973    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
974    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
975    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
976    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
977    deleteDir(basedir);
978
979    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
980    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
981    for (byte[] fam : htd.getColumnFamilyNames()) {
982      scopes.put(fam, 0);
983    }
984    HRegion region = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
985    HBaseTestingUtil.closeRegionAndWAL(region);
986    final byte[] family = htd.getColumnFamilies()[0].getName();
987    final byte[] rowName = tableName.getName();
988    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
989    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);
990
991    Path largeFile = new Path(logDir, "wal-1");
992    Path smallFile = new Path(logDir, "wal-2");
993    writerWALFile(largeFile, Arrays.asList(entry1, entry2));
994    writerWALFile(smallFile, Arrays.asList(entry2));
995    FileStatus first, second;
996    if (largeFirst) {
997      first = fs.getFileStatus(largeFile);
998      second = fs.getFileStatus(smallFile);
999    } else {
1000      first = fs.getFileStatus(smallFile);
1001      second = fs.getFileStatus(largeFile);
1002    }
1003    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
1004    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
1005    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
1006    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
1007    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
1008    assertEquals(2, region.get(new Get(rowName)).size());
1009  }
1010
  @Test
  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
    // Split the larger WAL file first; see testNameConflictWhenSplit for details.
    testNameConflictWhenSplit(true);
  }
1015
  @Test
  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
    // Split the smaller WAL file first; see testNameConflictWhenSplit for details.
    testNameConflictWhenSplit(false);
  }
1020
1021  static class MockWAL extends FSHLog {
1022    boolean doCompleteCacheFlush = false;
1023
1024    public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
1025      throws IOException {
1026      super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
1027    }
1028
1029    @Override
1030    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
1031      if (!doCompleteCacheFlush) {
1032        return;
1033      }
1034      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
1035    }
1036  }
1037
1038  private TableDescriptor createBasic1FamilyHTD(final TableName tableName) {
1039    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1040    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
1041    return builder.build();
1042  }
1043
  /**
   * Creates and initializes a {@link MockWAL} rooted at the test's hbase root dir.
   */
  private MockWAL createMockWAL() throws IOException {
    fs.mkdirs(new Path(hbaseRootDir, logName));
    MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
    wal.init();
    // Set down maximum recovery so the dfsclient doesn't linger retrying something
    // long gone.
    HBaseTestingUtil.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
    return wal;
  }
1053
  // Flusher used in this test. Keep count of how often we are called and
  // actually run the flush inside here.
  static class TestFlusher implements FlushRequester {
    // Region to flush. NOTE(review): never assigned in this class -- assumed to be injected
    // by the test before any flush request arrives; a request before assignment would NPE.
    private HRegion r;

    @Override
    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
      try {
        // Flushes the injected region 'r', deliberately ignoring the 'region' argument.
        r.flush(false);
        return true;
      } catch (IOException e) {
        throw new RuntimeException("Exception flushing", e);
      }
    }

    // Remaining FlushRequester methods are no-op stubs: they accept the request (or report
    // nothing to do) without flushing anything.
    @Override
    public boolean requestFlush(HRegion region, List<byte[]> families,
      FlushLifeCycleTracker tracker) {
      return true;
    }

    @Override
    public boolean requestDelayedFlush(HRegion region, long when) {
      return true;
    }

    @Override
    public void registerFlushRequestListener(FlushRequestListener listener) {

    }

    @Override
    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
      return false;
    }

    @Override
    public void setGlobalMemStoreLimit(long globalMemStoreSize) {

    }
  }
1095
  /**
   * Creates a WAL key for the given region/table with a fixed, arbitrary timestamp (999) so
   * generated edits are deterministic across runs.
   */
  private WALKeyImpl createWALKey(final TableName tableName, final RegionInfo hri,
    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
  }
1100
1101  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
1102    int index) {
1103    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
1104    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
1105    WALEdit edit = new WALEdit();
1106    WALEditInternalHelper.addExtendedCell(edit,
1107      new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
1108    return edit;
1109  }
1110
  /**
   * Builds an FSWALEntry carrying one generated edit and stamps it with a fresh mvcc
   * region sequence id.
   */
  private FSWALEntry createFSWALEntry(TableDescriptor htd, RegionInfo hri, long sequence,
    byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
    int index, NavigableMap<byte[], Integer> scopes) throws IOException {
    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
      createWALEdit(rowName, family, ee, index), hri, true, null);
    entry.stampRegionSequenceId(mvcc.begin());
    return entry;
  }
1119
1120  private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName,
1121    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
1122    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
1123    throws IOException {
1124    for (int j = 0; j < count; j++) {
1125      wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
1126        createWALEdit(rowName, family, ee, j));
1127    }
1128    wal.sync();
1129  }
1130
1131  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
1132    EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
1133    List<Put> puts = new ArrayList<>();
1134    for (int j = 0; j < count; j++) {
1135      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
1136      Put p = new Put(rowName);
1137      p.addColumn(family, qualifier, ee.currentTime(), rowName);
1138      r.put(p);
1139      puts.add(p);
1140    }
1141    return puts;
1142  }
1143
1144  /**
1145   * Creates an HRI around an HTD that has <code>tableName</code> and three column families named
1146   * 'a','b', and 'c'.
1147   * @param tableName Name of table to use when we create HTableDescriptor.
1148   */
1149  private RegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
1150    return RegionInfoBuilder.newBuilder(tableName).build();
1151  }
1152
1153  /**
1154   * Run the split. Verify only single split file made.
1155   * @return The single split file made
1156   */
1157  private Path runWALSplit(final Configuration c) throws IOException {
1158    List<Path> splits =
1159      WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
1160    // Split should generate only 1 file since there's only 1 region
1161    assertEquals(1, splits.size(), "splits=" + splits);
1162    // Make sure the file exists
1163    assertTrue(fs.exists(splits.get(0)));
1164    LOG.info("Split file=" + splits.get(0));
1165    return splits.get(0);
1166  }
1167
1168  private TableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1169    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1170    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
1171    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("b")));
1172    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("c")));
1173    return builder.build();
1174  }
1175
1176  private void writerWALFile(Path file, List<FSWALEntry> entries)
1177    throws IOException, StreamLacksCapabilityException {
1178    fs.mkdirs(file.getParent());
1179    ProtobufLogWriter writer = new ProtobufLogWriter();
1180    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
1181      StreamSlowMonitor.create(conf, "testMonitor"));
1182    for (FSWALEntry entry : entries) {
1183      writer.append(entry);
1184    }
1185    writer.sync(false);
1186    writer.close();
1187  }
1188
  /**
   * Creates the concrete WAL implementation under test; supplied by subclasses so the same
   * replay scenarios run against different WAL providers.
   */
  protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
    throws IOException;
1191}