001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotEquals;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.Comparator;
036import java.util.HashSet;
037import java.util.List;
038import java.util.Map;
039import java.util.NavigableMap;
040import java.util.Set;
041import java.util.TreeMap;
042import java.util.UUID;
043import java.util.concurrent.ConcurrentSkipListMap;
044import java.util.concurrent.CountDownLatch;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.stream.Collectors;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.fs.FileStatus;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.Coprocessor;
054import org.apache.hadoop.hbase.ExtendedCellScanner;
055import org.apache.hadoop.hbase.HBaseConfiguration;
056import org.apache.hadoop.hbase.HBaseTestingUtil;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.KeyValue;
059import org.apache.hadoop.hbase.ServerName;
060import org.apache.hadoop.hbase.TableName;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
063import org.apache.hadoop.hbase.client.Durability;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.client.RegionInfoBuilder;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.client.TableDescriptor;
070import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
071import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
072import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
073import org.apache.hadoop.hbase.regionserver.ChunkCreator;
074import org.apache.hadoop.hbase.regionserver.FlushPolicy;
075import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
076import org.apache.hadoop.hbase.regionserver.HRegion;
077import org.apache.hadoop.hbase.regionserver.HStore;
078import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
079import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
080import org.apache.hadoop.hbase.regionserver.RegionServerServices;
081import org.apache.hadoop.hbase.regionserver.SequenceId;
082import org.apache.hadoop.hbase.util.Bytes;
083import org.apache.hadoop.hbase.util.CommonFSUtils;
084import org.apache.hadoop.hbase.util.EnvironmentEdge;
085import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
086import org.apache.hadoop.hbase.util.Threads;
087import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
088import org.apache.hadoop.hbase.wal.WAL;
089import org.apache.hadoop.hbase.wal.WALEdit;
090import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
091import org.apache.hadoop.hbase.wal.WALKey;
092import org.apache.hadoop.hbase.wal.WALKeyImpl;
093import org.junit.AfterClass;
094import org.junit.Before;
095import org.junit.BeforeClass;
096import org.junit.Rule;
097import org.junit.Test;
098import org.junit.rules.TestName;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102public abstract class AbstractTestFSWAL {
103
  /** Logger for this base class; protected, so subclasses can log through it as well. */
  protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFSWAL.class);

  /** Configuration of {@link #TEST_UTIL}; assigned in {@link #setUpBeforeClass()}. */
  protected static Configuration CONF;
  /** File system of the mini DFS cluster; assigned in {@link #setUpBeforeClass()}. */
  protected static FileSystem FS;
  /** WAL directory of the running test (WAL root dir + method name); set in {@link #setUp()}. */
  protected static Path DIR;
  /** Testing utility that starts and stops the mini DFS cluster used by these tests. */
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  /** Name of the running test method; used for directory and table names. */
  @Rule
  public final TestName currentTest = new TestName();
