001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.anyInt;
026import static org.mockito.ArgumentMatchers.eq;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.spy;
029import static org.mockito.Mockito.when;
030
031import java.io.FilterInputStream;
032import java.io.IOException;
033import java.lang.reflect.Field;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collection;
038import java.util.HashSet;
039import java.util.List;
040import java.util.NavigableMap;
041import java.util.Set;
042import java.util.TreeMap;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.atomic.AtomicInteger;
045import java.util.function.Consumer;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.FSDataInputStream;
048import org.apache.hadoop.fs.FileStatus;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.fs.PathFilter;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HBaseTestingUtil;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.ServerName;
058import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
062import org.apache.hadoop.hbase.client.Delete;
063import org.apache.hadoop.hbase.client.Get;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.RegionInfo;
066import org.apache.hadoop.hbase.client.RegionInfoBuilder;
067import org.apache.hadoop.hbase.client.Result;
068import org.apache.hadoop.hbase.client.ResultScanner;
069import org.apache.hadoop.hbase.client.Scan;
070import org.apache.hadoop.hbase.client.Table;
071import org.apache.hadoop.hbase.client.TableDescriptor;
072import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
073import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
074import org.apache.hadoop.hbase.monitoring.MonitoredTask;
075import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
076import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
077import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
078import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
079import org.apache.hadoop.hbase.regionserver.FlushRequester;
080import org.apache.hadoop.hbase.regionserver.HRegion;
081import org.apache.hadoop.hbase.regionserver.HRegionServer;
082import org.apache.hadoop.hbase.regionserver.HStore;
083import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
084import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
085import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
086import org.apache.hadoop.hbase.regionserver.Region;
087import org.apache.hadoop.hbase.regionserver.RegionScanner;
088import org.apache.hadoop.hbase.regionserver.RegionServerServices;
089import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
090import org.apache.hadoop.hbase.security.User;
091import org.apache.hadoop.hbase.util.Bytes;
092import org.apache.hadoop.hbase.util.CommonFSUtils;
093import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
094import org.apache.hadoop.hbase.util.EnvironmentEdge;
095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
096import org.apache.hadoop.hbase.util.HFileTestUtil;
097import org.apache.hadoop.hbase.util.Pair;
098import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
099import org.apache.hadoop.hbase.wal.WAL;
100import org.apache.hadoop.hbase.wal.WALEdit;
101import org.apache.hadoop.hbase.wal.WALFactory;
102import org.apache.hadoop.hbase.wal.WALKeyImpl;
103import org.apache.hadoop.hbase.wal.WALSplitUtil;
104import org.apache.hadoop.hbase.wal.WALSplitter;
105import org.apache.hadoop.hdfs.DFSInputStream;
106import org.junit.After;
107import org.junit.AfterClass;
108import org.junit.Before;
109import org.junit.BeforeClass;
110import org.junit.Rule;
111import org.junit.Test;
112import org.junit.rules.TestName;
113import org.mockito.Mockito;
114import org.mockito.invocation.InvocationOnMock;
115import org.mockito.stubbing.Answer;
116import org.slf4j.Logger;
117import org.slf4j.LoggerFactory;
118
119/**
120 * Test replay of edits out of a WAL split.
121 */
122public abstract class AbstractTestWALReplay {
123  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
124  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
125  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
126  private Path hbaseRootDir = null;
127  private String logName;
128  private Path oldLogDir;
129  private Path logDir;
130  private FileSystem fs;
131  private Configuration conf;
132  private WALFactory wals;
133
134  @Rule
135  public final TestName currentTest = new TestName();
136
137  @BeforeClass
138  public static void setUpBeforeClass() throws Exception {
139    Configuration conf = TEST_UTIL.getConfiguration();
140    // The below config supported by 0.20-append and CDH3b2
141    conf.setInt("dfs.client.block.recovery.retries", 2);
142    TEST_UTIL.startMiniCluster(3);
143    Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
144    LOG.info("hbase.rootdir=" + hbaseRootDir);
145    CommonFSUtils.setRootDir(conf, hbaseRootDir);
146  }
147
148  @AfterClass
149  public static void tearDownAfterClass() throws Exception {
150    TEST_UTIL.shutdownMiniCluster();
151  }
152
153  @Before
154  public void setUp() throws Exception {
155    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
156    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
157    this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf);
158    this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
159    String serverName = ServerName
160      .valueOf(currentTest.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
161      .toString();
162    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
163    this.logDir = new Path(this.hbaseRootDir, logName);
164    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
165      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
166    }
167    this.wals = new WALFactory(conf, currentTest.getMethodName());
168  }
169
170  @After
171  public void tearDown() throws Exception {
172    this.wals.close();
173    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
174  }
175
176  /*
177   * @param p Directory to cleanup
178   */
179  private void deleteDir(final Path p) throws IOException {
180    if (this.fs.exists(p)) {
181      if (!this.fs.delete(p, true)) {
182        throw new IOException("Failed remove of " + p);
183      }
184    }
185  }
186
187  /**
188   *   */
189  @Test
190  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
191    final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
192    byte[] family1 = Bytes.toBytes("cf1");
193    byte[] family2 = Bytes.toBytes("cf2");
194    byte[] qualifier = Bytes.toBytes("q");
195    byte[] value = Bytes.toBytes("testV");
196    byte[][] familys = { family1, family2 };
197    TEST_UTIL.createTable(tableName, familys);
198    Table htable = TEST_UTIL.getConnection().getTable(tableName);
199    Put put = new Put(Bytes.toBytes("r1"));
200    put.addColumn(family1, qualifier, value);
201    htable.put(put);
202    ResultScanner resultScanner = htable.getScanner(new Scan());
203    int count = 0;
204    while (resultScanner.next() != null) {
205      count++;
206    }
207    resultScanner.close();
208    assertEquals(1, count);
209
210    SingleProcessHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
211    List<HRegion> regions = hbaseCluster.getRegions(tableName);
212    assertEquals(1, regions.size());
213
214    // move region to another regionserver
215    Region destRegion = regions.get(0);
216    int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName());
217    assertTrue("Please start more than 1 regionserver",
218      hbaseCluster.getRegionServerThreads().size() > 1);
219    int destServerNum = 0;
220    while (destServerNum == originServerNum) {
221      destServerNum++;
222    }
223    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
224    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
225    // move region to destination regionserver
226    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName());
227
228    // delete the row
229    Delete del = new Delete(Bytes.toBytes("r1"));
230    htable.delete(del);
231    resultScanner = htable.getScanner(new Scan());
232    count = 0;
233    while (resultScanner.next() != null) {
234      count++;
235    }
236    resultScanner.close();
237    assertEquals(0, count);
238
239    // flush region and make major compaction
240    HRegion region =
241      (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
242    region.flush(true);
243    // wait to complete major compaction
244    for (HStore store : region.getStores()) {
245      store.triggerMajorCompaction();
246    }
247    region.compact(true);
248
249    // move region to origin regionserver
250    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName());
251    // abort the origin regionserver
252    originServer.abort("testing");
253
254    // see what we get
255    Result result = htable.get(new Get(Bytes.toBytes("r1")));
256    if (result != null) {
257      assertTrue("Row is deleted, but we get" + result.toString(),
258        (result == null) || result.isEmpty());
259    }
260    resultScanner.close();
261  }
262
263  /**
264   * Tests for hbase-2727.
265   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a>
266   */
267  @Test
268  public void test2727() throws Exception {
269    // Test being able to have > 1 set of edits in the recovered.edits directory.
270    // Ensure edits are replayed properly.
271    final TableName tableName = TableName.valueOf("test2727");
272
273    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
274    RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
275    Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
276    deleteDir(basedir);
277
278    TableDescriptor tableDescriptor = createBasic3FamilyHTD(tableName);
279    Region region2 =
280      HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, tableDescriptor);
281    HBaseTestingUtil.closeRegionAndWAL(region2);
282    final byte[] rowName = tableName.getName();
283
284    WAL wal1 = createWAL(this.conf, hbaseRootDir, logName);
285    // Add 1k to each family.
286    final int countPerFamily = 1000;
287
288    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
289    for (byte[] fam : tableDescriptor.getColumnFamilyNames()) {
290      scopes.put(fam, 0);
291    }
292    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
293      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal1,
294        mvcc, scopes);
295    }
296    wal1.shutdown();
297    runWALSplit(this.conf);
298
299    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
300    // Add 1k to each family.
301    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
302      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal2,
303        mvcc, scopes);
304    }
305    wal2.shutdown();
306    runWALSplit(this.conf);
307
308    WAL wal3 = createWAL(this.conf, hbaseRootDir, logName);
309    try {
310      HRegion region =
311        HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, tableDescriptor, wal3);
312      long seqid = region.getOpenSeqNum();
313      // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1.
314      // When opened, this region would apply 6k edits, and increment the sequenceId by 1
315      assertTrue(seqid > mvcc.getWritePoint());
316      assertEquals(seqid - 1, mvcc.getWritePoint());
317      LOG.debug(
318        "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint());
319
320      // TODO: Scan all.
321      region.close();
322    } finally {
323      wal3.close();
324    }
325  }
326
327  /**
328   * Test case of HRegion that is only made out of bulk loaded files. Assert that we don't 'crash'.
329   */
330  @Test
331  public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException,
332    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
333    final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
334    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
335    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
336    deleteDir(basedir);
337    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
338    Region region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
339    HBaseTestingUtil.closeRegionAndWAL(region2);
340    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
341    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
342
343    byte[] family = htd.getColumnFamilies()[0].getName();
344    Path f = new Path(basedir, "hfile");
345    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
346      Bytes.toBytes("z"), 10);
347    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
348    hfs.add(Pair.newPair(family, f.toString()));
349    region.bulkLoadHFiles(hfs, true, null);
350
351    // Add an edit so something in the WAL
352    byte[] row = tableName.getName();
353    region.put((new Put(row)).addColumn(family, family, family));
354    wal.sync();
355    final int rowsInsertedCount = 11;
356
357    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
358
359    // Now 'crash' the region by stealing its wal
360    final Configuration newConf = HBaseConfiguration.create(this.conf);
361    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
362    user.runAs(new PrivilegedExceptionAction() {
363      @Override
364      public Object run() throws Exception {
365        runWALSplit(newConf);
366        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
367
368        HRegion region2 =
369          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
370        long seqid2 = region2.getOpenSeqNum();
371        assertTrue(seqid2 > -1);
372        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
373
374        // I can't close wal1. Its been appropriated when we split.
375        region2.close();
376        wal2.close();
377        return null;
378      }
379    });
380  }
381
382  /**
383   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
384   * files) and an edit in the memstore.
385   * <p/>
386   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries from
387   * being replayed"
388   */
389  @Test
390  public void testCompactedBulkLoadedFiles() throws IOException, SecurityException,
391    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
392    final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles");
393    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
394    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
395    deleteDir(basedir);
396    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
397    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
398    HBaseTestingUtil.closeRegionAndWAL(region2);
399    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
400    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
401
402    // Add an edit so something in the WAL
403    byte[] row = tableName.getName();
404    byte[] family = htd.getColumnFamilies()[0].getName();
405    region.put((new Put(row)).addColumn(family, family, family));
406    wal.sync();
407
408    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
409    for (int i = 0; i < 3; i++) {
410      Path f = new Path(basedir, "hfile" + i);
411      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
412        Bytes.toBytes(i + "50"), 10);
413      hfs.add(Pair.newPair(family, f.toString()));
414    }
415    region.bulkLoadHFiles(hfs, true, null);
416    final int rowsInsertedCount = 31;
417    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
418
419    // major compact to turn all the bulk loaded files into one normal file
420    region.compact(true);
421    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
422
423    // Now 'crash' the region by stealing its wal
424    final Configuration newConf = HBaseConfiguration.create(this.conf);
425    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
426    user.runAs(new PrivilegedExceptionAction() {
427      @Override
428      public Object run() throws Exception {
429        runWALSplit(newConf);
430        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
431
432        HRegion region2 =
433          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
434        long seqid2 = region2.getOpenSeqNum();
435        assertTrue(seqid2 > -1);
436        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
437
438        // I can't close wal1. Its been appropriated when we split.
439        region2.close();
440        wal2.close();
441        return null;
442      }
443    });
444  }
445
446  /**
447   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
448   * seqids.
449   */
450  @Test
451  public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException,
452    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
453    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
454    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
455    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
456    deleteDir(basedir);
457    final byte[] rowName = tableName.getName();
458    final int countPerFamily = 10;
459    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
460    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
461    HBaseTestingUtil.closeRegionAndWAL(region3);
462    // Write countPerFamily edits into the three families. Do a flush on one
463    // of the families during the load of edits so its seqid is not same as
464    // others to test we do right thing when different seqids.
465    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
466    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
467    long seqid = region.getOpenSeqNum();
468    boolean first = true;
469    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
470      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
471      if (first) {
472        // If first, so we have at least one family w/ different seqid to rest.
473        region.flush(true);
474        first = false;
475      }
476    }
477    // Now assert edits made it in.
478    final Get g = new Get(rowName);
479    Result result = region.get(g);
480    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
481    // Now close the region (without flush), split the log, reopen the region and assert that
482    // replay of log has the correct effect, that our seqids are calculated correctly so
483    // all edits in logs are seen as 'stale'/old.
484    region.close(true);
485    wal.shutdown();
486    runWALSplit(this.conf);
487    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
488    HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
489    long seqid2 = region2.getOpenSeqNum();
490    assertTrue(seqid + result.size() < seqid2);
491    final Result result1b = region2.get(g);
492    assertEquals(result.size(), result1b.size());
493
494    // Next test. Add more edits, then 'crash' this region by stealing its wal
495    // out from under it and assert that replay of the log adds the edits back
496    // correctly when region is opened again.
497    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
498      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
499    }
500    // Get count of edits.
501    final Result result2 = region2.get(g);
502    assertEquals(2 * result.size(), result2.size());
503    wal2.sync();
504    final Configuration newConf = HBaseConfiguration.create(this.conf);
505    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
506    user.runAs(new PrivilegedExceptionAction<Object>() {
507      @Override
508      public Object run() throws Exception {
509        runWALSplit(newConf);
510        FileSystem newFS = FileSystem.get(newConf);
511        // Make a new wal for new region open.
512        WAL wal3 = createWAL(newConf, hbaseRootDir, logName);
513        final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
514        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
515          @Override
516          protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) {
517            super.restoreEdit(s, cell, memstoreSizing);
518            countOfRestoredEdits.incrementAndGet();
519          }
520        };
521        long seqid3 = region3.initialize();
522        Result result3 = region3.get(g);
523        // Assert that count of cells is same as before crash.
524        assertEquals(result2.size(), result3.size());
525        assertEquals(htd.getColumnFamilies().length * countPerFamily, countOfRestoredEdits.get());
526
527        // I can't close wal1. Its been appropriated when we split.
528        region3.close();
529        wal3.close();
530        return null;
531      }
532    });
533  }
534
535  /**
536   * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores
537   * got flushed but others did not.
538   * <p/>
539   * Unfortunately, there is no easy hook to flush at a store level. The way we get around this is
540   * by flushing at the region level, and then deleting the recently flushed store file for one of
541   * the Stores. This would put us back in the situation where all but that store got flushed and
542   * the region died.
543   * <p/>
544   * We restart Region again, and verify that the edits were replayed.
545   */
546  @Test
547  public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException,
548    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
549    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
550    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
551    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
552    deleteDir(basedir);
553    final byte[] rowName = tableName.getName();
554    final int countPerFamily = 10;
555    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
556    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
557    HBaseTestingUtil.closeRegionAndWAL(region3);
558    // Write countPerFamily edits into the three families. Do a flush on one
559    // of the families during the load of edits so its seqid is not same as
560    // others to test we do right thing when different seqids.
561    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
562    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
563    long seqid = region.getOpenSeqNum();
564    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
565      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
566    }
567
568    // Now assert edits made it in.
569    final Get g = new Get(rowName);
570    Result result = region.get(g);
571    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
572
573    // Let us flush the region
574    region.flush(true);
575    region.close(true);
576    wal.shutdown();
577
578    // delete the store files in the second column family to simulate a failure
579    // in between the flushcache();
580    // we have 3 families. killing the middle one ensures that taking the maximum
581    // will make us fail.
582    int cf_count = 0;
583    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
584      cf_count++;
585      if (cf_count == 2) {
586        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
587      }
588    }
589
590    // Let us try to split and recover
591    runWALSplit(this.conf);
592    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
593    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
594    long seqid2 = region2.getOpenSeqNum();
595    assertTrue(seqid + result.size() < seqid2);
596
597    final Result result1b = region2.get(g);
598    assertEquals(result.size(), result1b.size());
599  }
600
601  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
602  // Only throws exception if throwExceptionWhenFlushing is set true.
603  public static class CustomStoreFlusher extends DefaultStoreFlusher {
604    // Switch between throw and not throw exception in flush
605    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
606
607    public CustomStoreFlusher(Configuration conf, HStore store) {
608      super(conf, store);
609    }
610
611    @Override
612    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
613      MonitoredTask status, ThroughputController throughputController,
614      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
615      if (throwExceptionWhenFlushing.get()) {
616        throw new IOException("Simulated exception by tests");
617      }
618      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
619        writerCreationTracker);
620    }
621  }
622
623  /**
624   * Test that we could recover the data correctly after aborting flush. In the test, first we abort
625   * flush after writing some data, then writing more data and flush again, at last verify the data.
626   */
627  @Test
628  public void testReplayEditsAfterAbortingFlush() throws IOException {
629    final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush");
630    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
631    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
632    deleteDir(basedir);
633    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
634    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
635    HBaseTestingUtil.closeRegionAndWAL(region3);
636    // Write countPerFamily edits into the three families. Do a flush on one
637    // of the families during the load of edits so its seqid is not same as
638    // others to test we do right thing when different seqids.
639    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
640    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
641    Mockito.doReturn(false).when(rsServices).isAborted();
642    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
643    when(rsServices.getConfiguration()).thenReturn(conf);
644    Configuration customConf = new Configuration(this.conf);
645    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
646      CustomStoreFlusher.class.getName());
647    HRegion region =
648      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
649    int writtenRowCount = 10;
650    List<ColumnFamilyDescriptor> families = Arrays.asList((htd.getColumnFamilies()));
651    for (int i = 0; i < writtenRowCount; i++) {
652      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
653      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
654        Bytes.toBytes("val"));
655      region.put(put);
656    }
657
658    // Now assert edits made it in.
659    RegionScanner scanner = region.getScanner(new Scan());
660    assertEquals(writtenRowCount, getScannedCount(scanner));
661
662    // Let us flush the region
663    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
664    try {
665      region.flush(true);
666      fail("Injected exception hasn't been thrown");
667    } catch (IOException e) {
668      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
669      // simulated to abort server
670      Mockito.doReturn(true).when(rsServices).isAborted();
671      region.setClosing(false); // region normally does not accept writes after
672      // DroppedSnapshotException. We mock around it for this test.
673    }
674    // writing more data
675    int moreRow = 10;
676    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
677      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
678      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
679        Bytes.toBytes("val"));
680      region.put(put);
681    }
682    writtenRowCount += moreRow;
683    // call flush again
684    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
685    try {
686      region.flush(true);
687    } catch (IOException t) {
688      LOG.info(
689        "Expected exception when flushing region because server is stopped," + t.getMessage());
690    }
691
692    region.close(true);
693    wal.shutdown();
694
695    // Let us try to split and recover
696    runWALSplit(this.conf);
697    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
698    Mockito.doReturn(false).when(rsServices).isAborted();
699    HRegion region2 =
700      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
701    scanner = region2.getScanner(new Scan());
702    assertEquals(writtenRowCount, getScannedCount(scanner));
703  }
704
705  private int getScannedCount(RegionScanner scanner) throws IOException {
706    int scannedCount = 0;
707    List<Cell> results = new ArrayList<>();
708    while (true) {
709      boolean existMore = scanner.next(results);
710      if (!results.isEmpty()) {
711        scannedCount++;
712      }
713      if (!existMore) {
714        break;
715      }
716      results.clear();
717    }
718    return scannedCount;
719  }
720
721  /**
722   * Create an HRegion with the result of a WAL split and test we only see the good edits=
723   */
724  @Test
725  public void testReplayEditsWrittenIntoWAL() throws Exception {
726    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
727    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
728    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
729    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
730    deleteDir(basedir);
731
732    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
733    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
734    HBaseTestingUtil.closeRegionAndWAL(region2);
735    final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
736    final byte[] rowName = tableName.getName();
737    final byte[] regionName = hri.getEncodedNameAsBytes();
738
739    // Add 1k to each family.
740    final int countPerFamily = 1000;
741    Set<byte[]> familyNames = new HashSet<>();
742    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
743    for (byte[] fam : htd.getColumnFamilyNames()) {
744      scopes.put(fam, 0);
745    }
746    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
747      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, mvcc, scopes);
748      familyNames.add(hcd.getName());
749    }
750
751    // Add a cache flush, shouldn't have any effect
752    wal.startCacheFlush(regionName, familyNames);
753    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);
754
755    // Add an edit to another family, should be skipped.
756    WALEdit edit = new WALEdit();
757    long now = ee.currentTime();
758    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName));
759    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
760      edit);
761
762    // Delete the c family to verify deletes make it over.
763    edit = new WALEdit();
764    now = ee.currentTime();
765    edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
766    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
767      edit);
768
769    // Sync.
770    wal.sync();
771    // Make a new conf and a new fs for the splitter to run on so we can take
772    // over old wal.
773    final Configuration newConf = HBaseConfiguration.create(this.conf);
774    User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime");
775    user.runAs(new PrivilegedExceptionAction<Void>() {
776      @Override
777      public Void run() throws Exception {
778        runWALSplit(newConf);
779        FileSystem newFS = FileSystem.get(newConf);
780        // 100k seems to make for about 4 flushes during HRegion#initialize.
781        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
782        // Make a new wal for new region.
783        WAL newWal = createWAL(newConf, hbaseRootDir, logName);
784        final AtomicInteger flushcount = new AtomicInteger(0);
785        try {
786          final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
787            @Override
788            protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
789              final Collection<HStore> storesToFlush, MonitoredTask status,
790              boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
791              LOG.info("InternalFlushCache Invoked");
792              FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
793                Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
794              flushcount.incrementAndGet();
795              return fs;
796            }
797          };
798          // The seq id this region has opened up with
799          long seqid = region.initialize();
800
801          // The mvcc readpoint of from inserting data.
802          long writePoint = mvcc.getWritePoint();
803
804          // We flushed during init.
805          assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
806          assertTrue((seqid - 1) == writePoint);
807
808          Get get = new Get(rowName);
809          Result result = region.get(get);
810          // Make sure we only see the good edits
811          assertEquals(countPerFamily * (htd.getColumnFamilies().length - 1), result.size());
812          region.close();
813        } finally {
814          newWal.close();
815        }
816        return null;
817      }
818    });
819  }
820
821  @Test
822  // the following test is for HBASE-6065
823  public void testSequentialEditLogSeqNum() throws IOException {
824    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
825    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
826    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
827    deleteDir(basedir);
828    final byte[] rowName = tableName.getName();
829    final int countPerFamily = 10;
830    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
831
832    // Mock the WAL
833    MockWAL wal = createMockWAL();
834
835    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
836    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
837      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
838    }
839
840    // Let us flush the region
841    // But this time completeflushcache is not yet done
842    region.flush(true);
843    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
844      addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
845    }
846    long lastestSeqNumber = region.getReadPoint(null);
847    // get the current seq no
848    wal.doCompleteCacheFlush = true;
849    // allow complete cache flush with the previous seq number got after first
850    // set of edits.
851    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
852    wal.shutdown();
853    FileStatus[] listStatus = wal.getFiles();
854    assertNotNull(listStatus);
855    assertTrue(listStatus.length > 0);
856    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
857      wals, null);
858    FileStatus[] listStatus1 =
859      this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName),
860        new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() {
861          @Override
862          public boolean accept(Path p) {
863            return !WALSplitUtil.isSequenceIdFile(p);
864          }
865        });
866    int editCount = 0;
867    for (FileStatus fileStatus : listStatus1) {
868      editCount = Integer.parseInt(fileStatus.getPath().getName());
869    }
870    // The sequence number should be same
871    assertEquals(
872      "The sequence number of the recoverd.edits and the current edit seq should be same",
873      lastestSeqNumber, editCount);
874  }
875
876  /**
877   * testcase for https://issues.apache.org/jira/browse/HBASE-15252
878   */
879  @Test
880  public void testDatalossWhenInputError() throws Exception {
881    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
882    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
883    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
884    deleteDir(basedir);
885    final byte[] rowName = tableName.getName();
886    final int countPerFamily = 10;
887    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
888    HRegion region1 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
889    Path regionDir = region1.getWALRegionDir();
890    HBaseTestingUtil.closeRegionAndWAL(region1);
891
892    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
893    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
894    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
895      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
896    }
897    // Now assert edits made it in.
898    final Get g = new Get(rowName);
899    Result result = region.get(g);
900    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
901    // Now close the region (without flush), split the log, reopen the region and assert that
902    // replay of log has the correct effect.
903    region.close(true);
904    wal.shutdown();
905
906    runWALSplit(this.conf);
907
908    // here we let the DFSInputStream throw an IOException just after the WALHeader.
909    Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
910    FSDataInputStream stream = fs.open(editFile);
911    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
912    Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
913      conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
914        AbstractFSWALProvider.Reader.class);
915    AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance();
916    reader.init(this.fs, editFile, conf, stream);
917    final long headerLength = stream.getPos();
918    reader.close();
919    FileSystem spyFs = spy(this.fs);
920    doAnswer(new Answer<FSDataInputStream>() {
921
922      @Override
923      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
924        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
925        Field field = FilterInputStream.class.getDeclaredField("in");
926        field.setAccessible(true);
927        final DFSInputStream in = (DFSInputStream) field.get(stream);
928        DFSInputStream spyIn = spy(in);
929        doAnswer(new Answer<Integer>() {
930
931          private long pos;
932
933          @Override
934          public Integer answer(InvocationOnMock invocation) throws Throwable {
935            if (pos >= headerLength) {
936              throw new IOException("read over limit");
937            }
938            int b = (Integer) invocation.callRealMethod();
939            if (b > 0) {
940              pos += b;
941            }
942            return b;
943          }
944        }).when(spyIn).read(any(byte[].class), anyInt(), anyInt());
945        doAnswer(new Answer<Void>() {
946
947          @Override
948          public Void answer(InvocationOnMock invocation) throws Throwable {
949            invocation.callRealMethod();
950            in.close();
951            return null;
952          }
953        }).when(spyIn).close();
954        field.set(stream, spyIn);
955        return stream;
956      }
957    }).when(spyFs).open(eq(editFile));
958
959    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
960    HRegion region2;
961    try {
962      // log replay should fail due to the IOException, otherwise we may lose data.
963      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
964      assertEquals(result.size(), region2.get(g).size());
965    } catch (IOException e) {
966      assertEquals("read over limit", e.getMessage());
967    }
968    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
969    assertEquals(result.size(), region2.get(g).size());
970  }
971
972  /**
973   * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
974   */
975  private void testNameConflictWhenSplit(boolean largeFirst)
976    throws IOException, StreamLacksCapabilityException {
977    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
978    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
979    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
980    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
981    deleteDir(basedir);
982
983    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
984    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
985    for (byte[] fam : htd.getColumnFamilyNames()) {
986      scopes.put(fam, 0);
987    }
988    HRegion region = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
989    HBaseTestingUtil.closeRegionAndWAL(region);
990    final byte[] family = htd.getColumnFamilies()[0].getName();
991    final byte[] rowName = tableName.getName();
992    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
993    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);
994
995    Path largeFile = new Path(logDir, "wal-1");
996    Path smallFile = new Path(logDir, "wal-2");
997    writerWALFile(largeFile, Arrays.asList(entry1, entry2));
998    writerWALFile(smallFile, Arrays.asList(entry2));
999    FileStatus first, second;
1000    if (largeFirst) {
1001      first = fs.getFileStatus(largeFile);
1002      second = fs.getFileStatus(smallFile);
1003    } else {
1004      first = fs.getFileStatus(smallFile);
1005      second = fs.getFileStatus(largeFile);
1006    }
1007    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
1008    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
1009    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
1010    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
1011    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
1012    assertEquals(2, region.get(new Get(rowName)).size());
1013  }
1014
1015  @Test
1016  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
1017    testNameConflictWhenSplit(true);
1018  }
1019
1020  @Test
1021  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
1022    testNameConflictWhenSplit(false);
1023  }
1024
1025  static class MockWAL extends FSHLog {
1026    boolean doCompleteCacheFlush = false;
1027
1028    public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
1029      throws IOException {
1030      super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
1031    }
1032
1033    @Override
1034    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
1035      if (!doCompleteCacheFlush) {
1036        return;
1037      }
1038      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
1039    }
1040  }
1041
1042  private TableDescriptor createBasic1FamilyHTD(final TableName tableName) {
1043    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1044    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
1045    return builder.build();
1046  }
1047
1048  private MockWAL createMockWAL() throws IOException {
1049    MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
1050    wal.init();
1051    // Set down maximum recovery so we dfsclient doesn't linger retrying something
1052    // long gone.
1053    HBaseTestingUtil.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1054    return wal;
1055  }
1056
1057  // Flusher used in this test. Keep count of how often we are called and
1058  // actually run the flush inside here.
1059  static class TestFlusher implements FlushRequester {
1060    private HRegion r;
1061
1062    @Override
1063    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
1064      try {
1065        r.flush(false);
1066        return true;
1067      } catch (IOException e) {
1068        throw new RuntimeException("Exception flushing", e);
1069      }
1070    }
1071
1072    @Override
1073    public boolean requestFlush(HRegion region, List<byte[]> families,
1074      FlushLifeCycleTracker tracker) {
1075      return true;
1076    }
1077
1078    @Override
1079    public boolean requestDelayedFlush(HRegion region, long when) {
1080      return true;
1081    }
1082
1083    @Override
1084    public void registerFlushRequestListener(FlushRequestListener listener) {
1085
1086    }
1087
1088    @Override
1089    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
1090      return false;
1091    }
1092
1093    @Override
1094    public void setGlobalMemStoreLimit(long globalMemStoreSize) {
1095
1096    }
1097  }
1098
1099  private WALKeyImpl createWALKey(final TableName tableName, final RegionInfo hri,
1100    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
1101    return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
1102  }
1103
1104  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
1105    int index) {
1106    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
1107    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
1108    WALEdit edit = new WALEdit();
1109    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
1110    return edit;
1111  }
1112
1113  private FSWALEntry createFSWALEntry(TableDescriptor htd, RegionInfo hri, long sequence,
1114    byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
1115    int index, NavigableMap<byte[], Integer> scopes) throws IOException {
1116    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
1117      createWALEdit(rowName, family, ee, index), hri, true, null);
1118    entry.stampRegionSequenceId(mvcc.begin());
1119    return entry;
1120  }
1121
1122  private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName,
1123    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
1124    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
1125    throws IOException {
1126    for (int j = 0; j < count; j++) {
1127      wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
1128        createWALEdit(rowName, family, ee, j));
1129    }
1130    wal.sync();
1131  }
1132
1133  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
1134    EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
1135    List<Put> puts = new ArrayList<>();
1136    for (int j = 0; j < count; j++) {
1137      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
1138      Put p = new Put(rowName);
1139      p.addColumn(family, qualifier, ee.currentTime(), rowName);
1140      r.put(p);
1141      puts.add(p);
1142    }
1143    return puts;
1144  }
1145
1146  /**
1147   * Creates an HRI around an HTD that has <code>tableName</code> and three column families named
1148   * 'a','b', and 'c'.
1149   * @param tableName Name of table to use when we create HTableDescriptor.
1150   */
1151  private RegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
1152    return RegionInfoBuilder.newBuilder(tableName).build();
1153  }
1154
1155  /**
1156   * Run the split. Verify only single split file made.
1157   * @return The single split file made
1158   */
1159  private Path runWALSplit(final Configuration c) throws IOException {
1160    List<Path> splits =
1161      WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
1162    // Split should generate only 1 file since there's only 1 region
1163    assertEquals("splits=" + splits, 1, splits.size());
1164    // Make sure the file exists
1165    assertTrue(fs.exists(splits.get(0)));
1166    LOG.info("Split file=" + splits.get(0));
1167    return splits.get(0);
1168  }
1169
1170  private TableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1171    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1172    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
1173    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("b")));
1174    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("c")));
1175    return builder.build();
1176  }
1177
1178  private void writerWALFile(Path file, List<FSWALEntry> entries)
1179    throws IOException, StreamLacksCapabilityException {
1180    fs.mkdirs(file.getParent());
1181    ProtobufLogWriter writer = new ProtobufLogWriter();
1182    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
1183      StreamSlowMonitor.create(conf, "testMonitor"));
1184    for (FSWALEntry entry : entries) {
1185      writer.append(entry);
1186    }
1187    writer.sync(false);
1188    writer.close();
1189  }
1190
1191  protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
1192    throws IOException;
1193}