001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotEquals;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.Comparator;
036import java.util.HashSet;
037import java.util.List;
038import java.util.Map;
039import java.util.NavigableMap;
040import java.util.Set;
041import java.util.TreeMap;
042import java.util.UUID;
043import java.util.concurrent.ConcurrentSkipListMap;
044import java.util.concurrent.CountDownLatch;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.stream.Collectors;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.fs.FileStatus;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.CellScanner;
054import org.apache.hadoop.hbase.Coprocessor;
055import org.apache.hadoop.hbase.HBaseConfiguration;
056import org.apache.hadoop.hbase.HBaseTestingUtility;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.KeyValue;
059import org.apache.hadoop.hbase.ServerName;
060import org.apache.hadoop.hbase.TableName;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
063import org.apache.hadoop.hbase.client.Durability;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.RegionInfo;
067import org.apache.hadoop.hbase.client.RegionInfoBuilder;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.client.TableDescriptor;
070import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
071import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
072import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
073import org.apache.hadoop.hbase.regionserver.ChunkCreator;
074import org.apache.hadoop.hbase.regionserver.FlushPolicy;
075import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
076import org.apache.hadoop.hbase.regionserver.HRegion;
077import org.apache.hadoop.hbase.regionserver.HStore;
078import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
079import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
080import org.apache.hadoop.hbase.regionserver.RegionServerServices;
081import org.apache.hadoop.hbase.regionserver.SequenceId;
082import org.apache.hadoop.hbase.util.Bytes;
083import org.apache.hadoop.hbase.util.CommonFSUtils;
084import org.apache.hadoop.hbase.util.EnvironmentEdge;
085import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
086import org.apache.hadoop.hbase.util.Threads;
087import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
088import org.apache.hadoop.hbase.wal.WAL;
089import org.apache.hadoop.hbase.wal.WALEdit;
090import org.apache.hadoop.hbase.wal.WALKey;
091import org.apache.hadoop.hbase.wal.WALKeyImpl;
092import org.junit.AfterClass;
093import org.junit.Before;
094import org.junit.BeforeClass;
095import org.junit.Rule;
096import org.junit.Test;
097import org.junit.rules.TestName;
098import org.slf4j.Logger;
099import org.slf4j.LoggerFactory;
100
101public abstract class AbstractTestFSWAL {
102
103  protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFSWAL.class);
104
105  protected static Configuration CONF;
106  protected static FileSystem FS;
107  protected static Path DIR;
108  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
109
110  @Rule
111  public final TestName currentTest = new TestName();
112
113  @Before
114  public void setUp() throws Exception {
115    FileStatus[] entries = FS.listStatus(new Path("/"));
116    for (FileStatus dir : entries) {
117      FS.delete(dir.getPath(), true);
118    }
119    final Path hbaseDir = TEST_UTIL.createRootDir();
120    final Path hbaseWALDir = TEST_UTIL.createWALRootDir();
121    DIR = new Path(hbaseWALDir, currentTest.getMethodName());
122    assertNotEquals(hbaseDir, hbaseWALDir);
123  }
124
125  @BeforeClass
126  public static void setUpBeforeClass() throws Exception {
127    // Make block sizes small.
128    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
129    // quicker heartbeat interval for faster DN death notification
130    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
131    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
132    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
133
134    // faster failover with cluster.shutdown();fs.close() idiom
135    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
136    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
137    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
138    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
139      SampleRegionWALCoprocessor.class.getName());
140    TEST_UTIL.startMiniDFSCluster(3);
141
142    CONF = TEST_UTIL.getConfiguration();
143    FS = TEST_UTIL.getDFSCluster().getFileSystem();
144  }
145
146  @AfterClass
147  public static void tearDownAfterClass() throws Exception {
148    TEST_UTIL.shutdownMiniCluster();
149  }
150
151  protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String WALDir,
152    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
153    boolean failIfWALExists, String prefix, String suffix) throws IOException;
154
155  protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String WALDir,
156    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
157    boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException;
158
159  /**
160   * A loaded WAL coprocessor won't break existing WAL test cases.
161   */
162  @Test
163  public void testWALCoprocessorLoaded() throws Exception {
164    // test to see whether the coprocessor is loaded or not.
165    AbstractFSWAL<?> wal = null;
166    try {
167      wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
168        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
169      WALCoprocessorHost host = wal.getCoprocessorHost();
170      Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
171      assertNotNull(c);
172    } finally {
173      if (wal != null) {
174        wal.close();
175      }
176    }
177  }
178
179  protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
180    MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, String cf)
181    throws IOException {
182    final byte[] row = Bytes.toBytes(cf);
183    for (int i = 0; i < times; i++) {
184      long timestamp = EnvironmentEdgeManager.currentTime();
185      WALEdit cols = new WALEdit();
186      cols.add(new KeyValue(row, row, row, timestamp, row));
187      WALKeyImpl key =
188        new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), SequenceId.NO_SEQUENCE_ID,
189          timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
190      log.appendData(hri, key, cols);
191    }
192    log.sync();
193  }
194
195  /**
196   * helper method to simulate region flush for a WAL.
197   */
198  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
199    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
200    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
201  }
202
203  /**
204   * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws
205   * exception if we do). Comparison is based on the timestamp present in the wal name. n
206   */
207  @Test
208  public void testWALComparator() throws Exception {
209    AbstractFSWAL<?> wal1 = null;
210    AbstractFSWAL<?> walMeta = null;
211    try {
212      wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
213        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
214      LOG.debug("Log obtained is: " + wal1);
215      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
216      Path p1 = wal1.computeFilename(11);
217      Path p2 = wal1.computeFilename(12);
218      // comparing with itself returns 0
219      assertTrue(comp.compare(p1, p1) == 0);
220      // comparing with different filenum.
221      assertTrue(comp.compare(p1, p2) < 0);
222      walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
223        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
224        AbstractFSWALProvider.META_WAL_PROVIDER_ID);
225      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
226
227      Path p1WithMeta = walMeta.computeFilename(11);
228      Path p2WithMeta = walMeta.computeFilename(12);
229      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
230      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
231      // mixing meta and non-meta logs gives error
232      boolean ex = false;
233      try {
234        comp.compare(p1WithMeta, p2);
235      } catch (IllegalArgumentException e) {
236        ex = true;
237      }
238      assertTrue("Comparator doesn't complain while checking meta log files", ex);
239      boolean exMeta = false;
240      try {
241        compMeta.compare(p1WithMeta, p2);
242      } catch (IllegalArgumentException e) {
243        exMeta = true;
244      }
245      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
246    } finally {
247      if (wal1 != null) {
248        wal1.close();
249      }
250      if (walMeta != null) {
251        walMeta.close();
252      }
253    }
254  }
255
256  /**
257   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
258   * regions which should be flushed in order to archive the oldest wal file.
259   * <p>
260   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
261   * the max number of logs threshold. It checks whether we get the "right regions and stores" for
262   * flush on rolling the wal. n
263   */
264  @Test
265  public void testFindMemStoresEligibleForFlush() throws Exception {
266    LOG.debug("testFindMemStoresEligibleForFlush");
267    Configuration conf1 = HBaseConfiguration.create(CONF);
268    conf1.setInt("hbase.regionserver.maxlogs", 1);
269    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
270      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
271    String cf1 = "cf1";
272    String cf2 = "cf2";
273    String cf3 = "cf3";
274    TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
275      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
276    TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
277      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
278    RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
279    RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
280
281    List<ColumnFamilyDescriptor> cfs = new ArrayList();
282    cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
283    cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
284    TableDescriptor t3 =
285      TableDescriptorBuilder.newBuilder(TableName.valueOf("t3")).setColumnFamilies(cfs).build();
286    RegionInfo hri3 = RegionInfoBuilder.newBuilder(t3.getTableName()).build();
287
288    // add edits and roll the wal
289    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
290    NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
291    for (byte[] fam : t1.getColumnFamilyNames()) {
292      scopes1.put(fam, 0);
293    }
294    NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
295    for (byte[] fam : t2.getColumnFamilyNames()) {
296      scopes2.put(fam, 0);
297    }
298    NavigableMap<byte[], Integer> scopes3 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
299    for (byte[] fam : t3.getColumnFamilyNames()) {
300      scopes3.put(fam, 0);
301    }
302    try {
303      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
304      wal.rollWriter();
305      // add some more edits and roll the wal. This would reach the log number threshold
306      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
307      wal.rollWriter();
308      // with above rollWriter call, the max logs limit is reached.
309      assertTrue(wal.getNumRolledLogFiles() == 2);
310
311      // get the regions to flush; since there is only one region in the oldest wal, it should
312      // return only one region.
313      Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
314      assertEquals(1, regionsToFlush.size());
315      assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
316      // insert edits in second region
317      addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
318      // get the regions to flush, it should still read region1.
319      regionsToFlush = wal.findRegionsToForceFlush();
320      assertEquals(1, regionsToFlush.size());
321      assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
322      // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
323      // remain.
324      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
325      wal.rollWriter();
326      // only one wal should remain now (that is for the second region).
327      assertEquals(1, wal.getNumRolledLogFiles());
328      // flush the second region
329      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
330      wal.rollWriter(true);
331      // no wal should remain now.
332      assertEquals(0, wal.getNumRolledLogFiles());
333      // add edits both to region 1 and region 2, and roll.
334      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
335      addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
336      wal.rollWriter();
337      // add edits and roll the writer, to reach the max logs limit.
338      assertEquals(1, wal.getNumRolledLogFiles());
339      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
340      wal.rollWriter();
341      // it should return two regions to flush, as the oldest wal file has entries
342      // for both regions.
343      regionsToFlush = wal.findRegionsToForceFlush();
344      assertEquals(2, regionsToFlush.size());
345      // flush both regions
346      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
347      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
348      wal.rollWriter(true);
349      assertEquals(0, wal.getNumRolledLogFiles());
350      // Add an edit to region1, and roll the wal.
351      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
352      // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
353      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
354      wal.rollWriter();
355      wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
356      assertEquals(1, wal.getNumRolledLogFiles());
357
358      // clear test data
359      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
360      wal.rollWriter(true);
361      // add edits for three familes
362      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
363      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
364      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
365      wal.rollWriter();
366      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
367      wal.rollWriter();
368      assertEquals(2, wal.getNumRolledLogFiles());
369      // flush one family before archive oldest wal
370      Set<byte[]> flushedFamilyNames = new HashSet<>();
371      flushedFamilyNames.add(Bytes.toBytes(cf1));
372      flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
373      regionsToFlush = wal.findRegionsToForceFlush();
374      // then only two family need to be flushed when archive oldest wal
375      assertEquals(1, regionsToFlush.size());
376      assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
377      assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
378    } finally {
379      if (wal != null) {
380        wal.close();
381      }
382    }
383  }
384
385  @Test(expected = IOException.class)
386  public void testFailedToCreateWALIfParentRenamed()
387    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
388    final String name = "testFailedToCreateWALIfParentRenamed";
389    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name,
390      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
391    long filenum = EnvironmentEdgeManager.currentTime();
392    Path path = wal.computeFilename(filenum);
393    wal.createWriterInstance(path);
394    Path parent = path.getParent();
395    path = wal.computeFilename(filenum + 1);
396    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
397    FS.rename(parent, newPath);
398    wal.createWriterInstance(path);
399    fail("It should fail to create the new WAL");
400  }
401
402  /**
403   * Test flush for sure has a sequence id that is beyond the last edit appended. We do this by
404   * slowing appends in the background ring buffer thread while in foreground we call flush. The
405   * addition of the sync over HRegion in flush should fix an issue where flush was returning before
406   * all of its appends had made it out to the WAL (HBASE-11109). n * @see
407   * <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
408   */
409  @Test
410  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
411    String testName = currentTest.getMethodName();
412    final TableName tableName = TableName.valueOf(testName);
413    final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
414    final byte[] rowName = tableName.getName();
415    final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
416      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
417    HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
418      TEST_UTIL.getConfiguration(), htd);
419    HBaseTestingUtility.closeRegionAndWAL(r);
420    final int countPerFamily = 10;
421    final AtomicBoolean goslow = new AtomicBoolean(false);
422    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
423    for (byte[] fam : htd.getColumnFamilyNames()) {
424      scopes.put(fam, 0);
425    }
426    // subclass and doctor a method.
427    AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
428      testName, CONF, null, true, null, null, new Runnable() {
429
430        @Override
431        public void run() {
432          if (goslow.get()) {
433            Threads.sleep(100);
434            LOG.debug("Sleeping before appending 100ms");
435          }
436        }
437      });
438    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
439      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
440    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
441    try {
442      List<Put> puts = null;
443      for (byte[] fam : htd.getColumnFamilyNames()) {
444        puts = TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x");
445      }
446
447      // Now assert edits made it in.
448      final Get g = new Get(rowName);
449      Result result = region.get(g);
450      assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size());
451
452      // Construct a WALEdit and add it a few times to the WAL.
453      WALEdit edits = new WALEdit();
454      for (Put p : puts) {
455        CellScanner cs = p.cellScanner();
456        while (cs.advance()) {
457          edits.add(cs.current());
458        }
459      }
460      // Add any old cluster id.
461      List<UUID> clusterIds = new ArrayList<>(1);
462      clusterIds.add(TEST_UTIL.getRandomUUID());
463      // Now make appends run slow.
464      goslow.set(true);
465      for (int i = 0; i < countPerFamily; i++) {
466        final RegionInfo info = region.getRegionInfo();
467        final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
468          EnvironmentEdgeManager.currentTime(), clusterIds, -1, -1, region.getMVCC(), scopes);
469        wal.append(info, logkey, edits, true);
470        region.getMVCC().completeAndWait(logkey.getWriteEntry());
471      }
472      region.flush(true);
473      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
474      long currentSequenceId = region.getReadPoint(null);
475      // Now release the appends
476      goslow.set(false);
477      assertTrue(currentSequenceId >= region.getReadPoint(null));
478    } finally {
479      region.close(true);
480      wal.close();
481    }
482  }
483
484  @Test
485  public void testSyncNoAppend() throws IOException {
486    String testName = currentTest.getMethodName();
487    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
488      CONF, null, true, null, null);
489    wal.init();
490    try {
491      wal.sync();
492    } finally {
493      wal.close();
494    }
495  }
496
497  @Test
498  public void testWriteEntryCanBeNull() throws IOException {
499    String testName = currentTest.getMethodName();
500    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
501      CONF, null, true, null, null);
502    wal.close();
503    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
504      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
505    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
506    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
507    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
508    for (byte[] fam : td.getColumnFamilyNames()) {
509      scopes.put(fam, 0);
510    }
511    long timestamp = EnvironmentEdgeManager.currentTime();
512    byte[] row = Bytes.toBytes("row");
513    WALEdit cols = new WALEdit();
514    cols.add(new KeyValue(row, row, row, timestamp, row));
515    WALKeyImpl key =
516      new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
517        timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
518    try {
519      wal.append(ri, key, cols, true);
520      fail("Should fail since the wal has already been closed");
521    } catch (IOException e) {
522      // expected
523      assertThat(e.getMessage(), containsString("log is closed"));
524      // the WriteEntry should be null since we fail before setting it.
525      assertNull(key.getWriteEntry());
526    }
527  }
528
529  private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
530    CountDownLatch holdAppend) throws IOException {
531    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
532      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
533    wal.init();
534    wal.registerWALActionsListener(new WALActionsListener() {
535      @Override
536      public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
537        if (startHoldingForAppend.get()) {
538          try {
539            holdAppend.await();
540          } catch (InterruptedException e) {
541            LOG.error(e.toString(), e);
542          }
543        }
544      }
545    });
546    return wal;
547  }
548
549  private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
550    throws IOException {
551    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
552    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
553      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
554    TEST_UTIL.createLocalHRegion(hri, CONF, htd, wal).close();
555    RegionServerServices rsServices = mock(RegionServerServices.class);
556    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
557    when(rsServices.getConfiguration()).thenReturn(conf);
558    return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
559  }
560
561  private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
562    Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
563    CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
564    throws InterruptedException, IOException {
565    // do a regular write first because of memstore size calculation.
566    region.put(put);
567
568    startHoldingForAppend.set(true);
569    region.put(new Put(put).setDurability(Durability.ASYNC_WAL));
570
571    // give the put a chance to start
572    Threads.sleep(3000);
573
574    exec.submit(flushOrCloseRegion);
575
576    // give the flush a chance to start. Flush should have got the region lock, and
577    // should have been waiting on the mvcc complete after this.
578    Threads.sleep(3000);
579
580    // let the append to WAL go through now that the flush already started
581    holdAppend.countDown();
582    flushOrCloseFinished.await();
583  }
584
585  // Testcase for HBASE-23181
586  @Test
587  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
588    String testName = currentTest.getMethodName();
589    byte[] b = Bytes.toBytes("b");
590    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
591      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
592
593    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
594    CountDownLatch holdAppend = new CountDownLatch(1);
595    CountDownLatch closeFinished = new CountDownLatch(1);
596    ExecutorService exec = Executors.newFixedThreadPool(1);
597    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
598    // open a new region which uses this WAL
599    HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
600    try {
601      doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
602        try {
603          Map<?, ?> closeResult = region.close();
604          LOG.info("Close result:" + closeResult);
605          closeFinished.countDown();
606        } catch (IOException e) {
607          LOG.error(e.toString(), e);
608        }
609      }, startHoldingForAppend, closeFinished, holdAppend);
610
611      // now check the region's unflushed seqIds.
612      long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
613      assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
614        seqId);
615    } finally {
616      exec.shutdownNow();
617      region.close();
618      wal.close();
619    }
620  }
621
622  private static final Set<byte[]> STORES_TO_FLUSH =
623    Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));
624
625  // Testcase for HBASE-23157
626  @Test
627  public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
628    String testName = currentTest.getMethodName();
629    byte[] a = Bytes.toBytes("a");
630    byte[] b = Bytes.toBytes("b");
631    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
632      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
633      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
634
635    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
636    CountDownLatch holdAppend = new CountDownLatch(1);
637    CountDownLatch flushFinished = new CountDownLatch(1);
638    ExecutorService exec = Executors.newFixedThreadPool(2);
639    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
640    conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
641      FlushPolicy.class);
642    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
643    // open a new region which uses this WAL
644    HRegion region = createHoldingHRegion(conf, htd, wal);
645    try {
646      Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
647      doPutWithAsyncWAL(exec, region, put, () -> {
648        try {
649          HRegion.FlushResult flushResult = region.flush(true);
650          LOG.info("Flush result:" + flushResult.getResult());
651          LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
652          flushFinished.countDown();
653        } catch (IOException e) {
654          LOG.error(e.toString(), e);
655        }
656      }, startHoldingForAppend, flushFinished, holdAppend);
657
658      // get the max flushed sequence id after the first flush
659      long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
660
661      region.put(put);
662      // this time we only flush family a
663      STORES_TO_FLUSH.add(a);
664      region.flush(false);
665
666      // get the max flushed sequence id after the second flush
667      long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
668      // make sure that the maxFlushedSequenceId does not go backwards
669      assertTrue(
670        "maxFlushedSeqId1(" + maxFlushedSeqId1
671          + ") is not greater than or equal to maxFlushedSeqId2(" + maxFlushedSeqId2 + ")",
672        maxFlushedSeqId1 <= maxFlushedSeqId2);
673    } finally {
674      exec.shutdownNow();
675      region.close();
676      wal.close();
677    }
678  }
679
680  public static final class FlushSpecificStoresPolicy extends FlushPolicy {
681
682    @Override
683    public Collection<HStore> selectStoresToFlush() {
684      if (STORES_TO_FLUSH.isEmpty()) {
685        return region.getStores();
686      } else {
687        return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
688      }
689    }
690  }
691}