113
114  @Before
115  public void setUp() throws Exception {
116    FileStatus[] entries = FS.listStatus(new Path("/"));
117    for (FileStatus dir : entries) {
118      FS.delete(dir.getPath(), true);
119    }
120    final Path hbaseDir = TEST_UTIL.createRootDir();
121    final Path hbaseWALDir = TEST_UTIL.createWALRootDir();
122    DIR = new Path(hbaseWALDir, currentTest.getMethodName());
123    assertNotEquals(hbaseDir, hbaseWALDir);
124    FS.mkdirs(DIR);
125  }
126
127  @BeforeClass
128  public static void setUpBeforeClass() throws Exception {
129    // Make block sizes small.
130    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
131    // quicker heartbeat interval for faster DN death notification
132    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
133    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
134    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
135
136    // faster failover with cluster.shutdown();fs.close() idiom
137    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
138    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
139    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
140    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
141      SampleRegionWALCoprocessor.class.getName());
142    TEST_UTIL.startMiniDFSCluster(3);
143
144    CONF = TEST_UTIL.getConfiguration();
145    FS = TEST_UTIL.getDFSCluster().getFileSystem();
146  }
147
  /**
   * Shuts down the mini cluster owned by {@link #TEST_UTIL} once all tests of the class have run.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
152
  /**
   * Creates the WAL implementation under test. A comment in {@code createHoldingWAL} states that
   * the WAL returned from here has already been initialized.
   * <p>
   * NOTE(review): parameter meanings below are read off the call sites in this class; confirm
   * them against the concrete implementations.
   * @param fs              file system the WAL is created on
   * @param rootDir         directory the WAL lives under; the tests pass the WAL root dir or the
   *                        HBase root dir
   * @param WALDir          directory for the WAL files; the tests pass {@code DIR.toString()} or
   *                        the test method name
   * @param archiveDir      archive directory name; the tests pass
   *                        {@code HConstants.HREGION_OLDLOGDIR_NAME} or the test method name
   * @param conf            configuration for the WAL
   * @param listeners       listeners to register; the tests pass {@code null}
   * @param failIfWALExists the tests always pass {@code true}; presumably makes creation fail when
   *                        the WAL directory already exists
   * @param prefix          file name prefix; the tests pass {@code null}
   * @param suffix          file name suffix; the tests pass {@code null} or
   *                        {@code AbstractFSWALProvider.META_WAL_PROVIDER_ID}
   * @return the WAL to run the test against
   */
  protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String WALDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix) throws IOException;
156
  /**
   * Creates a WAL like {@link #newWAL} that additionally takes a hook {@code action}.
   * <p>
   * NOTE(review): the only caller passes an action that sleeps while a flag is set and logs
   * "Sleeping before appending", so implementations presumably run it before each append;
   * confirm in the subclasses.
   * @param action hook supplied by the test to slow the WAL down
   * @return the WAL to run the test against
   */
  protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String WALDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException;
160
161  /**
162   * A loaded WAL coprocessor won't break existing WAL test cases.
163   */
164  @Test
165  public void testWALCoprocessorLoaded() throws Exception {
166    // test to see whether the coprocessor is loaded or not.
167    AbstractFSWAL<?> wal = null;
168    try {
169      wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
170        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
171      WALCoprocessorHost host = wal.getCoprocessorHost();
172      Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
173      assertNotNull(c);
174    } finally {
175      if (wal != null) {
176        wal.close();
177      }
178    }
179  }
180
181  protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
182    MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, String cf)
183    throws IOException {
184    final byte[] row = Bytes.toBytes(cf);
185    for (int i = 0; i < times; i++) {
186      long timestamp = EnvironmentEdgeManager.currentTime();
187      WALEdit cols = new WALEdit();
188      WALEditInternalHelper.addExtendedCell(cols, new KeyValue(row, row, row, timestamp, row));
189      WALKeyImpl key =
190        new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), SequenceId.NO_SEQUENCE_ID,
191          timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
192      log.appendData(hri, key, cols);
193    }
194    log.sync();
195  }
196
  /**
   * Helper method to simulate a region flush for a WAL: starts a cache flush for the given
   * families of the region and immediately completes it with {@code HConstants.NO_SEQNUM}.
   * @param wal                the WAL to notify
   * @param regionEncodedName  encoded name of the region being "flushed"
   * @param flushedFamilyNames the column families reported as flushed
   */
  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
  }
