001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.anyInt;
026import static org.mockito.ArgumentMatchers.eq;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.spy;
029import static org.mockito.Mockito.when;
030
031import java.io.FilterInputStream;
032import java.io.IOException;
033import java.lang.reflect.Field;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collection;
038import java.util.HashSet;
039import java.util.List;
040import java.util.NavigableMap;
041import java.util.Set;
042import java.util.TreeMap;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.atomic.AtomicInteger;
045import java.util.function.Consumer;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.FSDataInputStream;
048import org.apache.hadoop.fs.FileStatus;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.fs.PathFilter;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.ExtendedCell;
054import org.apache.hadoop.hbase.HBaseConfiguration;
055import org.apache.hadoop.hbase.HBaseTestingUtil;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.KeyValue;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
060import org.apache.hadoop.hbase.TableName;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
063import org.apache.hadoop.hbase.client.Delete;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.client.RegionInfoBuilder;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.client.ResultScanner;
070import org.apache.hadoop.hbase.client.Scan;
071import org.apache.hadoop.hbase.client.Table;
072import org.apache.hadoop.hbase.client.TableDescriptor;
073import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
074import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
075import org.apache.hadoop.hbase.monitoring.MonitoredTask;
076import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
077import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
078import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
079import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
080import org.apache.hadoop.hbase.regionserver.FlushRequester;
081import org.apache.hadoop.hbase.regionserver.HRegion;
082import org.apache.hadoop.hbase.regionserver.HRegionServer;
083import org.apache.hadoop.hbase.regionserver.HStore;
084import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
085import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
086import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
087import org.apache.hadoop.hbase.regionserver.Region;
088import org.apache.hadoop.hbase.regionserver.RegionScanner;
089import org.apache.hadoop.hbase.regionserver.RegionServerServices;
090import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
091import org.apache.hadoop.hbase.security.User;
092import org.apache.hadoop.hbase.util.Bytes;
093import org.apache.hadoop.hbase.util.CommonFSUtils;
094import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
095import org.apache.hadoop.hbase.util.EnvironmentEdge;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.HFileTestUtil;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
100import org.apache.hadoop.hbase.wal.WAL;
101import org.apache.hadoop.hbase.wal.WALEdit;
102import org.apache.hadoop.hbase.wal.WALFactory;
103import org.apache.hadoop.hbase.wal.WALKeyImpl;
104import org.apache.hadoop.hbase.wal.WALSplitUtil;
105import org.apache.hadoop.hbase.wal.WALSplitter;
106import org.apache.hadoop.hbase.wal.WALStreamReader;
107import org.apache.hadoop.hdfs.DFSInputStream;
108import org.junit.After;
109import org.junit.AfterClass;
110import org.junit.Before;
111import org.junit.BeforeClass;
112import org.junit.Rule;
113import org.junit.Test;
114import org.junit.rules.TestName;
115import org.mockito.Mockito;
116import org.mockito.invocation.InvocationOnMock;
117import org.mockito.stubbing.Answer;
118import org.slf4j.Logger;
119import org.slf4j.LoggerFactory;
120
121/**
122 * Test replay of edits out of a WAL split.
123 */
124public abstract class AbstractTestWALReplay {
  /** Logger used by the tests in this class. */
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
  /** Shared testing utility; its mini cluster is started in {@link #setUpBeforeClass()}. */
  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  /** Environment edge handed to the edit-writing helpers (addWALEdits / addRegionEdits). */
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  /** HBase root directory, read from the per-test configuration in {@link #setUp()}. */
  private Path hbaseRootDir = null;
  /** WAL directory name derived in {@link #setUp()} from a per-test server name. */
  private String logName;
  /** {@link HConstants#HREGION_OLDLOGDIR_NAME} resolved under the root directory. */
  private Path oldLogDir;
  /** {@link #logName} resolved under the root directory. */
  private Path logDir;
  /** File system of the mini DFS cluster, assigned in {@link #setUp()}. */
  private FileSystem fs;
  /** Per-test copy of the testing utility's configuration. */
  private Configuration conf;
  /** WAL factory created for each test in {@link #setUp()} and closed in {@link #tearDown()}. */
  private WALFactory wals;

  /** Exposes the running test's method name; used to build per-test names in {@link #setUp()}. */
  @Rule
  public final TestName currentTest = new TestName();
138
139  @BeforeClass
140  public static void setUpBeforeClass() throws Exception {
141    Configuration conf = TEST_UTIL.getConfiguration();
142    // The below config supported by 0.20-append and CDH3b2
143    conf.setInt("dfs.client.block.recovery.retries", 2);
144    TEST_UTIL.startMiniCluster(3);
145    Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
146    LOG.info("hbase.rootdir=" + hbaseRootDir);
147    CommonFSUtils.setRootDir(conf, hbaseRootDir);
148  }
149
  /** Shuts down the mini cluster started in {@link #setUpBeforeClass()}. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
154
155  @Before
156  public void setUp() throws Exception {
157    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
158    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
159    this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf);
160    this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
161    String serverName = ServerName
162      .valueOf(currentTest.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
163      .toString();
164    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
165    this.logDir = new Path(this.hbaseRootDir, logName);
166    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
167      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
168    }
169    this.wals = new WALFactory(conf, currentTest.getMethodName());
170  }
171
  /**
   * Closes the per-test {@link WALFactory} and recursively deletes the HBase root directory so the
   * next test starts clean.
   */
  @After
  public void tearDown() throws Exception {
    this.wals.close();
    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
  }
177
178  /*
179   * @param p Directory to cleanup
180   */
181  private void deleteDir(final Path p) throws IOException {
182    if (this.fs.exists(p)) {
183      if (!this.fs.delete(p, true)) {
184        throw new IOException("Failed remove of " + p);
185      }
186    }
187  }
188
189  /**
190   *   */
191  @Test
192  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
193    final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
194    byte[] family1 = Bytes.toBytes("cf1");
195    byte[] family2 = Bytes.toBytes("cf2");
196    byte[] qualifier = Bytes.toBytes("q");
197    byte[] value = Bytes.toBytes("testV");
198    byte[][] familys = { family1, family2 };
199    TEST_UTIL.createTable(tableName, familys);
200    Table htable = TEST_UTIL.getConnection().getTable(tableName);
201    Put put = new Put(Bytes.toBytes("r1"));
202    put.addColumn(family1, qualifier, value);
203    htable.put(put);
204    ResultScanner resultScanner = htable.getScanner(new Scan());
205    int count = 0;
206    while (resultScanner.next() != null) {
207      count++;
208    }
209    resultScanner.close();
210    assertEquals(1, count);
211
212    SingleProcessHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
213    List<HRegion> regions = hbaseCluster.getRegions(tableName);
214    assertEquals(1, regions.size());
215
216    // move region to another regionserver
217    Region destRegion = regions.get(0);
218    int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName());
219    assertTrue("Please start more than 1 regionserver",
220      hbaseCluster.getRegionServerThreads().size() > 1);
221    int destServerNum = 0;
222    while (destServerNum == originServerNum) {
223      destServerNum++;
224    }
225    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
226    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
227    // move region to destination regionserver
228    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName());
229
230    // delete the row
231    Delete del = new Delete(Bytes.toBytes("r1"));
232    htable.delete(del);
233    resultScanner = htable.getScanner(new Scan());
234    count = 0;
235    while (resultScanner.next() != null) {
236      count++;
237    }
238    resultScanner.close();
239    assertEquals(0, count);
240
241    // flush region and make major compaction
242    HRegion region =
243      (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
244    region.flush(true);
245    // wait to complete major compaction
246    for (HStore store : region.getStores()) {
247      store.triggerMajorCompaction();
248    }
249    region.compact(true);
250
251    // move region to origin regionserver
252    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName());
253    // abort the origin regionserver
254    originServer.abort("testing");
255
256    // see what we get
257    Result result = htable.get(new Get(Bytes.toBytes("r1")));
258    if (result != null) {
259      assertTrue("Row is deleted, but we get" + result.toString(),
260        (result == null) || result.isEmpty());
261    }
262    resultScanner.close();
263  }
264
265  /**
266   * Tests for hbase-2727.
267   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a>
268   */
269  @Test
270  public void test2727() throws Exception {
271    // Test being able to have > 1 set of edits in the recovered.edits directory.
272    // Ensure edits are replayed properly.
273    final TableName tableName = TableName.valueOf("test2727");
274
275    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
276    RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
277    Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
278    deleteDir(basedir);
279
280    TableDescriptor tableDescriptor = createBasic3FamilyHTD(tableName);
281    Region region2 =
282      HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, tableDescriptor);
283    HBaseTestingUtil.closeRegionAndWAL(region2);
284    final byte[] rowName = tableName.getName();
285
286    WAL wal1 = createWAL(this.conf, hbaseRootDir, logName);
287    // Add 1k to each family.
288    final int countPerFamily = 1000;
289
290    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
291    for (byte[] fam : tableDescriptor.getColumnFamilyNames()) {
292      scopes.put(fam, 0);
293    }
294    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
295      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal1,
296        mvcc, scopes);
297    }
298    wal1.shutdown();
299    runWALSplit(this.conf);
300
301    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
302    // Add 1k to each family.
303    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
304      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal2,
305        mvcc, scopes);
306    }
307    wal2.shutdown();
308    runWALSplit(this.conf);
309
310    WAL wal3 = createWAL(this.conf, hbaseRootDir, logName);
311    try {
312      HRegion region =
313        HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, tableDescriptor, wal3);
314      long seqid = region.getOpenSeqNum();
315      // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1.
316      // When opened, this region would apply 6k edits, and increment the sequenceId by 1
317      assertTrue(seqid > mvcc.getWritePoint());
318      assertEquals(seqid - 1, mvcc.getWritePoint());
319      LOG.debug(
320        "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint());
321
322      // TODO: Scan all.
323      region.close();
324    } finally {
325      wal3.close();
326    }
327  }
328
329  /**
330   * Test case of HRegion that is only made out of bulk loaded files. Assert that we don't 'crash'.
331   */
332  @Test
333  public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException,
334    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
335    final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
336    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
337    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
338    deleteDir(basedir);
339    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
340    Region region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
341    HBaseTestingUtil.closeRegionAndWAL(region2);
342    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
343    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
344
345    byte[] family = htd.getColumnFamilies()[0].getName();
346    Path f = new Path(basedir, "hfile");
347    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
348      Bytes.toBytes("z"), 10);
349    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
350    hfs.add(Pair.newPair(family, f.toString()));
351    region.bulkLoadHFiles(hfs, true, null);
352
353    // Add an edit so something in the WAL
354    byte[] row = tableName.getName();
355    region.put((new Put(row)).addColumn(family, family, family));
356    wal.sync();
357    final int rowsInsertedCount = 11;
358
359    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
360
361    // Now 'crash' the region by stealing its wal
362    final Configuration newConf = HBaseConfiguration.create(this.conf);
363    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
364    user.runAs(new PrivilegedExceptionAction() {
365      @Override
366      public Object run() throws Exception {
367        runWALSplit(newConf);
368        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
369
370        HRegion region2 =
371          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
372        long seqid2 = region2.getOpenSeqNum();
373        assertTrue(seqid2 > -1);
374        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
375
376        // I can't close wal1. Its been appropriated when we split.
377        region2.close();
378        wal2.close();
379        return null;
380      }
381    });
382  }
383
384  /**
385   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
386   * files) and an edit in the memstore.
387   * <p/>
388   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries from
389   * being replayed"
390   */
391  @Test
392  public void testCompactedBulkLoadedFiles() throws IOException, SecurityException,
393    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
394    final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles");
395    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
396    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
397    deleteDir(basedir);
398    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
399    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
400    HBaseTestingUtil.closeRegionAndWAL(region2);
401    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
402    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
403
404    // Add an edit so something in the WAL
405    byte[] row = tableName.getName();
406    byte[] family = htd.getColumnFamilies()[0].getName();
407    region.put((new Put(row)).addColumn(family, family, family));
408    wal.sync();
409
410    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
411    for (int i = 0; i < 3; i++) {
412      Path f = new Path(basedir, "hfile" + i);
413      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
414        Bytes.toBytes(i + "50"), 10);
415      hfs.add(Pair.newPair(family, f.toString()));
416    }
417    region.bulkLoadHFiles(hfs, true, null);
418    final int rowsInsertedCount = 31;
419    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
420
421    // major compact to turn all the bulk loaded files into one normal file
422    region.compact(true);
423    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
424
425    // Now 'crash' the region by stealing its wal
426    final Configuration newConf = HBaseConfiguration.create(this.conf);
427    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
428    user.runAs(new PrivilegedExceptionAction() {
429      @Override
430      public Object run() throws Exception {
431        runWALSplit(newConf);
432        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
433
434        HRegion region2 =
435          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
436        long seqid2 = region2.getOpenSeqNum();
437        assertTrue(seqid2 > -1);
438        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
439
440        // I can't close wal1. Its been appropriated when we split.
441        region2.close();
442        wal2.close();
443        return null;
444      }
445    });
446  }
447
448  /**
449   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
450   * seqids.
451   */
452  @Test
453  public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException,
454    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
455    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
456    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
457    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
458    deleteDir(basedir);
459    final byte[] rowName = tableName.getName();
460    final int countPerFamily = 10;
461    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
462    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
463    HBaseTestingUtil.closeRegionAndWAL(region3);
464    // Write countPerFamily edits into the three families. Do a flush on one
465    // of the families during the load of edits so its seqid is not same as
466    // others to test we do right thing when different seqids.
467    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
468    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
469    long seqid = region.getOpenSeqNum();
470    boolean first = true;
471    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
472      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
473      if (first) {
474        // If first, so we have at least one family w/ different seqid to rest.
475        region.flush(true);
476        first = false;
477      }
478    }
479    // Now assert edits made it in.
480    final Get g = new Get(rowName);
481    Result result = region.get(g);
482    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
483    // Now close the region (without flush), split the log, reopen the region and assert that
484    // replay of log has the correct effect, that our seqids are calculated correctly so
485    // all edits in logs are seen as 'stale'/old.
486    region.close(true);
487    wal.shutdown();
488    runWALSplit(this.conf);
489    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
490    HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
491    long seqid2 = region2.getOpenSeqNum();
492    assertTrue(seqid + result.size() < seqid2);
493    final Result result1b = region2.get(g);
494    assertEquals(result.size(), result1b.size());
495
496    // Next test. Add more edits, then 'crash' this region by stealing its wal
497    // out from under it and assert that replay of the log adds the edits back
498    // correctly when region is opened again.
499    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
500      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
501    }
502    // Get count of edits.
503    final Result result2 = region2.get(g);
504    assertEquals(2 * result.size(), result2.size());
505    wal2.sync();
506    final Configuration newConf = HBaseConfiguration.create(this.conf);
507    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
508    user.runAs(new PrivilegedExceptionAction<Object>() {
509      @Override
510      public Object run() throws Exception {
511        runWALSplit(newConf);
512        FileSystem newFS = FileSystem.get(newConf);
513        // Make a new wal for new region open.
514        WAL wal3 = createWAL(newConf, hbaseRootDir, logName);
515        final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
516        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
517          @Override
518          protected void restoreEdit(HStore s, ExtendedCell cell, MemStoreSizing memstoreSizing) {
519            super.restoreEdit(s, cell, memstoreSizing);
520            countOfRestoredEdits.incrementAndGet();
521          }
522        };
523        long seqid3 = region3.initialize();
524        Result result3 = region3.get(g);
525        // Assert that count of cells is same as before crash.
526        assertEquals(result2.size(), result3.size());
527        assertEquals(htd.getColumnFamilies().length * countPerFamily, countOfRestoredEdits.get());
528
529        // I can't close wal1. Its been appropriated when we split.
530        region3.close();
531        wal3.close();
532        return null;
533      }
534    });
535  }
536
537  /**
538   * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores
539   * got flushed but others did not.
540   * <p/>
541   * Unfortunately, there is no easy hook to flush at a store level. The way we get around this is
542   * by flushing at the region level, and then deleting the recently flushed store file for one of
543   * the Stores. This would put us back in the situation where all but that store got flushed and
544   * the region died.
545   * <p/>
546   * We restart Region again, and verify that the edits were replayed.
547   */
548  @Test
549  public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException,
550    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
551    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
552    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
553    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
554    deleteDir(basedir);
555    final byte[] rowName = tableName.getName();
556    final int countPerFamily = 10;
557    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
558    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
559    HBaseTestingUtil.closeRegionAndWAL(region3);
560    // Write countPerFamily edits into the three families. Do a flush on one
561    // of the families during the load of edits so its seqid is not same as
562    // others to test we do right thing when different seqids.
563    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
564    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
565    long seqid = region.getOpenSeqNum();
566    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
567      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
568    }
569
570    // Now assert edits made it in.
571    final Get g = new Get(rowName);
572    Result result = region.get(g);
573    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
574
575    // Let us flush the region
576    region.flush(true);
577    region.close(true);
578    wal.shutdown();
579
580    // delete the store files in the second column family to simulate a failure
581    // in between the flushcache();
582    // we have 3 families. killing the middle one ensures that taking the maximum
583    // will make us fail.
584    int cf_count = 0;
585    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
586      cf_count++;
587      if (cf_count == 2) {
588        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
589      }
590    }
591
592    // Let us try to split and recover
593    runWALSplit(this.conf);
594    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
595    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
596    long seqid2 = region2.getOpenSeqNum();
597    assertTrue(seqid + result.size() < seqid2);
598
599    final Result result1b = region2.get(g);
600    assertEquals(result.size(), result1b.size());
601  }
602
603  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
604  // Only throws exception if throwExceptionWhenFlushing is set true.
605  public static class CustomStoreFlusher extends DefaultStoreFlusher {
606    // Switch between throw and not throw exception in flush
607    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
608
609    public CustomStoreFlusher(Configuration conf, HStore store) {
610      super(conf, store);
611    }
612
613    @Override
614    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
615      MonitoredTask status, ThroughputController throughputController,
616      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
617      if (throwExceptionWhenFlushing.get()) {
618        throw new IOException("Simulated exception by tests");
619      }
620      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
621        writerCreationTracker);
622    }
623  }
624
625  /**
626   * Test that we could recover the data correctly after aborting flush. In the test, first we abort
627   * flush after writing some data, then writing more data and flush again, at last verify the data.
628   */
629  @Test
630  public void testReplayEditsAfterAbortingFlush() throws IOException {
631    final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush");
632    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
633    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
634    deleteDir(basedir);
635    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
636    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
637    HBaseTestingUtil.closeRegionAndWAL(region3);
638    // Write countPerFamily edits into the three families. Do a flush on one
639    // of the families during the load of edits so its seqid is not same as
640    // others to test we do right thing when different seqids.
641    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
642    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
643    Mockito.doReturn(false).when(rsServices).isAborted();
644    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
645    when(rsServices.getConfiguration()).thenReturn(conf);
646    Configuration customConf = new Configuration(this.conf);
647    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
648      CustomStoreFlusher.class.getName());
649    HRegion region =
650      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
651    int writtenRowCount = 10;
652    List<ColumnFamilyDescriptor> families = Arrays.asList((htd.getColumnFamilies()));
653    for (int i = 0; i < writtenRowCount; i++) {
654      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
655      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
656        Bytes.toBytes("val"));
657      region.put(put);
658    }
659
660    // Now assert edits made it in.
661    RegionScanner scanner = region.getScanner(new Scan());
662    assertEquals(writtenRowCount, getScannedCount(scanner));
663
664    // Let us flush the region
665    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
666    try {
667      region.flush(true);
668      fail("Injected exception hasn't been thrown");
669    } catch (IOException e) {
670      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
671      // simulated to abort server
672      Mockito.doReturn(true).when(rsServices).isAborted();
673      region.setClosing(false); // region normally does not accept writes after
674      // DroppedSnapshotException. We mock around it for this test.
675    }
676    // writing more data
677    int moreRow = 10;
678    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
679      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
680      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
681        Bytes.toBytes("val"));
682      region.put(put);
683    }
684    writtenRowCount += moreRow;
685    // call flush again
686    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
687    try {
688      region.flush(true);
689    } catch (IOException t) {
690      LOG.info(
691        "Expected exception when flushing region because server is stopped," + t.getMessage());
692    }
693
694    region.close(true);
695    wal.shutdown();
696
697    // Let us try to split and recover
698    runWALSplit(this.conf);
699    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
700    Mockito.doReturn(false).when(rsServices).isAborted();
701    HRegion region2 =
702      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
703    scanner = region2.getScanner(new Scan());
704    assertEquals(writtenRowCount, getScannedCount(scanner));
705  }
706
707  private int getScannedCount(RegionScanner scanner) throws IOException {
708    int scannedCount = 0;
709    List<Cell> results = new ArrayList<>();
710    while (true) {
711      boolean existMore = scanner.next(results);
712      if (!results.isEmpty()) {
713        scannedCount++;
714      }
715      if (!existMore) {
716        break;
717      }
718      results.clear();
719    }
720    return scannedCount;
721  }
722
723  /**
724   * Create an HRegion with the result of a WAL split and test we only see the good edits=
725   */
726  @Test
727  public void testReplayEditsWrittenIntoWAL() throws Exception {
728    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
729    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
730    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
731    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
732    deleteDir(basedir);
733
734    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
735    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
736    HBaseTestingUtil.closeRegionAndWAL(region2);
737    final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
738    final byte[] rowName = tableName.getName();
739    final byte[] regionName = hri.getEncodedNameAsBytes();
740
741    // Add 1k to each family.
742    final int countPerFamily = 1000;
743    Set<byte[]> familyNames = new HashSet<>();
744    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
745    for (byte[] fam : htd.getColumnFamilyNames()) {
746      scopes.put(fam, 0);
747    }
748    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
749      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, mvcc, scopes);
750      familyNames.add(hcd.getName());
751    }
752
753    // Add a cache flush, shouldn't have any effect
754    wal.startCacheFlush(regionName, familyNames);
755    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);
756
757    // Add an edit to another family, should be skipped.
758    WALEdit edit = new WALEdit();
759    long now = ee.currentTime();
760    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName));
761    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
762      edit);
763
764    // Delete the c family to verify deletes make it over.
765    edit = new WALEdit();
766    now = ee.currentTime();
767    edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
768    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
769      edit);
770
771    // Sync.
772    wal.sync();
773    // Make a new conf and a new fs for the splitter to run on so we can take
774    // over old wal.
775    final Configuration newConf = HBaseConfiguration.create(this.conf);
776    User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime");
777    user.runAs(new PrivilegedExceptionAction<Void>() {
778      @Override
779      public Void run() throws Exception {
780        runWALSplit(newConf);
781        FileSystem newFS = FileSystem.get(newConf);
782        // 100k seems to make for about 4 flushes during HRegion#initialize.
783        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
784        // Make a new wal for new region.
785        WAL newWal = createWAL(newConf, hbaseRootDir, logName);
786        final AtomicInteger flushcount = new AtomicInteger(0);
787        try {
788          final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
789            @Override
790            protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
791              final Collection<HStore> storesToFlush, MonitoredTask status,
792              boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
793              LOG.info("InternalFlushCache Invoked");
794              FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
795                Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
796              flushcount.incrementAndGet();
797              return fs;
798            }
799          };
800          // The seq id this region has opened up with
801          long seqid = region.initialize();
802
803          // The mvcc readpoint of from inserting data.
804          long writePoint = mvcc.getWritePoint();
805
806          // We flushed during init.
807          assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
808          assertTrue((seqid - 1) == writePoint);
809
810          Get get = new Get(rowName);
811          Result result = region.get(get);
812          // Make sure we only see the good edits
813          assertEquals(countPerFamily * (htd.getColumnFamilies().length - 1), result.size());
814          region.close();
815        } finally {
816          newWal.close();
817        }
818        return null;
819      }
820    });
821  }
822
823  @Test
824  // the following test is for HBASE-6065
825  public void testSequentialEditLogSeqNum() throws IOException {
826    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
827    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
828    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
829    deleteDir(basedir);
830    final byte[] rowName = tableName.getName();
831    final int countPerFamily = 10;
832    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
833
834    // Mock the WAL
835    MockWAL wal = createMockWAL();
836
837    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
838    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
839      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
840    }
841
842    // Let us flush the region
843    // But this time completeflushcache is not yet done
844    region.flush(true);
845    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
846      addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
847    }
848    long lastestSeqNumber = region.getReadPoint(null);
849    // get the current seq no
850    wal.doCompleteCacheFlush = true;
851    // allow complete cache flush with the previous seq number got after first
852    // set of edits.
853    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
854    wal.shutdown();
855    FileStatus[] listStatus = wal.getFiles();
856    assertNotNull(listStatus);
857    assertTrue(listStatus.length > 0);
858    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
859      wals, null);
860    FileStatus[] listStatus1 =
861      this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName),
862        new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() {
863          @Override
864          public boolean accept(Path p) {
865            return !WALSplitUtil.isSequenceIdFile(p);
866          }
867        });
868    int editCount = 0;
869    for (FileStatus fileStatus : listStatus1) {
870      editCount = Integer.parseInt(fileStatus.getPath().getName());
871    }
872    // The sequence number should be same
873    assertEquals(
874      "The sequence number of the recoverd.edits and the current edit seq should be same",
875      lastestSeqNumber, editCount);
876  }
877
878  /**
879   * testcase for https://issues.apache.org/jira/browse/HBASE-15252
880   */
881  @Test
882  public void testDatalossWhenInputError() throws Exception {
883    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
884    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
885    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
886    deleteDir(basedir);
887    final byte[] rowName = tableName.getName();
888    final int countPerFamily = 10;
889    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
890    HRegion region1 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
891    Path regionDir = region1.getWALRegionDir();
892    HBaseTestingUtil.closeRegionAndWAL(region1);
893
894    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
895    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
896    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
897      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
898    }
899    // Now assert edits made it in.
900    final Get g = new Get(rowName);
901    Result result = region.get(g);
902    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
903    // Now close the region (without flush), split the log, reopen the region and assert that
904    // replay of log has the correct effect.
905    region.close(true);
906    wal.shutdown();
907
908    runWALSplit(this.conf);
909
910    // here we let the DFSInputStream throw an IOException just after the WALHeader.
911    Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
912    final long headerLength;
913    try (WALStreamReader reader = WALFactory.createStreamReader(fs, editFile, conf)) {
914      headerLength = reader.getPosition();
915    }
916    FileSystem spyFs = spy(this.fs);
917    doAnswer(new Answer<FSDataInputStream>() {
918
919      @Override
920      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
921        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
922        Field field = FilterInputStream.class.getDeclaredField("in");
923        field.setAccessible(true);
924        final DFSInputStream in = (DFSInputStream) field.get(stream);
925        DFSInputStream spyIn = spy(in);
926        doAnswer(new Answer<Integer>() {
927
928          private long pos;
929
930          @Override
931          public Integer answer(InvocationOnMock invocation) throws Throwable {
932            if (pos >= headerLength) {
933              throw new IOException("read over limit");
934            }
935            int b = (Integer) invocation.callRealMethod();
936            if (b > 0) {
937              pos += b;
938            }
939            return b;
940          }
941        }).when(spyIn).read(any(byte[].class), anyInt(), anyInt());
942        doAnswer(new Answer<Void>() {
943
944          @Override
945          public Void answer(InvocationOnMock invocation) throws Throwable {
946            invocation.callRealMethod();
947            in.close();
948            return null;
949          }
950        }).when(spyIn).close();
951        field.set(stream, spyIn);
952        return stream;
953      }
954    }).when(spyFs).open(eq(editFile));
955
956    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
957    HRegion region2;
958    try {
959      // log replay should fail due to the IOException, otherwise we may lose data.
960      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
961      assertEquals(result.size(), region2.get(g).size());
962    } catch (IOException e) {
963      assertEquals("read over limit", e.getMessage());
964    }
965    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
966    assertEquals(result.size(), region2.get(g).size());
967  }
968
969  /**
970   * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
971   */
972  private void testNameConflictWhenSplit(boolean largeFirst)
973    throws IOException, StreamLacksCapabilityException {
974    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
975    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
976    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
977    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
978    deleteDir(basedir);
979
980    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
981    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
982    for (byte[] fam : htd.getColumnFamilyNames()) {
983      scopes.put(fam, 0);
984    }
985    HRegion region = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
986    HBaseTestingUtil.closeRegionAndWAL(region);
987    final byte[] family = htd.getColumnFamilies()[0].getName();
988    final byte[] rowName = tableName.getName();
989    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
990    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);
991
992    Path largeFile = new Path(logDir, "wal-1");
993    Path smallFile = new Path(logDir, "wal-2");
994    writerWALFile(largeFile, Arrays.asList(entry1, entry2));
995    writerWALFile(smallFile, Arrays.asList(entry2));
996    FileStatus first, second;
997    if (largeFirst) {
998      first = fs.getFileStatus(largeFile);
999      second = fs.getFileStatus(smallFile);
1000    } else {
1001      first = fs.getFileStatus(smallFile);
1002      second = fs.getFileStatus(largeFile);
1003    }
1004    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
1005    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
1006    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
1007    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
1008    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
1009    assertEquals(2, region.get(new Get(rowName)).size());
1010  }
1011
1012  @Test
1013  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
1014    testNameConflictWhenSplit(true);
1015  }
1016
1017  @Test
1018  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
1019    testNameConflictWhenSplit(false);
1020  }
1021
1022  static class MockWAL extends FSHLog {
1023    boolean doCompleteCacheFlush = false;
1024
1025    public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
1026      throws IOException {
1027      super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
1028    }
1029
1030    @Override
1031    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
1032      if (!doCompleteCacheFlush) {
1033        return;
1034      }
1035      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
1036    }
1037  }
1038
1039  private TableDescriptor createBasic1FamilyHTD(final TableName tableName) {
1040    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1041    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
1042    return builder.build();
1043  }
1044
1045  private MockWAL createMockWAL() throws IOException {
1046    MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
1047    wal.init();
1048    // Set down maximum recovery so we dfsclient doesn't linger retrying something
1049    // long gone.
1050    HBaseTestingUtil.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1051    return wal;
1052  }
1053
1054  // Flusher used in this test. Keep count of how often we are called and
1055  // actually run the flush inside here.
1056  static class TestFlusher implements FlushRequester {
1057    private HRegion r;
1058
1059    @Override
1060    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
1061      try {
1062        r.flush(false);
1063        return true;
1064      } catch (IOException e) {
1065        throw new RuntimeException("Exception flushing", e);
1066      }
1067    }
1068
1069    @Override
1070    public boolean requestFlush(HRegion region, List<byte[]> families,
1071      FlushLifeCycleTracker tracker) {
1072      return true;
1073    }
1074
1075    @Override
1076    public boolean requestDelayedFlush(HRegion region, long when) {
1077      return true;
1078    }
1079
1080    @Override
1081    public void registerFlushRequestListener(FlushRequestListener listener) {
1082
1083    }
1084
1085    @Override
1086    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
1087      return false;
1088    }
1089
1090    @Override
1091    public void setGlobalMemStoreLimit(long globalMemStoreSize) {
1092
1093    }
1094  }
1095
1096  private WALKeyImpl createWALKey(final TableName tableName, final RegionInfo hri,
1097    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
1098    return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
1099  }
1100
1101  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
1102    int index) {
1103    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
1104    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
1105    WALEdit edit = new WALEdit();
1106    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
1107    return edit;
1108  }
1109
1110  private FSWALEntry createFSWALEntry(TableDescriptor htd, RegionInfo hri, long sequence,
1111    byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
1112    int index, NavigableMap<byte[], Integer> scopes) throws IOException {
1113    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
1114      createWALEdit(rowName, family, ee, index), hri, true, null);
1115    entry.stampRegionSequenceId(mvcc.begin());
1116    return entry;
1117  }
1118
1119  private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName,
1120    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
1121    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
1122    throws IOException {
1123    for (int j = 0; j < count; j++) {
1124      wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
1125        createWALEdit(rowName, family, ee, j));
1126    }
1127    wal.sync();
1128  }
1129
1130  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
1131    EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
1132    List<Put> puts = new ArrayList<>();
1133    for (int j = 0; j < count; j++) {
1134      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
1135      Put p = new Put(rowName);
1136      p.addColumn(family, qualifier, ee.currentTime(), rowName);
1137      r.put(p);
1138      puts.add(p);
1139    }
1140    return puts;
1141  }
1142
1143  /**
1144   * Creates an HRI around an HTD that has <code>tableName</code> and three column families named
1145   * 'a','b', and 'c'.
1146   * @param tableName Name of table to use when we create HTableDescriptor.
1147   */
1148  private RegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
1149    return RegionInfoBuilder.newBuilder(tableName).build();
1150  }
1151
1152  /**
1153   * Run the split. Verify only single split file made.
1154   * @return The single split file made
1155   */
1156  private Path runWALSplit(final Configuration c) throws IOException {
1157    List<Path> splits =
1158      WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
1159    // Split should generate only 1 file since there's only 1 region
1160    assertEquals("splits=" + splits, 1, splits.size());
1161    // Make sure the file exists
1162    assertTrue(fs.exists(splits.get(0)));
1163    LOG.info("Split file=" + splits.get(0));
1164    return splits.get(0);
1165  }
1166
1167  private TableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1168    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1169    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
1170    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("b")));
1171    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("c")));
1172    return builder.build();
1173  }
1174
1175  private void writerWALFile(Path file, List<FSWALEntry> entries)
1176    throws IOException, StreamLacksCapabilityException {
1177    fs.mkdirs(file.getParent());
1178    ProtobufLogWriter writer = new ProtobufLogWriter();
1179    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
1180      StreamSlowMonitor.create(conf, "testMonitor"));
1181    for (FSWALEntry entry : entries) {
1182      writer.append(entry);
1183    }
1184    writer.sync(false);
1185    writer.close();
1186  }
1187
  /**
   * Creates the WAL that the tests in this class run against; concrete subclasses supply the
   * implementation.
   * @param c           configuration to create the WAL with
   * @param hbaseRootDir root directory passed through from the test
   * @param logName     name for the WAL passed through from the test
   * @return the WAL to use
   * @throws IOException if the WAL cannot be created
   */
  protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
    throws IOException;
1190}