001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertThat; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028import static org.mockito.Mockito.mock; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.Comparator; 034import java.util.List; 035import java.util.Map; 036import java.util.NavigableMap; 037import java.util.Set; 038import java.util.TreeMap; 039import java.util.UUID; 040import java.util.concurrent.CountDownLatch; 041import java.util.concurrent.ExecutorService; 042import java.util.concurrent.Executors; 043import java.util.concurrent.atomic.AtomicBoolean; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileStatus; 046import org.apache.hadoop.fs.FileSystem; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.hbase.CellScanner; 049import org.apache.hadoop.hbase.Coprocessor; 050import org.apache.hadoop.hbase.HBaseConfiguration; 051import org.apache.hadoop.hbase.HBaseTestingUtility; 052import org.apache.hadoop.hbase.HConstants; 053import org.apache.hadoop.hbase.KeyValue; 054import org.apache.hadoop.hbase.ServerName; 055import org.apache.hadoop.hbase.TableName; 056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 057import org.apache.hadoop.hbase.client.Durability; 058import org.apache.hadoop.hbase.client.Get; 059import org.apache.hadoop.hbase.client.Put; 060import org.apache.hadoop.hbase.client.RegionInfo; 061import org.apache.hadoop.hbase.client.RegionInfoBuilder; 062import org.apache.hadoop.hbase.client.Result; 063import org.apache.hadoop.hbase.client.TableDescriptor; 064import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 065import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 066import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; 067import org.apache.hadoop.hbase.regionserver.ChunkCreator; 068import org.apache.hadoop.hbase.regionserver.HRegion; 069import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; 070import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 071import org.apache.hadoop.hbase.regionserver.RegionServerServices; 072import org.apache.hadoop.hbase.regionserver.SequenceId; 073import org.apache.hadoop.hbase.util.Bytes; 074import org.apache.hadoop.hbase.util.CommonFSUtils; 075import org.apache.hadoop.hbase.util.EnvironmentEdge; 076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 077import org.apache.hadoop.hbase.util.Threads; 078import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 079import org.apache.hadoop.hbase.wal.WAL; 080import org.apache.hadoop.hbase.wal.WALEdit; 081import org.apache.hadoop.hbase.wal.WALKey; 082import org.apache.hadoop.hbase.wal.WALKeyImpl; 083 084import org.junit.AfterClass; 085import org.junit.Before; 086import org.junit.BeforeClass; 087import org.junit.Rule; 088import org.junit.Test; 089import org.junit.rules.TestName; 090import org.slf4j.Logger; 091import org.slf4j.LoggerFactory; 092 093public abstract class AbstractTestFSWAL { 094 095 protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFSWAL.class); 096 097 protected static Configuration CONF; 098 protected static FileSystem FS; 099 protected static Path DIR; 100 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 101 102 @Rule 103 public final TestName currentTest = new TestName(); 104 105 @Before 106 public void setUp() throws Exception { 107 FileStatus[] entries = FS.listStatus(new Path("/")); 108 for (FileStatus dir : entries) { 109 FS.delete(dir.getPath(), true); 110 } 111 final Path hbaseDir = TEST_UTIL.createRootDir(); 112 final Path hbaseWALDir = TEST_UTIL.createWALRootDir(); 113 DIR = new Path(hbaseWALDir, currentTest.getMethodName()); 114 assertNotEquals(hbaseDir, hbaseWALDir); 115 } 116 117 @BeforeClass 118 public static void setUpBeforeClass() throws Exception { 119 // Make block sizes small. 120 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); 121 // quicker heartbeat interval for faster DN death notification 122 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); 123 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); 124 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); 125 126 // faster failover with cluster.shutdown();fs.close() idiom 127 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1); 128 TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1); 129 TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500); 130 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 131 SampleRegionWALCoprocessor.class.getName()); 132 TEST_UTIL.startMiniDFSCluster(3); 133 134 CONF = TEST_UTIL.getConfiguration(); 135 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 136 } 137 138 @AfterClass 139 public static void tearDownAfterClass() throws Exception { 140 TEST_UTIL.shutdownMiniCluster(); 141 } 142 143 protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String WALDir, 144 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 145 boolean failIfWALExists, String prefix, String suffix) throws IOException; 146 147 protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String WALDir, 148 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 149 boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException; 150 151 /** 152 * A loaded WAL coprocessor won't break existing WAL test cases. 153 */ 154 @Test 155 public void testWALCoprocessorLoaded() throws Exception { 156 // test to see whether the coprocessor is loaded or not. 157 AbstractFSWAL<?> wal = null; 158 try { 159 wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 160 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); 161 WALCoprocessorHost host = wal.getCoprocessorHost(); 162 Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class); 163 assertNotNull(c); 164 } finally { 165 if (wal != null) { 166 wal.close(); 167 } 168 } 169 } 170 171 protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times, 172 MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) 173 throws IOException { 174 final byte[] row = Bytes.toBytes("row"); 175 for (int i = 0; i < times; i++) { 176 long timestamp = System.currentTimeMillis(); 177 WALEdit cols = new WALEdit(); 178 cols.add(new KeyValue(row, row, row, timestamp, row)); 179 WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), 180 SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, 181 HConstants.NO_NONCE, mvcc, scopes); 182 log.appendData(hri, key, cols); 183 } 184 log.sync(); 185 } 186 187 /** 188 * helper method to simulate region flush for a WAL. 189 * @param wal 190 * @param regionEncodedName 191 */ 192 protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) { 193 wal.startCacheFlush(regionEncodedName, flushedFamilyNames); 194 wal.completeCacheFlush(regionEncodedName); 195 } 196 197 /** 198 * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws 199 * exception if we do). Comparison is based on the timestamp present in the wal name. 200 * @throws Exception 201 */ 202 @Test 203 public void testWALComparator() throws Exception { 204 AbstractFSWAL<?> wal1 = null; 205 AbstractFSWAL<?> walMeta = null; 206 try { 207 wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 208 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); 209 LOG.debug("Log obtained is: " + wal1); 210 Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR; 211 Path p1 = wal1.computeFilename(11); 212 Path p2 = wal1.computeFilename(12); 213 // comparing with itself returns 0 214 assertTrue(comp.compare(p1, p1) == 0); 215 // comparing with different filenum. 216 assertTrue(comp.compare(p1, p2) < 0); 217 walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 218 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, 219 AbstractFSWALProvider.META_WAL_PROVIDER_ID); 220 Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR; 221 222 Path p1WithMeta = walMeta.computeFilename(11); 223 Path p2WithMeta = walMeta.computeFilename(12); 224 assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0); 225 assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0); 226 // mixing meta and non-meta logs gives error 227 boolean ex = false; 228 try { 229 comp.compare(p1WithMeta, p2); 230 } catch (IllegalArgumentException e) { 231 ex = true; 232 } 233 assertTrue("Comparator doesn't complain while checking meta log files", ex); 234 boolean exMeta = false; 235 try { 236 compMeta.compare(p1WithMeta, p2); 237 } catch (IllegalArgumentException e) { 238 exMeta = true; 239 } 240 assertTrue("Meta comparator doesn't complain while checking log files", exMeta); 241 } finally { 242 if (wal1 != null) { 243 wal1.close(); 244 } 245 if (walMeta != null) { 246 walMeta.close(); 247 } 248 } 249 } 250 251 /** 252 * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of 253 * regions which should be flushed in order to archive the oldest wal file. 254 * <p> 255 * This method tests this behavior by inserting edits and rolling the wal enough times to reach 256 * the max number of logs threshold. It checks whether we get the "right regions" for flush on 257 * rolling the wal. 258 * @throws Exception 259 */ 260 @Test 261 public void testFindMemStoresEligibleForFlush() throws Exception { 262 LOG.debug("testFindMemStoresEligibleForFlush"); 263 Configuration conf1 = HBaseConfiguration.create(CONF); 264 conf1.setInt("hbase.regionserver.maxlogs", 1); 265 AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(), 266 HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null); 267 TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1")) 268 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 269 TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2")) 270 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 271 RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build(); 272 RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build(); 273 // add edits and roll the wal 274 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 275 NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR); 276 for (byte[] fam : t1.getColumnFamilyNames()) { 277 scopes1.put(fam, 0); 278 } 279 NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR); 280 for (byte[] fam : t2.getColumnFamilyNames()) { 281 scopes2.put(fam, 0); 282 } 283 try { 284 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 285 wal.rollWriter(); 286 // add some more edits and roll the wal. This would reach the log number threshold 287 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 288 wal.rollWriter(); 289 // with above rollWriter call, the max logs limit is reached. 290 assertTrue(wal.getNumRolledLogFiles() == 2); 291 292 // get the regions to flush; since there is only one region in the oldest wal, it should 293 // return only one region. 294 byte[][] regionsToFlush = wal.findRegionsToForceFlush(); 295 assertEquals(1, regionsToFlush.length); 296 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); 297 // insert edits in second region 298 addEdits(wal, hri2, t2, 2, mvcc, scopes2); 299 // get the regions to flush, it should still read region1. 300 regionsToFlush = wal.findRegionsToForceFlush(); 301 assertEquals(1, regionsToFlush.length); 302 assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); 303 // flush region 1, and roll the wal file. Only last wal which has entries for region1 should 304 // remain. 305 flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); 306 wal.rollWriter(); 307 // only one wal should remain now (that is for the second region). 308 assertEquals(1, wal.getNumRolledLogFiles()); 309 // flush the second region 310 flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); 311 wal.rollWriter(true); 312 // no wal should remain now. 313 assertEquals(0, wal.getNumRolledLogFiles()); 314 // add edits both to region 1 and region 2, and roll. 315 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 316 addEdits(wal, hri2, t2, 2, mvcc, scopes2); 317 wal.rollWriter(); 318 // add edits and roll the writer, to reach the max logs limit. 319 assertEquals(1, wal.getNumRolledLogFiles()); 320 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 321 wal.rollWriter(); 322 // it should return two regions to flush, as the oldest wal file has entries 323 // for both regions. 324 regionsToFlush = wal.findRegionsToForceFlush(); 325 assertEquals(2, regionsToFlush.length); 326 // flush both regions 327 flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); 328 flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); 329 wal.rollWriter(true); 330 assertEquals(0, wal.getNumRolledLogFiles()); 331 // Add an edit to region1, and roll the wal. 332 addEdits(wal, hri1, t1, 2, mvcc, scopes1); 333 // tests partial flush: roll on a partial flush, and ensure that wal is not archived. 334 wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); 335 wal.rollWriter(); 336 wal.completeCacheFlush(hri1.getEncodedNameAsBytes()); 337 assertEquals(1, wal.getNumRolledLogFiles()); 338 } finally { 339 if (wal != null) { 340 wal.close(); 341 } 342 } 343 } 344 345 @Test(expected = IOException.class) 346 public void testFailedToCreateWALIfParentRenamed() throws IOException, 347 CommonFSUtils.StreamLacksCapabilityException { 348 final String name = "testFailedToCreateWALIfParentRenamed"; 349 AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name, 350 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); 351 long filenum = System.currentTimeMillis(); 352 Path path = wal.computeFilename(filenum); 353 wal.createWriterInstance(path); 354 Path parent = path.getParent(); 355 path = wal.computeFilename(filenum + 1); 356 Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting"); 357 FS.rename(parent, newPath); 358 wal.createWriterInstance(path); 359 fail("It should fail to create the new WAL"); 360 } 361 362 /** 363 * Test flush for sure has a sequence id that is beyond the last edit appended. We do this by 364 * slowing appends in the background ring buffer thread while in foreground we call flush. The 365 * addition of the sync over HRegion in flush should fix an issue where flush was returning before 366 * all of its appends had made it out to the WAL (HBASE-11109). 367 * @throws IOException 368 * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a> 369 */ 370 @Test 371 public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException { 372 String testName = currentTest.getMethodName(); 373 final TableName tableName = TableName.valueOf(testName); 374 final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); 375 final byte[] rowName = tableName.getName(); 376 final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) 377 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build(); 378 HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(), 379 TEST_UTIL.getConfiguration(), htd); 380 HBaseTestingUtility.closeRegionAndWAL(r); 381 final int countPerFamily = 10; 382 final AtomicBoolean goslow = new AtomicBoolean(false); 383 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 384 for (byte[] fam : htd.getColumnFamilyNames()) { 385 scopes.put(fam, 0); 386 } 387 // subclass and doctor a method. 388 AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), 389 testName, CONF, null, true, null, null, new Runnable() { 390 391 @Override 392 public void run() { 393 if (goslow.get()) { 394 Threads.sleep(100); 395 LOG.debug("Sleeping before appending 100ms"); 396 } 397 } 398 }); 399 HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(), 400 TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal); 401 EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 402 try { 403 List<Put> puts = null; 404 for (byte[] fam : htd.getColumnFamilyNames()) { 405 puts = 406 TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x"); 407 } 408 409 // Now assert edits made it in. 410 final Get g = new Get(rowName); 411 Result result = region.get(g); 412 assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size()); 413 414 // Construct a WALEdit and add it a few times to the WAL. 415 WALEdit edits = new WALEdit(); 416 for (Put p : puts) { 417 CellScanner cs = p.cellScanner(); 418 while (cs.advance()) { 419 edits.add(cs.current()); 420 } 421 } 422 // Add any old cluster id. 423 List<UUID> clusterIds = new ArrayList<>(1); 424 clusterIds.add(TEST_UTIL.getRandomUUID()); 425 // Now make appends run slow. 426 goslow.set(true); 427 for (int i = 0; i < countPerFamily; i++) { 428 final RegionInfo info = region.getRegionInfo(); 429 final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 430 System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes); 431 wal.append(info, logkey, edits, true); 432 region.getMVCC().completeAndWait(logkey.getWriteEntry()); 433 } 434 region.flush(true); 435 // FlushResult.flushSequenceId is not visible here so go get the current sequence id. 436 long currentSequenceId = region.getReadPoint(null); 437 // Now release the appends 438 goslow.set(false); 439 assertTrue(currentSequenceId >= region.getReadPoint(null)); 440 } finally { 441 region.close(true); 442 wal.close(); 443 } 444 } 445 446 @Test 447 public void testSyncNoAppend() throws IOException { 448 String testName = currentTest.getMethodName(); 449 AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName, 450 CONF, null, true, null, null); 451 wal.init(); 452 try { 453 wal.sync(); 454 } finally { 455 wal.close(); 456 } 457 } 458 459 @Test 460 public void testWriteEntryCanBeNull() throws IOException { 461 String testName = currentTest.getMethodName(); 462 AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName, 463 CONF, null, true, null, null); 464 wal.close(); 465 TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) 466 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); 467 RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build(); 468 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 469 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 470 for (byte[] fam : td.getColumnFamilyNames()) { 471 scopes.put(fam, 0); 472 } 473 long timestamp = System.currentTimeMillis(); 474 byte[] row = Bytes.toBytes("row"); 475 WALEdit cols = new WALEdit(); 476 cols.add(new KeyValue(row, row, row, timestamp, row)); 477 WALKeyImpl key = 478 new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID, 479 timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); 480 try { 481 wal.append(ri, key, cols, true); 482 fail("Should fail since the wal has already been closed"); 483 } catch (IOException e) { 484 // expected 485 assertThat(e.getMessage(), containsString("log is closed")); 486 // the WriteEntry should be null since we fail before setting it. 487 assertNull(key.getWriteEntry()); 488 } 489 } 490 491 @Test 492 public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException { 493 final String testName = currentTest.getMethodName(); 494 final byte[] b = Bytes.toBytes("b"); 495 496 final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false); 497 final CountDownLatch holdAppend = new CountDownLatch(1); 498 final CountDownLatch closeFinished = new CountDownLatch(1); 499 final CountDownLatch putFinished = new CountDownLatch(1); 500 501 try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName, 502 HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { 503 wal.init(); 504 wal.registerWALActionsListener(new WALActionsListener() { 505 @Override 506 public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { 507 if (startHoldingForAppend.get()) { 508 try { 509 holdAppend.await(); 510 } catch (InterruptedException e) { 511 LOG.error(e.toString(), e); 512 } 513 } 514 } 515 }); 516 517 // open a new region which uses this WAL 518 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table")) 519 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build(); 520 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); 521 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); 522 TEST_UTIL.createLocalHRegion(hri, htd, wal).close(); 523 RegionServerServices rsServices = mock(RegionServerServices.class); 524 when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456)); 525 when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration()); 526 final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, 527 TEST_UTIL.getConfiguration(), rsServices, null); 528 529 ExecutorService exec = Executors.newFixedThreadPool(2); 530 531 // do a regular write first because of memstore size calculation. 532 region.put(new Put(b).addColumn(b, b, b)); 533 534 startHoldingForAppend.set(true); 535 exec.submit(new Runnable() { 536 @Override 537 public void run() { 538 try { 539 region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL)); 540 putFinished.countDown(); 541 } catch (IOException e) { 542 LOG.error(e.toString(), e); 543 } 544 } 545 }); 546 547 // give the put a chance to start 548 Threads.sleep(3000); 549 550 exec.submit(new Runnable() { 551 @Override 552 public void run() { 553 try { 554 Map<?, ?> closeResult = region.close(); 555 LOG.info("Close result:" + closeResult); 556 closeFinished.countDown(); 557 } catch (IOException e) { 558 LOG.error(e.toString(), e); 559 } 560 } 561 }); 562 563 // give the flush a chance to start. Flush should have got the region lock, and 564 // should have been waiting on the mvcc complete after this. 565 Threads.sleep(3000); 566 567 // let the append to WAL go through now that the flush already started 568 holdAppend.countDown(); 569 putFinished.await(); 570 closeFinished.await(); 571 572 // now check the region's unflushed seqIds. 573 long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes()); 574 assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM, 575 seqId); 576 577 wal.close(); 578 } 579 } 580}