204
205  /**
206   * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws
207   * exception if we do). Comparison is based on the timestamp present in the wal name.
208   */
209  @Test
210  public void testWALComparator() throws Exception {
211    AbstractFSWAL<?> wal1 = null;
212    AbstractFSWAL<?> walMeta = null;
213    try {
214      wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
215        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
216      LOG.debug("Log obtained is: " + wal1);
217      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
218      Path p1 = wal1.computeFilename(11);
219      Path p2 = wal1.computeFilename(12);
220      // comparing with itself returns 0
221      assertTrue(comp.compare(p1, p1) == 0);
222      // comparing with different filenum.
223      assertTrue(comp.compare(p1, p2) < 0);
224      walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
225        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
226        AbstractFSWALProvider.META_WAL_PROVIDER_ID);
227      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
228
229      Path p1WithMeta = walMeta.computeFilename(11);
230      Path p2WithMeta = walMeta.computeFilename(12);
231      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
232      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
233      // mixing meta and non-meta logs gives error
234      boolean ex = false;
235      try {
236        comp.compare(p1WithMeta, p2);
237      } catch (IllegalArgumentException e) {
238        ex = true;
239      }
240      assertTrue("Comparator doesn't complain while checking meta log files", ex);
241      boolean exMeta = false;
242      try {
243        compMeta.compare(p1WithMeta, p2);
244      } catch (IllegalArgumentException e) {
245        exMeta = true;
246      }
247      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
248    } finally {
249      if (wal1 != null) {
250        wal1.close();
251      }
252      if (walMeta != null) {
253        walMeta.close();
254      }
255    }
256  }
257
  /**
   * Polls, through {@code TEST_UTIL.waitFor} with a limit of 5000 (presumably milliseconds),
   * until the WAL reports {@code expected} rolled log files.
   * <p>
   * Now we will close asynchronously and will not archive a wal file unless it is fully closed,
   * so sometimes we need to wait a bit before asserting, especially when you want to test the
   * removal of numRolledLogFiles.
   */
  private void waitNumRolledLogFiles(AbstractFSWAL<?> wal, int expected) {
    TEST_UTIL.waitFor(5000, () -> wal.getNumRolledLogFiles() == expected);
  }
