/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTestFSWAL {

  protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFSWAL.class);

  protected static Configuration CONF;
  protected static FileSystem FS;
  protected static Path DIR;
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public final TestName currentTest = new TestName();

  @Before
  public void setUp() throws Exception {
    FileStatus[] entries = FS.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      FS.delete(dir.getPath(), true);
    }
    final Path hbaseDir = TEST_UTIL.createRootDir();
    final Path hbaseWALDir = TEST_UTIL.createWALRootDir();
    DIR = new Path(hbaseWALDir, currentTest.getMethodName());
    assertNotEquals(hbaseDir, hbaseWALDir);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
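    // (A 1 MB dfs.blocksize, far below the HDFS default, presumably keeps the test WAL files
    // small and quick to write and recover.)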
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      SampleRegionWALCoprocessor.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    CONF = TEST_UTIL.getConfiguration();
    FS = TEST_UTIL.getDFSCluster().getFileSystem();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String WALDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix) throws IOException;

  protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String WALDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException;

  /**
   * A loaded WAL coprocessor won't break existing WAL test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // test to see whether the coprocessor is loaded or not.
    AbstractFSWAL<?> wal = null;
    try {
      wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
      WALCoprocessorHost host = wal.getCoprocessorHost();
      Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
      assertNotNull(c);
    } finally {
      if (wal != null) {
        wal.close();
      }
    }
  }

  protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
    MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, String cf)
    throws IOException {
    final byte[] row = Bytes.toBytes(cf);
    for (int i = 0; i < times; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, row, row, timestamp, row));
      WALKeyImpl key =
        new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), SequenceId.NO_SEQUENCE_ID,
          timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
      log.appendData(hri, key, cols);
    }
    log.sync();
  }

  /**
   * Helper method to simulate a region flush for a WAL.
   */
  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
  }

  /**
   * Tests the log comparator. Ensures that we are not mixing meta logs with non-meta logs (throws
   * an exception if we do). Comparison is based on the timestamp present in the wal name.
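   * (A wal name looks like {@code <prefix>.<filenum>}; meta wals additionally carry the
   * {@link AbstractFSWALProvider#META_WAL_PROVIDER_ID} suffix, which is why the comparators
   * refuse to mix the two kinds.)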
   */
  @Test
  public void testWALComparator() throws Exception {
    AbstractFSWAL<?> wal1 = null;
    AbstractFSWAL<?> walMeta = null;
    try {
      wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
      LOG.debug("Log obtained is: " + wal1);
      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
      Path p1 = wal1.computeFilename(11);
      Path p2 = wal1.computeFilename(12);
      // comparing with itself returns 0
      assertTrue(comp.compare(p1, p1) == 0);
      // comparing with different filenum.
      assertTrue(comp.compare(p1, p2) < 0);
      walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
        AbstractFSWALProvider.META_WAL_PROVIDER_ID);
      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;

      Path p1WithMeta = walMeta.computeFilename(11);
      Path p2WithMeta = walMeta.computeFilename(12);
      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
      // mixing meta and non-meta logs gives an error
      boolean ex = false;
      try {
        comp.compare(p1WithMeta, p2);
      } catch (IllegalArgumentException e) {
        ex = true;
      }
      assertTrue("Comparator doesn't complain while checking meta log files", ex);
      boolean exMeta = false;
      try {
        compMeta.compare(p1WithMeta, p2);
      } catch (IllegalArgumentException e) {
        exMeta = true;
      }
      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
    } finally {
      if (wal1 != null) {
        wal1.close();
      }
      if (walMeta != null) {
        walMeta.close();
      }
    }
  }

  /**
   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
   * regions which should be flushed in order to archive the oldest wal file.
   * <p>
   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
   * the max number of logs threshold. It checks whether we get the "right regions and stores" for
   * flush on rolling the wal.
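   * <p>
   * Note: the test lowers {@code hbase.regionserver.maxlogs} to 1, so the threshold is crossed
   * after just a couple of rolls.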
   */
  @Test
  public void testFindMemStoresEligibleForFlush() throws Exception {
    LOG.debug("testFindMemStoresEligibleForFlush");
    Configuration conf1 = HBaseConfiguration.create(CONF);
    conf1.setInt("hbase.regionserver.maxlogs", 1);
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
    String cf1 = "cf1";
    String cf2 = "cf2";
    String cf3 = "cf3";
    TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
    TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
    RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
    RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();

    List<ColumnFamilyDescriptor> cfs = new ArrayList<>();
    cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
    cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
    TableDescriptor t3 =
      TableDescriptorBuilder.newBuilder(TableName.valueOf("t3")).setColumnFamilies(cfs).build();
    RegionInfo hri3 = RegionInfoBuilder.newBuilder(t3.getTableName()).build();

    // add edits and roll the wal
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : t1.getColumnFamilyNames()) {
      scopes1.put(fam, 0);
    }
    NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : t2.getColumnFamilyNames()) {
      scopes2.put(fam, 0);
    }
    NavigableMap<byte[], Integer> scopes3 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : t3.getColumnFamilyNames()) {
      scopes3.put(fam, 0);
    }
    try {
      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
      wal.rollWriter();
      // add some more edits and roll the wal. This would reach the log number threshold
      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
      wal.rollWriter();
      // with the above rollWriter call, the max logs limit is reached.
      assertTrue(wal.getNumRolledLogFiles() == 2);

      // get the regions to flush; since there is only one region in the oldest wal, it should
      // return only one region.
      Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.size());
      assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
      // insert edits in the second region
      addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
      // get the regions to flush, it should still return region1.
      regionsToFlush = wal.findRegionsToForceFlush();
      assertEquals(1, regionsToFlush.size());
      assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
      // flush region 1, and roll the wal file. Only the last wal, which has entries for region2,
      // should remain.
      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
      wal.rollWriter();
      // only one wal should remain now (that is for the second region).
      assertEquals(1, wal.getNumRolledLogFiles());
      // flush the second region
      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
      wal.rollWriter(true);
      // no wal should remain now.
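      // (rollWriter(true) forces a new writer even when no entries were written to the current
      // one, so the fully flushed logs can be archived right away.)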
      assertEquals(0, wal.getNumRolledLogFiles());
      // add edits both to region 1 and region 2, and roll.
      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
      addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
      wal.rollWriter();
      // add edits and roll the writer, to reach the max logs limit.
      assertEquals(1, wal.getNumRolledLogFiles());
      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
      wal.rollWriter();
      // it should return two regions to flush, as the oldest wal file has entries
      // for both regions.
      regionsToFlush = wal.findRegionsToForceFlush();
      assertEquals(2, regionsToFlush.size());
      // flush both regions
      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
      wal.rollWriter(true);
      assertEquals(0, wal.getNumRolledLogFiles());
      // Add an edit to region1, and roll the wal.
      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
      // tests partial flush: roll on a partial flush, and ensure that the wal is not archived.
      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
      wal.rollWriter();
      wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
      assertEquals(1, wal.getNumRolledLogFiles());

      // clear test data
      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
      wal.rollWriter(true);
      // add edits for three families
      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
      wal.rollWriter();
      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
      wal.rollWriter();
      assertEquals(2, wal.getNumRolledLogFiles());
      // flush one family before archiving the oldest wal
      Set<byte[]> flushedFamilyNames = new HashSet<>();
      flushedFamilyNames.add(Bytes.toBytes(cf1));
      flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
      regionsToFlush = wal.findRegionsToForceFlush();
      // then only two families need to be flushed when archiving the oldest wal
      assertEquals(1, regionsToFlush.size());
      assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
      assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
    } finally {
      if (wal != null) {
        wal.close();
      }
    }
  }

  @Test(expected = IOException.class)
  public void testFailedToCreateWALIfParentRenamed()
    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
    final String name = "testFailedToCreateWALIfParentRenamed";
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
    long filenum = EnvironmentEdgeManager.currentTime();
    Path path = wal.computeFilename(filenum);
    wal.createWriterInstance(path);
    Path parent = path.getParent();
    path = wal.computeFilename(filenum + 1);
    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
    FS.rename(parent, newPath);
    wal.createWriterInstance(path);
    fail("It should fail to create the new WAL");
  }

  /**
   * Tests that a flush for sure gets a sequence id that is beyond the last edit appended. We do
   * this by slowing appends in the background ring buffer thread while in foreground we call
   * flush.
   * The addition of the sync over HRegion in flush should fix an issue where flush was returning
   * before all of its appends had made it out to the WAL (HBASE-11109).
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
   */
  @Test
  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
    String testName = currentTest.getMethodName();
    final TableName tableName = TableName.valueOf(testName);
    final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
    final byte[] rowName = tableName.getName();
    final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
    HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
      TEST_UTIL.getConfiguration(), htd);
    HBaseTestingUtility.closeRegionAndWAL(r);
    final int countPerFamily = 10;
    final AtomicBoolean goslow = new AtomicBoolean(false);
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    // subclass and doctor a method.
    AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
      testName, CONF, null, true, null, null, new Runnable() {

        @Override
        public void run() {
          if (goslow.get()) {
            Threads.sleep(100);
            LOG.debug("Sleeping 100ms before appending");
          }
        }
      });
    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
    try {
      List<Put> puts = null;
      for (byte[] fam : htd.getColumnFamilyNames()) {
        puts = TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x");
      }

      // Now assert edits made it in.
      final Get g = new Get(rowName);
      Result result = region.get(g);
      assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size());

      // Construct a WALEdit and add it a few times to the WAL.
      WALEdit edits = new WALEdit();
      for (Put p : puts) {
        CellScanner cs = p.cellScanner();
        while (cs.advance()) {
          edits.add(cs.current());
        }
      }
      // Add any old cluster id.
      List<UUID> clusterIds = new ArrayList<>(1);
      clusterIds.add(TEST_UTIL.getRandomUUID());
      // Now make appends run slow.
      goslow.set(true);
      for (int i = 0; i < countPerFamily; i++) {
        final RegionInfo info = region.getRegionInfo();
        final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
          EnvironmentEdgeManager.currentTime(), clusterIds, -1, -1, region.getMVCC(), scopes);
        wal.append(info, logkey, edits, true);
        region.getMVCC().completeAndWait(logkey.getWriteEntry());
      }
      region.flush(true);
      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
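      // (The mvcc read point is the highest sequence id whose write has completed, so it can
      // stand in for the flush sequence id here.)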
      long currentSequenceId = region.getReadPoint(null);
      // Now release the appends
      goslow.set(false);
      assertTrue(currentSequenceId >= region.getReadPoint(null));
    } finally {
      region.close(true);
      wal.close();
    }
  }

  @Test
  public void testSyncNoAppend() throws IOException {
    String testName = currentTest.getMethodName();
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
      CONF, null, true, null, null);
    wal.init();
    try {
      wal.sync();
    } finally {
      wal.close();
    }
  }

  @Test
  public void testWriteEntryCanBeNull() throws IOException {
    String testName = currentTest.getMethodName();
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
      CONF, null, true, null, null);
    wal.close();
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : td.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    long timestamp = EnvironmentEdgeManager.currentTime();
    byte[] row = Bytes.toBytes("row");
    WALEdit cols = new WALEdit();
    cols.add(new KeyValue(row, row, row, timestamp, row));
    WALKeyImpl key =
      new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
        timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
    try {
      wal.append(ri, key, cols, true);
      fail("Should fail since the wal has already been closed");
    } catch (IOException e) {
      // expected
      assertThat(e.getMessage(), containsString("log is closed"));
      // the WriteEntry should be null since we fail before setting it.
      assertNull(key.getWriteEntry());
    }
  }

  private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
    CountDownLatch holdAppend) throws IOException {
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
    wal.init();
    wal.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
        if (startHoldingForAppend.get()) {
          try {
            holdAppend.await();
          } catch (InterruptedException e) {
            LOG.error(e.toString(), e);
          }
        }
      }
    });
    return wal;
  }

  private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
    throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    TEST_UTIL.createLocalHRegion(hri, CONF, htd, wal).close();
    RegionServerServices rsServices = mock(RegionServerServices.class);
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
    when(rsServices.getConfiguration()).thenReturn(conf);
    return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
  }

  private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
    Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
    CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
    throws InterruptedException, IOException {
    // do a regular write first because of memstore size calculation.
    region.put(put);

    startHoldingForAppend.set(true);
    region.put(new Put(put).setDurability(Durability.ASYNC_WAL));

    // give the put a chance to start
    Threads.sleep(3000);

    exec.submit(flushOrCloseRegion);

    // give the flush a chance to start. Flush should have got the region lock, and
    // should have been waiting on the mvcc complete after this.
    Threads.sleep(3000);

    // let the append to WAL go through now that the flush already started
    holdAppend.countDown();
    flushOrCloseFinished.await();
  }

  // Testcase for HBASE-23181
  @Test
  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
    String testName = currentTest.getMethodName();
    byte[] b = Bytes.toBytes("b");
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();

    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    CountDownLatch holdAppend = new CountDownLatch(1);
    CountDownLatch closeFinished = new CountDownLatch(1);
    ExecutorService exec = Executors.newFixedThreadPool(1);
    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
    // open a new region which uses this WAL
    HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
    try {
      doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
        try {
          Map<?, ?> closeResult = region.close();
          LOG.info("Close result:" + closeResult);
          closeFinished.countDown();
        } catch (IOException e) {
          LOG.error(e.toString(), e);
        }
      }, startHoldingForAppend, closeFinished, holdAppend);

      // now check the region's unflushed seqIds.
      long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
      assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
        seqId);
    } finally {
      exec.shutdownNow();
      region.close();
      wal.close();
    }
  }

  private static final Set<byte[]> STORES_TO_FLUSH =
    Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));

  // Testcase for HBASE-23157
  @Test
  public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
    String testName = currentTest.getMethodName();
    byte[] a = Bytes.toBytes("a");
    byte[] b = Bytes.toBytes("b");
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();

    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    CountDownLatch holdAppend = new CountDownLatch(1);
    CountDownLatch flushFinished = new CountDownLatch(1);
    ExecutorService exec = Executors.newFixedThreadPool(2);
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
      FlushPolicy.class);
    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
    // open a new region which uses this WAL
    HRegion region = createHoldingHRegion(conf, htd, wal);
    try {
      Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
      doPutWithAsyncWAL(exec, region, put, () -> {
        try {
          HRegion.FlushResult flushResult = region.flush(true);
          LOG.info("Flush result:" + flushResult.getResult());
          LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
          flushFinished.countDown();
        } catch (IOException e) {
          LOG.error(e.toString(), e);
        }
      }, startHoldingForAppend, flushFinished, holdAppend);

      // get the max flushed sequence id after the first flush
      long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
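      // HBASE-23157: a later flush covering only some stores must not make getMaxFlushedSeqId()
      // move backwards, which is what the rest of this test verifies.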

      region.put(put);
      // this time we only flush family a
      STORES_TO_FLUSH.add(a);
      region.flush(false);

      // get the max flushed sequence id after the second flush
      long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
      // make sure that the maxFlushedSequenceId does not go backwards
      assertTrue(
        "maxFlushedSeqId2(" + maxFlushedSeqId2
          + ") is not greater than or equal to maxFlushedSeqId1(" + maxFlushedSeqId1 + ")",
        maxFlushedSeqId1 <= maxFlushedSeqId2);
    } finally {
      exec.shutdownNow();
      region.close();
      wal.close();
    }
  }

  public static final class FlushSpecificStoresPolicy extends FlushPolicy {

    @Override
    public Collection<HStore> selectStoresToFlush() {
      if (STORES_TO_FLUSH.isEmpty()) {
        return region.getStores();
      } else {
        return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
      }
    }
  }
}