/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTestFSWAL {

  protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFSWAL.class);

  protected static Configuration CONF;
  protected static FileSystem FS;
  protected static Path DIR;
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public final TestName currentTest = new TestName();

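  // Each test starts from a clean slate: wipe everything under the filesystem root, then create
  // fresh root and WAL root directories (which must be distinct) for the current test method.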
  @Before
  public void setUp() throws Exception {
    FileStatus[] entries = FS.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      FS.delete(dir.getPath(), true);
    }
    final Path hbaseDir = TEST_UTIL.createRootDir();
    final Path hbaseWALDir = TEST_UTIL.createWALRootDir();
    DIR = new Path(hbaseWALDir, currentTest.getMethodName());
    assertNotEquals(hbaseDir, hbaseWALDir);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      SampleRegionWALCoprocessor.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    CONF = TEST_UTIL.getConfiguration();
    FS = TEST_UTIL.getDFSCluster().getFileSystem();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

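  /**
   * Creates the WAL implementation under test; concrete subclasses supply the actual
   * implementation (e.g. FSHLog or AsyncFSWAL).
   */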
  protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String WALDir,
      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
      boolean failIfWALExists, String prefix, String suffix) throws IOException;

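  /**
   * Same as {@link #newWAL}, but the returned WAL runs the given {@code action} before handling
   * each append, which lets tests slow down or block appends on demand.
   */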
  protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String WALDir,
      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
      boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException;

  /**
   * A loaded WAL coprocessor won't break existing WAL test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // test to see whether the coprocessor is loaded or not.
    AbstractFSWAL<?> wal = null;
    try {
      wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
          HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
      WALCoprocessorHost host = wal.getCoprocessorHost();
      Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
      assertNotNull(c);
    } finally {
      if (wal != null) {
        wal.close();
      }
    }
  }

  protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
      MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
      throws IOException {
    final byte[] row = Bytes.toBytes("row");
    for (int i = 0; i < times; i++) {
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, row, row, timestamp, row));
      WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(),
          SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
          HConstants.NO_NONCE, mvcc, scopes);
      log.appendData(hri, key, cols);
    }
    log.sync();
  }

  /**
   * Helper method to simulate a region flush for a WAL.
   * @param wal the WAL to flush against
   * @param regionEncodedName encoded name of the region being flushed
   * @param flushedFamilyNames names of the column families included in the flush
   */
  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
    wal.completeCacheFlush(regionEncodedName);
  }

  /**
   * Tests the log comparator. Ensures that we are not mixing meta logs with non-meta logs (an
   * exception is thrown if we do). Comparison is based on the timestamp present in the WAL name.
   */
  @Test
  public void testWALComparator() throws Exception {
    AbstractFSWAL<?> wal1 = null;
    AbstractFSWAL<?> walMeta = null;
    try {
      wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
          HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
      LOG.debug("Log obtained is: " + wal1);
      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
      Path p1 = wal1.computeFilename(11);
      Path p2 = wal1.computeFilename(12);
      // comparing with itself returns 0
      assertTrue(comp.compare(p1, p1) == 0);
      // comparing with different filenum.
      assertTrue(comp.compare(p1, p2) < 0);
      walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
          HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
          AbstractFSWALProvider.META_WAL_PROVIDER_ID);
      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;

      Path p1WithMeta = walMeta.computeFilename(11);
      Path p2WithMeta = walMeta.computeFilename(12);
      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
      // mixing meta and non-meta logs gives error
      boolean ex = false;
      try {
        comp.compare(p1WithMeta, p2);
      } catch (IllegalArgumentException e) {
        ex = true;
      }
      assertTrue("Comparator doesn't complain while checking meta log files", ex);
      boolean exMeta = false;
      try {
        compMeta.compare(p1WithMeta, p2);
      } catch (IllegalArgumentException e) {
        exMeta = true;
      }
      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
    } finally {
      if (wal1 != null) {
        wal1.close();
      }
      if (walMeta != null) {
        walMeta.close();
      }
    }
  }

  /**
   * On rolling a WAL after reaching the threshold, {@link WAL#rollWriter()} returns the list of
   * regions which should be flushed in order to archive the oldest WAL file.
   * <p>
   * This method tests this behavior by inserting edits and rolling the WAL enough times to reach
   * the max number of logs threshold. It checks whether we get the "right regions" to flush on
   * rolling the WAL.
   */
  @Test
  public void testFindMemStoresEligibleForFlush() throws Exception {
    LOG.debug("testFindMemStoresEligibleForFlush");
    Configuration conf1 = HBaseConfiguration.create(CONF);
    conf1.setInt("hbase.regionserver.maxlogs", 1);
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
    TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
    TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
    RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
    RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
    // add edits and roll the wal
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : t1.getColumnFamilyNames()) {
      scopes1.put(fam, 0);
    }
    NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : t2.getColumnFamilyNames()) {
      scopes2.put(fam, 0);
    }
    try {
      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
      wal.rollWriter();
      // add some more edits and roll the wal. This would reach the log number threshold
      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
      wal.rollWriter();
      // with above rollWriter call, the max logs limit is reached.
      assertTrue(wal.getNumRolledLogFiles() == 2);

      // get the regions to flush; since there is only one region in the oldest wal, it should
      // return only one region.
      byte[][] regionsToFlush = wal.findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
      // insert edits in second region
      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
      // get the regions to flush, it should still read region1.
      regionsToFlush = wal.findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.length);
      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
      // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
      // remain.
      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
      wal.rollWriter();
      // only one wal should remain now (that is for the second region).
      assertEquals(1, wal.getNumRolledLogFiles());
      // flush the second region
      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
      wal.rollWriter(true);
      // no wal should remain now.
      assertEquals(0, wal.getNumRolledLogFiles());
      // add edits both to region 1 and region 2, and roll.
      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
      wal.rollWriter();
      assertEquals(1, wal.getNumRolledLogFiles());
      // add more edits and roll the writer again, to reach the max logs limit.
      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
      wal.rollWriter();
      // it should return two regions to flush, as the oldest wal file has entries
      // for both regions.
      regionsToFlush = wal.findRegionsToForceFlush();
      assertEquals(2, regionsToFlush.length);
      // flush both regions
      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
      wal.rollWriter(true);
      assertEquals(0, wal.getNumRolledLogFiles());
      // Add an edit to region1, and roll the wal.
      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
      // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
      wal.rollWriter();
      wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
      assertEquals(1, wal.getNumRolledLogFiles());
    } finally {
      if (wal != null) {
        wal.close();
      }
    }
  }

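  /**
   * Creating a new WAL writer must fail with an IOException once the WAL's parent directory has
   * been renamed away (here to a "-splitting" directory, as happens when a dead server's logs
   * are taken over for splitting).
   */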
  @Test(expected = IOException.class)
  public void testFailedToCreateWALIfParentRenamed() throws IOException,
      CommonFSUtils.StreamLacksCapabilityException {
    final String name = "testFailedToCreateWALIfParentRenamed";
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
    long filenum = System.currentTimeMillis();
    Path path = wal.computeFilename(filenum);
    wal.createWriterInstance(path);
    Path parent = path.getParent();
    path = wal.computeFilename(filenum + 1);
    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
    FS.rename(parent, newPath);
    wal.createWriterInstance(path);
    fail("It should fail to create the new WAL");
  }

  /**
   * Test that a flush is guaranteed to have a sequence id beyond the last edit appended. We do
   * this by slowing appends in the background ring buffer thread while in the foreground we call
   * flush. The addition of the sync over HRegion in flush should fix an issue where flush was
   * returning before all of its appends had made it out to the WAL (HBASE-11109).
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
   */
  @Test
  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
    String testName = currentTest.getMethodName();
    final TableName tableName = TableName.valueOf(testName);
    final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
    final byte[] rowName = tableName.getName();
    final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
    HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
      TEST_UTIL.getConfiguration(), htd);
    HBaseTestingUtility.closeRegionAndWAL(r);
    final int countPerFamily = 10;
    final AtomicBoolean goslow = new AtomicBoolean(false);
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    // subclass and doctor a method.
    AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
        testName, CONF, null, true, null, null, new Runnable() {

          @Override
          public void run() {
            if (goslow.get()) {
              LOG.debug("Sleeping 100ms before append");
              Threads.sleep(100);
            }
          }
        });
    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
    try {
      List<Put> puts = null;
      for (byte[] fam : htd.getColumnFamilyNames()) {
        puts =
            TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x");
      }

      // Now assert edits made it in.
      final Get g = new Get(rowName);
      Result result = region.get(g);
      assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size());

      // Construct a WALEdit and add it a few times to the WAL.
      WALEdit edits = new WALEdit();
      for (Put p : puts) {
        CellScanner cs = p.cellScanner();
        while (cs.advance()) {
          edits.add(cs.current());
        }
      }
      // Add any old cluster id.
      List<UUID> clusterIds = new ArrayList<>(1);
      clusterIds.add(TEST_UTIL.getRandomUUID());
      // Now make appends run slow.
      goslow.set(true);
      for (int i = 0; i < countPerFamily; i++) {
        final RegionInfo info = region.getRegionInfo();
        final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
            System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
        wal.append(info, logkey, edits, true);
        region.getMVCC().completeAndWait(logkey.getWriteEntry());
      }
      region.flush(true);
      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
      long currentSequenceId = region.getReadPoint(null);
      // Now release the appends
      goslow.set(false);
      assertTrue(currentSequenceId >= region.getReadPoint(null));
    } finally {
      region.close(true);
      wal.close();
    }
  }

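  /**
   * A sync on a freshly initialized WAL with no appended edits should return cleanly rather than
   * hang or throw.
   */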
  @Test
  public void testSyncNoAppend() throws IOException {
    String testName = currentTest.getMethodName();
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
        CONF, null, true, null, null);
    wal.init();
    try {
      wal.sync();
    } finally {
      wal.close();
    }
  }

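  /**
   * Appending to an already-closed WAL should fail with an IOException, and because we fail
   * before the WriteEntry is set, the key's WriteEntry must remain null.
   */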
  @Test
  public void testWriteEntryCanBeNull() throws IOException {
    String testName = currentTest.getMethodName();
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
      CONF, null, true, null, null);
    wal.close();
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : td.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    long timestamp = System.currentTimeMillis();
    byte[] row = Bytes.toBytes("row");
    WALEdit cols = new WALEdit();
    cols.add(new KeyValue(row, row, row, timestamp, row));
    WALKeyImpl key =
        new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
          timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
    try {
      wal.append(ri, key, cols, true);
      fail("Should fail since the wal has already been closed");
    } catch (IOException e) {
      // expected
      assertThat(e.getMessage(), containsString("log is closed"));
      // the WriteEntry should be null since we fail before setting it.
      assertNull(key.getWriteEntry());
    }
  }

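  /**
   * Closing a region while an ASYNC_WAL put is still blocked in the WAL append path should not
   * leave a stale unflushed sequence id behind: once the close completes, the WAL must report
   * {@link HConstants#NO_SEQNUM} for that region.
   */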
  @Test
  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
    final String testName = currentTest.getMethodName();
    final byte[] b = Bytes.toBytes("b");

    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    final CountDownLatch holdAppend = new CountDownLatch(1);
    final CountDownLatch closeFinished = new CountDownLatch(1);
    final CountDownLatch putFinished = new CountDownLatch(1);

    try (AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getRootDir(CONF), testName,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
      wal.init();
      wal.registerWALActionsListener(new WALActionsListener() {
        @Override
        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
          if (startHoldingForAppend.get()) {
            try {
              holdAppend.await();
            } catch (InterruptedException e) {
              LOG.error(e.toString(), e);
            }
          }
        }
      });

      // open a new region which uses this WAL
      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
      ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
      TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
      RegionServerServices rsServices = mock(RegionServerServices.class);
      when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
      when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
      final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
        TEST_UTIL.getConfiguration(), rsServices, null);

      ExecutorService exec = Executors.newFixedThreadPool(2);

      // do a regular write first because of memstore size calculation.
      region.put(new Put(b).addColumn(b, b, b));

      startHoldingForAppend.set(true);
      exec.submit(new Runnable() {
        @Override
        public void run() {
          try {
            region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
            putFinished.countDown();
          } catch (IOException e) {
            LOG.error(e.toString(), e);
          }
        }
      });

      // give the put a chance to start
      Threads.sleep(3000);

      exec.submit(new Runnable() {
        @Override
        public void run() {
          try {
            Map<?, ?> closeResult = region.close();
            LOG.info("Close result:" + closeResult);
            closeFinished.countDown();
          } catch (IOException e) {
            LOG.error(e.toString(), e);
          }
        }
      });

      // give the flush a chance to start. Flush should have got the region lock, and
      // should have been waiting on the mvcc complete after this.
      Threads.sleep(3000);

      // let the append to WAL go through now that the flush already started
      holdAppend.countDown();
      putFinished.await();
      closeFinished.await();

      // now check the region's unflushed seqIds.
      long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
      assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
        seqId);

      wal.close();
    }
  }
}