/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test replay of edits out of a WAL split.
 */
public abstract class AbstractTestWALReplay {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path hbaseRootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  @Rule
  public final TestName currentTest = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // The below config is supported by 0.20-append and CDH3b2
    conf.setInt("dfs.client.block.recovery.retries", 2);
    TEST_UTIL.startMiniCluster(3);
    Path hbaseRootDir =
      TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    CommonFSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
    this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName =
      ServerName.valueOf(currentTest.getMethodName() + "-manual", 16010, System.currentTimeMillis())
          .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.hbaseRootDir, logName);
    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
    }
    this.wals = new WALFactory(conf, currentTest.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
  }

  /**
   * Delete the given directory recursively if it exists.
   * @param p Directory to cleanup
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

  /**
   * Test that edits are replayed correctly after a region with multiple column families is moved
   * between region servers and the server hosting it is aborted: a row deleted before the abort
   * must not reappear after WAL replay.
   */
  @Test
  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
    final TableName tableName =
        TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
    byte[] family1 = Bytes.toBytes("cf1");
    byte[] family2 = Bytes.toBytes("cf2");
    byte[] qualifier = Bytes.toBytes("q");
    byte[] value = Bytes.toBytes("testV");
    byte[][] familys = { family1, family2 };
    TEST_UTIL.createTable(tableName, familys);
    Table htable = TEST_UTIL.getConnection().getTable(tableName);
    Put put = new Put(Bytes.toBytes("r1"));
    put.addColumn(family1, qualifier, value);
    htable.put(put);
    ResultScanner resultScanner = htable.getScanner(new Scan());
    int count = 0;
    while (resultScanner.next() != null) {
      count++;
    }
    resultScanner.close();
    assertEquals(1, count);

    MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
    List<HRegion> regions = hbaseCluster.getRegions(tableName);
    assertEquals(1, regions.size());

    // move region to another regionserver
    Region destRegion = regions.get(0);
    int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName());
    assertTrue("Please start more than 1 regionserver",
        hbaseCluster.getRegionServerThreads().size() > 1);
    int destServerNum = 0;
    while (destServerNum == originServerNum) {
      destServerNum++;
    }
    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
    // move region to destination regionserver
    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName());

    // delete the row
    Delete del = new Delete(Bytes.toBytes("r1"));
    htable.delete(del);
    resultScanner = htable.getScanner(new Scan());
    count = 0;
    while (resultScanner.next() != null) {
      count++;
    }
    resultScanner.close();
    assertEquals(0, count);

    // flush region and make major compaction
    HRegion region =
        (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
    region.flush(true);
    // wait to complete major compaction
    for (HStore store : region.getStores()) {
      store.triggerMajorCompaction();
    }
    region.compact(true);

    // move region back to the origin regionserver
    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName());
    // abort the origin regionserver
    originServer.abort("testing");

    // After WAL replay the deleted row must not come back.
    Result result = htable.get(new Get(Bytes.toBytes("r1")));
    assertTrue("Row is deleted, but we get " + result, result == null || result.isEmpty());
    resultScanner.close();
  }

  /**
   * Tests for hbase-2727.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a>
   */
  @Test
  public void test2727() throws Exception {
    // Test being able to have > 1 set of edits in the recovered.edits directory.
    // Ensure edits are replayed properly.
    final TableName tableName =
        TableName.valueOf("test2727");

    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region2);
    final byte [] rowName = tableName.getName();

    WAL wal1 = createWAL(this.conf, hbaseRootDir, logName);
    // Add 1k to each family.
    final int countPerFamily = 1000;

    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for(byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    for (HColumnDescriptor hcd: htd.getFamilies()) {
      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
          wal1, htd, mvcc, scopes);
    }
    wal1.shutdown();
    runWALSplit(this.conf);

    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    // Add 1k to each family.
    for (HColumnDescriptor hcd: htd.getFamilies()) {
      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
          ee, wal2, htd, mvcc, scopes);
    }
    wal2.shutdown();
    runWALSplit(this.conf);

    WAL wal3 = createWAL(this.conf, hbaseRootDir, logName);
    try {
      HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
      long seqid = region.getOpenSeqNum();
      // The region opens with sequenceId 1. Applying the 6k replayed edits brings its sequence
      // number to 6k + 1, and the open increments it once more, so the open sequence id must be
      // one past the mvcc write point.
      assertTrue(seqid > mvcc.getWritePoint());
      assertEquals(seqid - 1, mvcc.getWritePoint());
      LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
          + mvcc.getReadPoint());

      // TODO: Scan all.
      region.close();
    } finally {
      wal3.close();
    }
  }

  /**
   * Test case of HRegion that is only made out of bulk loaded files.  Assert
   * that we don't 'crash'.
   * @throws IOException
   * @throws IllegalAccessException
   * @throws NoSuchFieldException
   * @throws IllegalArgumentException
   * @throws SecurityException
   */
  @Test
  public void testRegionMadeOfBulkLoadedFilesOnly()
  throws IOException, SecurityException, IllegalArgumentException,
      NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName =
        TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
    deleteDir(basedir);
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region2);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);

    byte [] family = htd.getFamilies().iterator().next().getName();
    Path f = new Path(basedir, "hfile");
    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
        Bytes.toBytes("z"), 10);
    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
    hfs.add(Pair.newPair(family, f.toString()));
    region.bulkLoadHFiles(hfs, true, null);

    // Add an edit so there is something in the WAL
    byte[] row = tableName.getName();
    region.put((new Put(row)).addColumn(family, family, family));
    wal.sync();
    final int rowsInsertedCount = 11;

    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // Now 'crash' the region by stealing its wal
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf,
        tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);

        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
          hbaseRootDir, hri, htd, wal2);
        long seqid2 = region2.getOpenSeqNum();
        assertTrue(seqid2 > -1);
        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));

        // We can't close wal1; it was appropriated when we split.
        region2.close();
        wal2.close();
        return null;
      }
    });
  }

  /**
   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
   * files) and an edit in the memstore.
   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
   * from being replayed".
   * @throws IOException
   * @throws IllegalAccessException
   * @throws NoSuchFieldException
   * @throws IllegalArgumentException
   * @throws SecurityException
   */
  @Test
  public void testCompactedBulkLoadedFiles()
      throws IOException, SecurityException, IllegalArgumentException,
      NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName =
        TableName.valueOf("testCompactedBulkLoadedFiles");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
    deleteDir(basedir);
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region2);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);

    // Add an edit so there is something in the WAL
    byte [] row = tableName.getName();
    byte [] family = htd.getFamilies().iterator().next().getName();
    region.put((new Put(row)).addColumn(family, family, family));
    wal.sync();

    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
    for (int i = 0; i < 3; i++) {
      Path f = new Path(basedir, "hfile" + i);
      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
          Bytes.toBytes(i + "50"), 10);
      hfs.add(Pair.newPair(family, f.toString()));
    }
    region.bulkLoadHFiles(hfs, true, null);
    final int rowsInsertedCount = 31;
    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // major compact to turn all the bulk loaded files into one normal file
    region.compact(true);
    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // Now 'crash' the region by stealing its wal
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf,
        tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);

        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
            hbaseRootDir, hri, htd, wal2);
        long seqid2 = region2.getOpenSeqNum();
        assertTrue(seqid2 > -1);
        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));

        // We can't close wal1; it was appropriated when we split.
        region2.close();
        wal2.close();
        return null;
      }
    });
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening
   * the Region again.  Verify seqids.
   * @throws IOException
   * @throws IllegalAccessException
   * @throws NoSuchFieldException
   * @throws IllegalArgumentException
   * @throws SecurityException
   */
  @Test
  public void testReplayEditsWrittenViaHRegion()
  throws IOException, SecurityException, IllegalArgumentException,
      NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName =
        TableName.valueOf("testReplayEditsWrittenViaHRegion");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region3);
    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not the same as
    // the others, to test we do the right thing when seqids differ.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (HColumnDescriptor hcd: htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // If first, flush so we have at least one family w/ a different seqid from the rest.
        region.flush(true);
        first = false;
      }
    }
    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getFamilies().size(),
      result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of the log has the correct effect: our seqids are calculated correctly so
    // all edits in the logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test.  Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when the region is opened again.
    for (HColumnDescriptor hcd: htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf,
      tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for the new region open.
        WAL wal3 = createWAL(newConf, hbaseRootDir, logName);
        final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
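        // Open the region with an HRegion subclass that counts every restored edit so we can
        // assert below that replay put back exactly countPerFamily edits for each family.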
        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
          @Override
          protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) {
            super.restoreEdit(s, cell, memstoreSizing);
            countOfRestoredEdits.incrementAndGet();
          }
        };
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that the count of cells is the same as before the crash.
        assertEquals(result2.size(), result3.size());
        assertEquals(htd.getFamilies().size() * countPerFamily,
          countOfRestoredEdits.get());

        // We can't close wal1; it was appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the
   * flushes, i.e. some stores got flushed but others did not.
   *
   * Unfortunately, there is no easy hook to flush at a store level. The way
   * we get around this is by flushing at the region level, and then deleting
   * the recently flushed store file for one of the Stores. This puts us
   * back in the situation where all but that store got flushed and the region
   * died.
   *
   * We restart the Region again, and verify that the edits were replayed.
   *
   * @throws IOException
   * @throws IllegalAccessException
   * @throws NoSuchFieldException
   * @throws IllegalArgumentException
   * @throws SecurityException
   */
  @Test
  public void testReplayEditsAfterPartialFlush()
  throws IOException, SecurityException, IllegalArgumentException,
      NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName =
        TableName.valueOf("testReplayEditsWrittenViaHRegion");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region3);
    // Write countPerFamily edits into the three families.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    long seqid = region.getOpenSeqNum();
    for (HColumnDescriptor hcd: htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getFamilies().size(),
      result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // Delete the store files in the second column family to simulate a failure
    // in between the flushcache() calls; we have 3 families, and killing the middle
    // one ensures that taking the maximum flushed seqid would make us fail.
    int cf_count = 0;
    for (HColumnDescriptor hcd: htd.getFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
      }
    }

    // Let us try to split and recover
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
  // Only throws an exception if throwExceptionWhenFlushing is set true.
  public static class CustomStoreFlusher extends DefaultStoreFlusher {
    // Switch between throwing and not throwing an exception in flush
    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);

    public CustomStoreFlusher(Configuration conf, HStore store) {
      super(conf, store);
    }

    @Override
    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
        MonitoredTask status, ThroughputController throughputController,
        FlushLifeCycleTracker tracker) throws IOException {
      if (throwExceptionWhenFlushing.get()) {
        throw new IOException("Simulated exception by tests");
      }
      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
    }
  }

  /**
   * Test that we can recover the data correctly after aborting a flush. In the
   * test, we first abort a flush after writing some data, then write more data
   * and flush again, and finally verify the data.
   * @throws IOException
   */
  @Test
  public void testReplayEditsAfterAbortingFlush() throws IOException {
    final TableName tableName =
        TableName.valueOf("testReplayEditsAfterAbortingFlush");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region3);
    // Write edits into the three families, using a store flusher that we can make throw on
    // demand so the first flush can be aborted.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
        CustomStoreFlusher.class.getName());
    HRegion region =
      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<HColumnDescriptor> families = new ArrayList<>(htd.getFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
          Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region; the injected exception should abort the flush.
    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // Simulate the server aborting.
      Mockito.doReturn(true).when(rsServices).isAborted();
      // The region normally does not accept writes after a DroppedSnapshotException;
      // we mock around it for this test.
      region.setClosing(false);
    }
    // writing more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
          Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // call flush again
    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info("Expected exception when flushing region because server is stopped, "
          + t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 =
      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

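  /**
   * Count the rows seen by the given scanner: every call to next() that returns at least one
   * cell is counted as one row.
   */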
  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  /**
   * Create an HRegion with the result of a WAL split and test we only see the
   * good edits.
   */
  @Test
  public void testReplayEditsWrittenIntoWAL() throws Exception {
    final TableName tableName =
        TableName.valueOf("testReplayEditsWrittenIntoWAL");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region2);
    final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    final byte[] rowName = tableName.getName();
    final byte[] regionName = hri.getEncodedNameAsBytes();

    // Add 1k to each family.
    final int countPerFamily = 1000;
    Set<byte[]> familyNames = new HashSet<>();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for(byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    for (HColumnDescriptor hcd: htd.getFamilies()) {
      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
          ee, wal, htd, mvcc, scopes);
      familyNames.add(hcd.getName());
    }

    // Add a cache flush; it shouldn't have any effect.
    wal.startCacheFlush(regionName, familyNames);
    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);

    // Add an edit to another family; it should be skipped.
    WALEdit edit = new WALEdit();
    long now = ee.currentTime();
    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
      now, rowName));
    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
      edit);

    // Delete the c family to verify deletes make it over.
    edit = new WALEdit();
    now = ee.currentTime();
    edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
      edit);

    // Sync.
    wal.sync();
    // Make a new conf and a new fs for the splitter to run on so we can take
    // over the old wal.
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf,
      ".replay.wal.secondtime");
    user.runAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        runWALSplit(newConf);
        FileSystem newFS = FileSystem.get(newConf);
        // 100k seems to make for about 4 flushes during HRegion#initialize.
        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
        // Make a new wal for the new region.
        WAL newWal = createWAL(newConf, hbaseRootDir, logName);
        final AtomicInteger flushcount = new AtomicInteger(0);
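        // Open the region with an HRegion subclass that counts internalFlushcache invocations;
        // with the small memstore flush size set above, replay should trigger at least one flush.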
        try {
          final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
            @Override
            protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
                final Collection<HStore> storesToFlush, MonitoredTask status,
                boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
              LOG.info("InternalFlushCache Invoked");
              FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
                Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
              flushcount.incrementAndGet();
              return fs;
            }
          };
          // The seq id this region has opened up with
          long seqid = region.initialize();

          // The mvcc read point from inserting the data.
          long writePoint = mvcc.getWritePoint();

          // We flushed during init.
          assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
          assertTrue((seqid - 1) == writePoint);

          Get get = new Get(rowName);
          Result result = region.get(get);
          // Make sure we only see the good edits
          assertEquals(countPerFamily * (htd.getFamilies().size() - 1),
            result.size());
          region.close();
        } finally {
          newWal.close();
        }
        return null;
      }
    });
  }

  // The following test is for HBASE-6065.
  @Test
  public void testSequentialEditLogSeqNum() throws IOException {
    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir =
      CommonFSUtils.getWALTableDir(conf, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);

    // Mock the WAL
    MockWAL wal = createMockWAL();

    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Let us flush the region; completeCacheFlush is not yet done at this point.
    region.flush(true);
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
    }
    // Get the current seq no.
    long latestSeqNumber = region.getReadPoint(null);
    // Allow complete cache flush with the previous seq number got after the first
    // set of edits.
    wal.doCompleteCacheFlush = true;
    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    wal.shutdown();
    FileStatus[] listStatus = wal.getFiles();
    assertNotNull(listStatus);
    assertTrue(listStatus.length > 0);
    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
      wals, null);
    FileStatus[] listStatus1 =
      this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName),
        new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() {
          @Override
          public boolean accept(Path p) {
            return !WALSplitUtil.isSequenceIdFile(p);
          }
        });
    int editCount = 0;
    for (FileStatus fileStatus : listStatus1) {
      editCount = Integer.parseInt(fileStatus.getPath().getName());
    }
    // The sequence number should be the same.
    assertEquals(
        "The sequence number of the recovered.edits file and the current edit seq should be same",
        latestSeqNumber, editCount);
  }

  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-15252.
   */
  @Test
  public void testDatalossWhenInputError() throws Exception {
    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
    HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    Path regionDir = region1.getWALRegionDir();
    HBaseTestingUtility.closeRegionAndWAL(region1);

    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }
    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of the log has the correct effect.
    region.close(true);
    wal.shutdown();

    runWALSplit(this.conf);

    // Here we let the DFSInputStream throw an IOException just after the WAL header.
    Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
    FSDataInputStream stream = fs.open(editFile);
    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
    Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
        conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
          AbstractFSWALProvider.Reader.class);
    AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance();
    reader.init(this.fs, editFile, conf, stream);
    final long headerLength = stream.getPos();
    reader.close();
    FileSystem spyFs = spy(this.fs);
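    // Wrap the filesystem in a spy that swaps the DFSInputStream underlying the opened edit file
    // for a spy which throws an IOException once the read position passes the WAL header,
    // simulating an input error in the middle of the recovered edits.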
    doAnswer(new Answer<FSDataInputStream>() {

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
        Field field = FilterInputStream.class.getDeclaredField("in");
        field.setAccessible(true);
        final DFSInputStream in = (DFSInputStream) field.get(stream);
        DFSInputStream spyIn = spy(in);
        doAnswer(new Answer<Integer>() {

          private long pos;

          @Override
          public Integer answer(InvocationOnMock invocation) throws Throwable {
            if (pos >= headerLength) {
              throw new IOException("read over limit");
            }
            int b = (Integer) invocation.callRealMethod();
            if (b > 0) {
              pos += b;
            }
            return b;
          }
        }).when(spyIn).read(any(byte[].class), anyInt(), anyInt());
        doAnswer(new Answer<Void>() {

          @Override
          public Void answer(InvocationOnMock invocation) throws Throwable {
            invocation.callRealMethod();
            in.close();
            return null;
          }
        }).when(spyIn).close();
        field.set(stream, spyIn);
        return stream;
      }
    }).when(spyFs).open(eq(editFile));

    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2;
    try {
      // Log replay should fail due to the IOException, otherwise we may lose data.
      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
      assertEquals(result.size(), region2.get(g).size());
    } catch (IOException e) {
      assertEquals("read over limit", e.getMessage());
    }
    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
    assertEquals(result.size(), region2.get(g).size());
  }


  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-14949: split two WAL files where
   * one contains a subset of the entries of the other, in either order, and verify that both
   * edits are still visible when the region is reopened.
   */
  private void testNameConflictWhenSplit(boolean largeFirst) throws IOException,
      StreamLacksCapabilityException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      scopes.put(fam, 0);
    }
    HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtility.closeRegionAndWAL(region);
    final byte[] family = htd.getColumnFamilies()[0].getName();
    final byte[] rowName = tableName.getName();
    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);

    Path largeFile = new Path(logDir, "wal-1");
    Path smallFile = new Path(logDir, "wal-2");
    writerWALFile(largeFile, Arrays.asList(entry1, entry2));
    writerWALFile(smallFile, Arrays.asList(entry2));
    FileStatus first, second;
    if (largeFirst) {
      first = fs.getFileStatus(largeFile);
      second = fs.getFileStatus(smallFile);
    } else {
      first = fs.getFileStatus(smallFile);
      second = fs.getFileStatus(largeFile);
    }
    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
    assertEquals(2, region.get(new Get(rowName)).size());
  }

  @Test
  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
    testNameConflictWhenSplit(true);
  }

  @Test
  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
    testNameConflictWhenSplit(false);
  }

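  // WAL that lets a test postpone completeCacheFlush until doCompleteCacheFlush is set; used by
  // testSequentialEditLogSeqNum to control when the cache flush is marked complete in the WAL.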
  static class MockWAL extends FSHLog {
    boolean doCompleteCacheFlush = false;

    public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
        throws IOException {
      super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
    }

    @Override
    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
      if (!doCompleteCacheFlush) {
        return;
      }
      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
    }
  }

  private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
    htd.addFamily(a);
    return htd;
  }

  private MockWAL createMockWAL() throws IOException {
    MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
    wal.init();
    // Set down the maximum recovery error count so the dfsclient doesn't linger retrying
    // something long gone.
    HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
    return wal;
  }

  // FlushRequester implementation used in tests; runs the flush of the region directly when a
  // full flush is requested.
  static class TestFlusher implements FlushRequester {
    private HRegion r;

    @Override
    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
      try {
        r.flush(false);
        return true;
      } catch (IOException e) {
        throw new RuntimeException("Exception flushing", e);
      }
    }

    @Override
    public boolean requestFlush(HRegion region, List<byte[]> families,
        FlushLifeCycleTracker tracker) {
      return true;
    }

    @Override
    public boolean requestDelayedFlush(HRegion region, long when) {
      return true;
    }

    @Override
    public void registerFlushRequestListener(FlushRequestListener listener) {
    }

    @Override
    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
      return false;
    }

    @Override
    public void setGlobalMemStoreLimit(long globalMemStoreSize) {
    }
  }

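  // The helpers below build WAL keys and edits directly so tests can append to a WAL without
  // going through an HRegion.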
  private WALKeyImpl createWALKey(final TableName tableName, final HRegionInfo hri,
      final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
  }

  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
      int index) {
    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
    return edit;
  }

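  // Builds an FSWALEntry for the low-level WAL file writer used in testNameConflictWhenSplit;
  // the entry is stamped with a region sequence id obtained from the passed mvcc.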
  private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
    byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
    int index, NavigableMap<byte[], Integer> scopes) throws IOException {
    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
      createWALEdit(rowName, family, ee, index), hri, true, null);
    entry.stampRegionSequenceId(mvcc.begin());
    return entry;
  }

  private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
      final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc,
      NavigableMap<byte[], Integer> scopes) throws IOException {
    for (int j = 0; j < count; j++) {
      wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
        createWALEdit(rowName, family, ee, j));
    }
    wal.sync();
  }

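  // Writes count puts for the given row/family through the Region front door (as opposed to
  // appending raw WAL edits) and returns the puts that were applied.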
  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
      EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
    List<Put> puts = new ArrayList<>();
    for (int j = 0; j < count; j++) {
      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
      Put p = new Put(rowName);
      p.addColumn(family, qualifier, ee.currentTime(), rowName);
      r.put(p);
      puts.add(p);
    }
    return puts;
  }

  /*
   * Creates an HRI around an HTD that has <code>tableName</code> and three
   * column families named 'a','b', and 'c'.
   * @param tableName Name of table to use when we create HTableDescriptor.
   */
  private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
    return new HRegionInfo(tableName, null, null, false);
  }

  /*
   * Run the split.  Verify only single split file made.
   * @param c
   * @return The single split file made
   * @throws IOException
   */
  private Path runWALSplit(final Configuration c) throws IOException {
    List<Path> splits = WALSplitter.split(
      hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
    // Split should generate only 1 file since there's only 1 region
    assertEquals("splits=" + splits, 1, splits.size());
    // Make sure the file exists
    assertTrue(fs.exists(splits.get(0)));
    LOG.info("Split file=" + splits.get(0));
    return splits.get(0);
  }

  private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
    htd.addFamily(a);
    HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
    htd.addFamily(b);
    HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
    htd.addFamily(c);
    return htd;
  }

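  // Writes the given entries into a new WAL file at the given path using ProtobufLogWriter,
  // syncing and closing the writer so the file can be handed straight to the splitter.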
  private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException,
      StreamLacksCapabilityException {
    fs.mkdirs(file.getParent());
    ProtobufLogWriter writer = new ProtobufLogWriter();
    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file));
    for (FSWALEntry entry : entries) {
      writer.append(entry);
    }
    writer.sync(false);
    writer.close();
  }

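  // Subclasses supply the concrete WAL implementation under test. A minimal sketch (an
  // assumption, mirroring the FSHLog constructor used by MockWAL above) could look like:
  //
  //   @Override
  //   protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
  //       throws IOException {
  //     FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName,
  //       HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null);
  //     wal.init();
  //     return wal;
  //   }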
  protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
      throws IOException;
}