264
265  private void testFindMemStoresEligibleForFlush(AbstractFSWAL<?> wal) throws IOException {
266    String cf1 = "cf1";
267    String cf2 = "cf2";
268    String cf3 = "cf3";
269    TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
270      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
271    TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
272      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
273    RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
274    RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
275
276    List<ColumnFamilyDescriptor> cfs = new ArrayList<>();
277    cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
278    cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
279    TableDescriptor t3 =
280      TableDescriptorBuilder.newBuilder(TableName.valueOf("t3")).setColumnFamilies(cfs).build();
281    RegionInfo hri3 = RegionInfoBuilder.newBuilder(t3.getTableName()).build();
282
283    // add edits and roll the wal
284    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
285    NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
286    for (byte[] fam : t1.getColumnFamilyNames()) {
287      scopes1.put(fam, 0);
288    }
289    NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
290    for (byte[] fam : t2.getColumnFamilyNames()) {
291      scopes2.put(fam, 0);
292    }
293    NavigableMap<byte[], Integer> scopes3 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
294    for (byte[] fam : t3.getColumnFamilyNames()) {
295      scopes3.put(fam, 0);
296    }
297    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
298    wal.rollWriter();
299    // add some more edits and roll the wal. This would reach the log number threshold
300    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
301    wal.rollWriter();
302    // with above rollWriter call, the max logs limit is reached.
303    waitNumRolledLogFiles(wal, 2);
304
305    // get the regions to flush; since there is only one region in the oldest wal, it should
306    // return only one region.
307    Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
308    assertEquals(1, regionsToFlush.size());
309    assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
310    // insert edits in second region
311    addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
312    // get the regions to flush, it should still read region1.
313    regionsToFlush = wal.findRegionsToForceFlush();
314    assertEquals(1, regionsToFlush.size());
315    assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
316    // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
317    // remain.
318    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
319    wal.rollWriter();
320    // only one wal should remain now (that is for the second region).
321    waitNumRolledLogFiles(wal, 1);
322    // flush the second region
323    flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
324    wal.rollWriter(true);
325    // no wal should remain now.
326    waitNumRolledLogFiles(wal, 0);
327    // add edits both to region 1 and region 2, and roll.
328    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
329    addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
330    wal.rollWriter();
331    // add edits and roll the writer, to reach the max logs limit.
332    waitNumRolledLogFiles(wal, 1);
333    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
334    wal.rollWriter();
335    // it should return two regions to flush, as the oldest wal file has entries
336    // for both regions.
337    regionsToFlush = wal.findRegionsToForceFlush();
338    assertEquals(2, regionsToFlush.size());
339    // flush both regions
340    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
341    flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
342    wal.rollWriter(true);
343    waitNumRolledLogFiles(wal, 0);
344    // Add an edit to region1, and roll the wal.
345    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
346    // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
347    wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
348    wal.rollWriter();
349    wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
350    waitNumRolledLogFiles(wal, 1);
351
352    // clear test data
353    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
354    wal.rollWriter(true);
355    // add edits for three familes
356    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
357    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
358    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
359    wal.rollWriter();
360    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
361    wal.rollWriter();
362    waitNumRolledLogFiles(wal, 2);
363    // flush one family before archive oldest wal
364    Set<byte[]> flushedFamilyNames = new HashSet<>();
365    flushedFamilyNames.add(Bytes.toBytes(cf1));
366    flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
367    regionsToFlush = wal.findRegionsToForceFlush();
368    // then only two family need to be flushed when archive oldest wal
369    assertEquals(1, regionsToFlush.size());
370    assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
371    assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
372  }
373
374  /**
375   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
376   * regions which should be flushed in order to archive the oldest wal file.
377   * <p>
378   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
379   * the max number of logs threshold. It checks whether we get the "right regions and stores" for
380   * flush on rolling the wal.
381   */
382  @Test
383  public void testFindMemStoresEligibleForFlush() throws Exception {
384    LOG.debug("testFindMemStoresEligibleForFlush");
385    Configuration conf1 = HBaseConfiguration.create(CONF);
386    conf1.setInt("hbase.regionserver.maxlogs", 1);
387    try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
388      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null)) {
389      testFindMemStoresEligibleForFlush(wal);
390    }
391
392  }
393
394  @Test(expected = IOException.class)
395  public void testFailedToCreateWALIfParentRenamed()
396    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
397    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF),
398      currentTest.getMethodName(), HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
399    long filenum = EnvironmentEdgeManager.currentTime();
400    Path path = wal.computeFilename(filenum);
401    wal.createWriterInstance(FS, path);
402    Path parent = path.getParent();
403    path = wal.computeFilename(filenum + 1);
404    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
405    FS.rename(parent, newPath);
406    wal.createWriterInstance(FS, path);
407    fail("It should fail to create the new WAL");
408  }
409
410  /**
411   * Test flush for sure has a sequence id that is beyond the last edit appended. We do this by
412   * slowing appends in the background ring buffer thread while in foreground we call flush. The
413   * addition of the sync over HRegion in flush should fix an issue where flush was returning before
414   * all of its appends had made it out to the WAL (HBASE-11109).
415   * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
416   */
417  @Test
418  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
419    String testName = currentTest.getMethodName();
420    final TableName tableName = TableName.valueOf(testName);
421    final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
422    final byte[] rowName = tableName.getName();
423    final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
424      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
425    HRegion r = HBaseTestingUtil.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
426      TEST_UTIL.getConfiguration(), htd);
427    HBaseTestingUtil.closeRegionAndWAL(r);
428    final int countPerFamily = 10;
429    final AtomicBoolean goslow = new AtomicBoolean(false);
430    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
431    for (byte[] fam : htd.getColumnFamilyNames()) {
432      scopes.put(fam, 0);
433    }
434    // subclass and doctor a method.
435    AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
436      testName, CONF, null, true, null, null, new Runnable() {
437
438        @Override
439        public void run() {
440          if (goslow.get()) {
441            Threads.sleep(100);
442            LOG.debug("Sleeping before appending 100ms");
443          }
444        }
445      });
446    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
447      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
448    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
449    try {
450      List<Put> puts = null;
451      for (byte[] fam : htd.getColumnFamilyNames()) {
452        puts = TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x");
453      }
454
455      // Now assert edits made it in.
456      final Get g = new Get(rowName);
457      Result result = region.get(g);
458      assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size());
459
460      // Construct a WALEdit and add it a few times to the WAL.
461      WALEdit edits = new WALEdit();
462      for (Put p : puts) {
463        ExtendedCellScanner cs = p.cellScanner();
464        while (cs.advance()) {
465          WALEditInternalHelper.addExtendedCell(edits, cs.current());
466        }
467      }
468      // Add any old cluster id.
469      List<UUID> clusterIds = new ArrayList<>(1);
470      clusterIds.add(TEST_UTIL.getRandomUUID());
471      // Now make appends run slow.
472      goslow.set(true);
473      for (int i = 0; i < countPerFamily; i++) {
474        final RegionInfo info = region.getRegionInfo();
475        final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
476          EnvironmentEdgeManager.currentTime(), clusterIds, -1, -1, region.getMVCC(), scopes);
477        wal.append(info, logkey, edits, true);
478        region.getMVCC().completeAndWait(logkey.getWriteEntry());
479      }
480      region.flush(true);
481      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
482      long currentSequenceId = region.getReadPoint(null);
483      // Now release the appends
484      goslow.set(false);
485      assertTrue(currentSequenceId >= region.getReadPoint(null));
486    } finally {
487      region.close(true);
488      wal.close();
489    }
490  }
491
492  @Test
493  public void testSyncNoAppend() throws IOException {
494    String testName = currentTest.getMethodName();
495    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
496      CONF, null, true, null, null);
497    try {
498      wal.sync();
499    } finally {
500      wal.close();
501    }
502  }
503
504  @Test
505  public void testWriteEntryCanBeNull() throws IOException {
506    String testName = currentTest.getMethodName();
507    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
508      CONF, null, true, null, null);
509    wal.close();
510    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
511      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
512    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
513    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
514    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
515    for (byte[] fam : td.getColumnFamilyNames()) {
516      scopes.put(fam, 0);
517    }
518    long timestamp = EnvironmentEdgeManager.currentTime();
519    byte[] row = Bytes.toBytes("row");
520    WALEdit cols = new WALEdit();
521    WALEditInternalHelper.addExtendedCell(cols, new KeyValue(row, row, row, timestamp, row));
522    WALKeyImpl key =
523      new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
524        timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
525    try {
526      wal.append(ri, key, cols, true);
527      fail("Should fail since the wal has already been closed");
528    } catch (IOException e) {
529      // expected
530      assertThat(e.getMessage(), containsString("log is closed"));
531      // the WriteEntry should be null since we fail before setting it.
532      assertNull(key.getWriteEntry());
533    }
534  }
535
536  @Test(expected = WALClosedException.class)
537  public void testRollWriterForClosedWAL() throws IOException {
538    String testName = currentTest.getMethodName();
539    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
540      CONF, null, true, null, null);
541    wal.close();
542    wal.rollWriter();
543  }
544
545  private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
546    CountDownLatch holdAppend) throws IOException {
547    FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), testName));
548    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
549      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
550    // newWAL has already called wal.init()
551    wal.registerWALActionsListener(new WALActionsListener() {
552      @Override
553      public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
554        if (startHoldingForAppend.get()) {
555          try {
556            holdAppend.await();
557          } catch (InterruptedException e) {
558            LOG.error(e.toString(), e);
559          }
560        }
561      }
562    });
563    return wal;
564  }
565
566  private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
567    throws IOException {
568    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
569    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
570      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
571    TEST_UTIL.createLocalHRegion(hri, CONF, htd, wal).close();
572    RegionServerServices rsServices = mock(RegionServerServices.class);
573    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
574    when(rsServices.getConfiguration()).thenReturn(conf);
575    return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
576  }
577
578  private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
579    Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
580    CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
581    throws InterruptedException, IOException {
582    // do a regular write first because of memstore size calculation.
583    region.put(put);
584
585    startHoldingForAppend.set(true);
586    region.put(new Put(put).setDurability(Durability.ASYNC_WAL));
587
588    // give the put a chance to start
589    Threads.sleep(3000);
590
591    exec.submit(flushOrCloseRegion);
592
593    // give the flush a chance to start. Flush should have got the region lock, and
594    // should have been waiting on the mvcc complete after this.
595    Threads.sleep(3000);
596
597    // let the append to WAL go through now that the flush already started
598    holdAppend.countDown();
599    flushOrCloseFinished.await();
600  }
601
602  // Testcase for HBASE-23181
603  @Test
604  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
605    String testName = currentTest.getMethodName();
606    byte[] b = Bytes.toBytes("b");
607    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
608      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
609
610    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
611    CountDownLatch holdAppend = new CountDownLatch(1);
612    CountDownLatch closeFinished = new CountDownLatch(1);
613    ExecutorService exec = Executors.newFixedThreadPool(1);
614    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
615    // open a new region which uses this WAL
616    HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
617    try {
618      doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
619        try {
620          Map<?, ?> closeResult = region.close();
621          LOG.info("Close result:" + closeResult);
622          closeFinished.countDown();
623        } catch (IOException e) {
624          LOG.error(e.toString(), e);
625        }
626      }, startHoldingForAppend, closeFinished, holdAppend);
627
628      // now check the region's unflushed seqIds.
629      long seqId = getEarliestMemStoreSeqNum(wal, region.getRegionInfo().getEncodedNameAsBytes());
630      assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
631        seqId);
632    } finally {
633      exec.shutdownNow();
634      region.close();
635      wal.close();
636    }
637  }
638
639  public static long getEarliestMemStoreSeqNum(WAL wal, byte[] encodedRegionName) {
640    if (wal != null) {
641      if (wal instanceof AbstractFSWAL) {
642        return ((AbstractFSWAL<?>) wal).getSequenceIdAccounting()
643          .getLowestSequenceId(encodedRegionName);
644      }
645    }
646    return HConstants.NO_SEQNUM;
647  }
648
649  private static final Set<byte[]> STORES_TO_FLUSH =
650    Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));
651
652  // Testcase for HBASE-23157
653  @Test
654  public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
655    String testName = currentTest.getMethodName();
656    byte[] a = Bytes.toBytes("a");
657    byte[] b = Bytes.toBytes("b");
658    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
659      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
660      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
661
662    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
663    CountDownLatch holdAppend = new CountDownLatch(1);
664    CountDownLatch flushFinished = new CountDownLatch(1);
665    ExecutorService exec = Executors.newFixedThreadPool(2);
666    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
667    conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
668      FlushPolicy.class);
669    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
670    // open a new region which uses this WAL
671    HRegion region = createHoldingHRegion(conf, htd, wal);
672    try {
673      Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
674      doPutWithAsyncWAL(exec, region, put, () -> {
675        try {
676          HRegion.FlushResult flushResult = region.flush(true);
677          LOG.info("Flush result:" + flushResult.getResult());
678          LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
679          flushFinished.countDown();
680        } catch (IOException e) {
681          LOG.error(e.toString(), e);
682        }
683      }, startHoldingForAppend, flushFinished, holdAppend);
684
685      // get the max flushed sequence id after the first flush
686      long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
687
688      region.put(put);
689      // this time we only flush family a
690      STORES_TO_FLUSH.add(a);
691      region.flush(false);
692
693      // get the max flushed sequence id after the second flush
694      long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
695      // make sure that the maxFlushedSequenceId does not go backwards
696      assertTrue(
697        "maxFlushedSeqId1(" + maxFlushedSeqId1
698          + ") is not greater than or equal to maxFlushedSeqId2(" + maxFlushedSeqId2 + ")",
699        maxFlushedSeqId1 <= maxFlushedSeqId2);
700    } finally {
701      exec.shutdownNow();
702      region.close();
703      wal.close();
704    }
705  }
706
707  public static final class FlushSpecificStoresPolicy extends FlushPolicy {
708
709    @Override
710    public Collection<HStore> selectStoresToFlush() {
711      if (STORES_TO_FLUSH.isEmpty()) {
712        return region.getStores();
713      } else {
714        return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
715      }
716    }
717  }
718}