001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertNotEquals; 024import static org.junit.jupiter.api.Assertions.assertNotNull; 025import static org.junit.jupiter.api.Assertions.assertNull; 026import static org.junit.jupiter.api.Assertions.assertThrows; 027import static org.junit.jupiter.api.Assertions.assertTrue; 028import static org.junit.jupiter.api.Assertions.fail; 029import static org.mockito.Mockito.mock; 030import static org.mockito.Mockito.when; 031 032import java.io.IOException; 033import java.util.ArrayList; 034import java.util.Collection; 035import java.util.Collections; 036import java.util.Comparator; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.NavigableMap; 041import java.util.Set; 042import java.util.TreeMap; 043import java.util.UUID; 044import java.util.concurrent.ConcurrentSkipListMap; 045import 
java.util.concurrent.CountDownLatch; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Executors; 048import java.util.concurrent.atomic.AtomicBoolean; 049import java.util.stream.Collectors; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.fs.FileStatus; 052import org.apache.hadoop.fs.FileSystem; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.hbase.Coprocessor; 055import org.apache.hadoop.hbase.ExtendedCellScanner; 056import org.apache.hadoop.hbase.HBaseConfiguration; 057import org.apache.hadoop.hbase.HBaseTestingUtil; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.KeyValue; 060import org.apache.hadoop.hbase.ServerName; 061import org.apache.hadoop.hbase.TableName; 062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 063import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 064import org.apache.hadoop.hbase.client.Durability; 065import org.apache.hadoop.hbase.client.Get; 066import org.apache.hadoop.hbase.client.Put; 067import org.apache.hadoop.hbase.client.RegionInfo; 068import org.apache.hadoop.hbase.client.RegionInfoBuilder; 069import org.apache.hadoop.hbase.client.Result; 070import org.apache.hadoop.hbase.client.TableDescriptor; 071import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 072import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 073import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; 074import org.apache.hadoop.hbase.regionserver.ChunkCreator; 075import org.apache.hadoop.hbase.regionserver.FlushPolicy; 076import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; 077import org.apache.hadoop.hbase.regionserver.HRegion; 078import org.apache.hadoop.hbase.regionserver.HStore; 079import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 080import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 081import org.apache.hadoop.hbase.regionserver.RegionServerServices; 082import 
org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared test base for {@link AbstractFSWAL} implementations. Concrete subclasses supply the WAL
 * implementation under test via {@link #newWAL} and {@link #newSlowWAL}; the tests here exercise
 * behavior common to all filesystem-backed WALs: coprocessor loading, log-name comparison,
 * memstore-flush eligibility tracking, writer creation/rolling, and sequence-id accounting.
 * <p>
 * A mini DFS cluster is started once per class; each test gets a fresh WAL directory derived from
 * the test method name.
 */
public abstract class AbstractTestFSWAL {

  protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFSWAL.class);

  // Shared cluster state, initialized in setUpBeforeClass.
  protected static Configuration CONF;
  protected static FileSystem FS;
  protected static Path DIR;
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Current test method name, used to derive per-test directory/WAL names.
  protected String testName;

  /**
   * Wipes the DFS root and recreates fresh root and WAL-root directories for each test so tests
   * cannot see each other's files.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) throws Exception {
    testName = testInfo.getTestMethod().get().getName();
    FileStatus[] entries = FS.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      FS.delete(dir.getPath(), true);
    }
    final Path hbaseDir = TEST_UTIL.createRootDir();
    final Path hbaseWALDir = TEST_UTIL.createWALRootDir();
    DIR = new Path(hbaseWALDir, testName);
    // WAL root must be distinct from the hbase root for these tests to be meaningful.
    assertNotEquals(hbaseDir, hbaseWALDir);
    FS.mkdirs(DIR);
  }

  /**
   * Starts a 3-datanode mini DFS cluster with aggressive timeouts so DN death and failover are
   * detected quickly, and registers {@link SampleRegionWALCoprocessor} on the WAL.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      SampleRegionWALCoprocessor.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    CONF = TEST_UTIL.getConfiguration();
    FS = TEST_UTIL.getDFSCluster().getFileSystem();
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates (and inits) the WAL implementation under test.
   */
  protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String WALDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix) throws IOException;

  /**
   * Creates a WAL whose append path runs {@code action} before each append, so tests can slow
   * down or block background appends.
   */
  protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String WALDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException;

  /**
   * A loaded WAL coprocessor won't break existing WAL test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // test to see whether the coprocessor is loaded or not.
    AbstractFSWAL<?> wal = null;
    try {
      wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
      WALCoprocessorHost host = wal.getCoprocessorHost();
      Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
      assertNotNull(c);
    } finally {
      if (wal != null) {
        wal.close();
      }
    }
  }

  /**
   * Appends {@code times} single-cell edits for column family {@code cf} to the given region's
   * WAL and syncs once at the end.
   */
  protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
    MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, String cf)
    throws IOException {
    final byte[] row = Bytes.toBytes(cf);
    for (int i = 0; i < times; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      WALEdit cols = new WALEdit();
      WALEditInternalHelper.addExtendedCell(cols, new KeyValue(row, row, row, timestamp, row));
      WALKeyImpl key =
        new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), SequenceId.NO_SEQUENCE_ID,
          timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
      log.appendData(hri, key, cols);
    }
    log.sync();
  }

  /**
   * helper method to simulate region flush for a WAL.
   */
  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
  }

  /**
   * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws
   * exception if we do). Comparison is based on the timestamp present in the wal name.
   */
  @Test
  public void testWALComparator() throws Exception {
    AbstractFSWAL<?> wal1 = null;
    AbstractFSWAL<?> walMeta = null;
    try {
      wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
      LOG.debug("Log obtained is: " + wal1);
      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
      Path p1 = wal1.computeFilename(11);
      Path p2 = wal1.computeFilename(12);
      // comparing with itself returns 0
      assertTrue(comp.compare(p1, p1) == 0);
      // comparing with different filenum.
      assertTrue(comp.compare(p1, p2) < 0);
      walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
        HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
        AbstractFSWALProvider.META_WAL_PROVIDER_ID);
      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;

      Path p1WithMeta = walMeta.computeFilename(11);
      Path p2WithMeta = walMeta.computeFilename(12);
      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
      // mixing meta and non-meta logs gives error
      assertThrows(IllegalArgumentException.class, () -> comp.compare(p1WithMeta, p2),
        "Comparator doesn't complain while checking meta log files");
      assertThrows(IllegalArgumentException.class, () -> compMeta.compare(p1WithMeta, p2),
        "Meta comparator doesn't complain while checking log files");
    } finally {
      if (wal1 != null) {
        wal1.close();
      }
      if (walMeta != null) {
        walMeta.close();
      }
    }
  }

  // now we will close asynchronously and will not archive a wal file unless it is fully closed, so
  // sometimes we need to wait a bit before asserting, especially when you want to test the removal
  // of numRolledLogFiles
  private void waitNumRolledLogFiles(AbstractFSWAL<?> wal, int expected) {
    TEST_UTIL.waitFor(5000, () -> wal.getNumRolledLogFiles() == expected);
  }

  /**
   * Drives the flush-eligibility scenario against the given WAL: rolls past the max-logs limit
   * and verifies {@code findRegionsToForceFlush} reports exactly the regions (and stores) whose
   * unflushed edits pin the oldest WAL file.
   */
  private void testFindMemStoresEligibleForFlush(AbstractFSWAL<?> wal) throws IOException {
    String cf1 = "cf1";
    String cf2 = "cf2";
    String cf3 = "cf3";
    TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
    TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
    RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
    RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();

    List<ColumnFamilyDescriptor> cfs = new ArrayList<>();
    cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
    cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
    TableDescriptor t3 =
      TableDescriptorBuilder.newBuilder(TableName.valueOf("t3")).setColumnFamilies(cfs).build();
    RegionInfo hri3 = RegionInfoBuilder.newBuilder(t3.getTableName()).build();

    // add edits and roll the wal
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : t1.getColumnFamilyNames()) {
      scopes1.put(fam, 0);
    }
    NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : t2.getColumnFamilyNames()) {
      scopes2.put(fam, 0);
    }
    NavigableMap<byte[], Integer> scopes3 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : t3.getColumnFamilyNames()) {
      scopes3.put(fam, 0);
    }
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    wal.rollWriter();
    // add some more edits and roll the wal. This would reach the log number threshold
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    wal.rollWriter();
    // with above rollWriter call, the max logs limit is reached.
    waitNumRolledLogFiles(wal, 2);

    // get the regions to flush; since there is only one region in the oldest wal, it should
    // return only one region.
    Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
    assertEquals(1, regionsToFlush.size());
    assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
    // insert edits in second region
    addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
    // get the regions to flush, it should still read region1.
    regionsToFlush = wal.findRegionsToForceFlush();
    assertEquals(1, regionsToFlush.size());
    assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
    // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
    // remain.
    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
    wal.rollWriter();
    // only one wal should remain now (that is for the second region).
    waitNumRolledLogFiles(wal, 1);
    // flush the second region
    flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
    wal.rollWriter(true);
    // no wal should remain now.
    waitNumRolledLogFiles(wal, 0);
    // add edits both to region 1 and region 2, and roll.
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
    wal.rollWriter();
    // add edits and roll the writer, to reach the max logs limit.
    waitNumRolledLogFiles(wal, 1);
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    wal.rollWriter();
    // it should return two regions to flush, as the oldest wal file has entries
    // for both regions.
    regionsToFlush = wal.findRegionsToForceFlush();
    assertEquals(2, regionsToFlush.size());
    // flush both regions
    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
    flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
    wal.rollWriter(true);
    waitNumRolledLogFiles(wal, 0);
    // Add an edit to region1, and roll the wal.
    addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
    // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
    wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
    wal.rollWriter();
    wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    waitNumRolledLogFiles(wal, 1);

    // clear test data
    flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
    wal.rollWriter(true);
    // add edits for three familes
    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
    wal.rollWriter();
    addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
    wal.rollWriter();
    waitNumRolledLogFiles(wal, 2);
    // flush one family before archive oldest wal
    Set<byte[]> flushedFamilyNames = new HashSet<>();
    flushedFamilyNames.add(Bytes.toBytes(cf1));
    flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
    regionsToFlush = wal.findRegionsToForceFlush();
    // then only two family need to be flushed when archive oldest wal
    assertEquals(1, regionsToFlush.size());
    assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
    assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
  }

  /**
   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
   * regions which should be flushed in order to archive the oldest wal file.
   * <p>
   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
   * the max number of logs threshold. It checks whether we get the "right regions and stores" for
   * flush on rolling the wal.
   */
  @Test
  public void testFindMemStoresEligibleForFlush() throws Exception {
    LOG.debug("testFindMemStoresEligibleForFlush");
    Configuration conf1 = HBaseConfiguration.create(CONF);
    // maxlogs=1 forces the flush-eligibility machinery to kick in on every roll.
    conf1.setInt("hbase.regionserver.maxlogs", 1);
    try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null)) {
      testFindMemStoresEligibleForFlush(wal);
    }

  }

  /**
   * Verifies that creating a new WAL writer fails with an IOException once the WAL's parent
   * directory has been renamed away (e.g. to a "-splitting" directory during log split).
   */
  @Test
  public void testFailedToCreateWALIfParentRenamed()
    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), testName,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
    long filenum = EnvironmentEdgeManager.currentTime();
    Path path = wal.computeFilename(filenum);
    wal.createWriterInstance(FS, path);
    Path parent = path.getParent();
    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
    FS.rename(parent, newPath);
    Path nextPath = wal.computeFilename(filenum + 1);
    assertThrows(IOException.class, () -> wal.createWriterInstance(FS, nextPath));
  }

  /**
   * Test flush for sure has a sequence id that is beyond the last edit appended. We do this by
   * slowing appends in the background ring buffer thread while in foreground we call flush. The
   * addition of the sync over HRegion in flush should fix an issue where flush was returning before
   * all of its appends had made it out to the WAL (HBASE-11109).
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
   */
  @Test
  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
    final TableName tableName = TableName.valueOf(testName);
    final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
    final byte[] rowName = tableName.getName();
    final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
    HRegion r = HBaseTestingUtil.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
      TEST_UTIL.getConfiguration(), htd);
    HBaseTestingUtil.closeRegionAndWAL(r);
    final int countPerFamily = 10;
    final AtomicBoolean goslow = new AtomicBoolean(false);
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    // subclass and doctor a method.
    AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
      testName, CONF, null, true, null, null, new Runnable() {

        @Override
        public void run() {
          if (goslow.get()) {
            Threads.sleep(100);
            LOG.debug("Sleeping before appending 100ms");
          }
        }
      });
    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
    try {
      List<Put> puts = null;
      for (byte[] fam : htd.getColumnFamilyNames()) {
        puts = TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x");
      }

      // Now assert edits made it in.
      final Get g = new Get(rowName);
      Result result = region.get(g);
      assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size());

      // Construct a WALEdit and add it a few times to the WAL.
      WALEdit edits = new WALEdit();
      for (Put p : puts) {
        ExtendedCellScanner cs = p.cellScanner();
        while (cs.advance()) {
          WALEditInternalHelper.addExtendedCell(edits, cs.current());
        }
      }
      // Add any old cluster id.
      List<UUID> clusterIds = new ArrayList<>(1);
      clusterIds.add(TEST_UTIL.getRandomUUID());
      // Now make appends run slow.
      goslow.set(true);
      for (int i = 0; i < countPerFamily; i++) {
        final RegionInfo info = region.getRegionInfo();
        final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
          EnvironmentEdgeManager.currentTime(), clusterIds, -1, -1, region.getMVCC(), scopes);
        wal.append(info, logkey, edits, true);
        region.getMVCC().completeAndWait(logkey.getWriteEntry());
      }
      region.flush(true);
      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
      long currentSequenceId = region.getReadPoint(null);
      // Now release the appends
      goslow.set(false);
      assertTrue(currentSequenceId >= region.getReadPoint(null));
    } finally {
      region.close(true);
      wal.close();
    }
  }

  /**
   * Sync with no preceding append must be a no-op rather than hang or fail.
   */
  @Test
  public void testSyncNoAppend() throws IOException {
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
      CONF, null, true, null, null);
    try {
      wal.sync();
    } finally {
      wal.close();
    }
  }

  /**
   * Appending to a closed WAL must fail, and in that case the key's WriteEntry must remain null
   * because the failure happens before the entry is assigned.
   */
  @Test
  public void testWriteEntryCanBeNull() throws IOException {
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
      CONF, null, true, null, null);
    wal.close();
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : td.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    long timestamp = EnvironmentEdgeManager.currentTime();
    byte[] row = Bytes.toBytes("row");
    WALEdit cols = new WALEdit();
    WALEditInternalHelper.addExtendedCell(cols, new KeyValue(row, row, row, timestamp, row));
    WALKeyImpl key =
      new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
        timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
    try {
      wal.append(ri, key, cols, true);
      fail("Should fail since the wal has already been closed");
    } catch (IOException e) {
      // expected
      assertThat(e.getMessage(), containsString("log is closed"));
      // the WriteEntry should be null since we fail before setting it.
      assertNull(key.getWriteEntry());
    }
  }

  /**
   * Rolling the writer on a closed WAL must throw {@link WALClosedException}.
   */
  @Test
  public void testRollWriterForClosedWAL() throws IOException {
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
      CONF, null, true, null, null);
    wal.close();
    assertThrows(WALClosedException.class, () -> wal.rollWriter());
  }

  /**
   * Creates a WAL whose appends block on {@code holdAppend} once {@code startHoldingForAppend} is
   * set, letting tests freeze the append pipeline at a known point.
   */
  private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
    CountDownLatch holdAppend) throws IOException {
    FS.mkdirs(new Path(CommonFSUtils.getRootDir(CONF), testName));
    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
    // newWAL has already called wal.init()
    wal.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
        if (startHoldingForAppend.get()) {
          try {
            holdAppend.await();
          } catch (InterruptedException e) {
            LOG.error(e.toString(), e);
          }
        }
      }
    });
    return wal;
  }

  /**
   * Opens a region backed by the given WAL with mocked {@link RegionServerServices}, for use with
   * the holding-WAL tests.
   */
  private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
    throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    TEST_UTIL.createLocalHRegion(hri, CONF, htd, wal).close();
    RegionServerServices rsServices = mock(RegionServerServices.class);
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
    when(rsServices.getConfiguration()).thenReturn(conf);
    return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
  }

  /**
   * Issues an ASYNC_WAL put whose append is held, starts {@code flushOrCloseRegion} concurrently,
   * then releases the append and waits for the flush/close to finish. Used to reproduce races
   * between in-flight appends and flush/close (HBASE-23157/HBASE-23181).
   */
  private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
    Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
    CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
    throws InterruptedException, IOException {
    // do a regular write first because of memstore size calculation.
    region.put(put);

    startHoldingForAppend.set(true);
    region.put(new Put(put).setDurability(Durability.ASYNC_WAL));

    // give the put a chance to start
    Threads.sleep(3000);

    exec.submit(flushOrCloseRegion);

    // give the flush a chance to start. Flush should have got the region lock, and
    // should have been waiting on the mvcc complete after this.
    Threads.sleep(3000);

    // let the append to WAL go through now that the flush already started
    holdAppend.countDown();
    flushOrCloseFinished.await();
  }

  // Testcase for HBASE-23181
  @Test
  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
    byte[] b = Bytes.toBytes("b");
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();

    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    CountDownLatch holdAppend = new CountDownLatch(1);
    CountDownLatch closeFinished = new CountDownLatch(1);
    ExecutorService exec = Executors.newFixedThreadPool(1);
    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
    // open a new region which uses this WAL
    HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
    try {
      doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
        try {
          Map<?, ?> closeResult = region.close();
          LOG.info("Close result:" + closeResult);
          closeFinished.countDown();
        } catch (IOException e) {
          LOG.error(e.toString(), e);
        }
      }, startHoldingForAppend, closeFinished, holdAppend);

      // now check the region's unflushed seqIds.
      long seqId = getEarliestMemStoreSeqNum(wal, region.getRegionInfo().getEncodedNameAsBytes());
      assertEquals(HConstants.NO_SEQNUM, seqId,
        "Found seqId for the region which is already closed");
    } finally {
      exec.shutdownNow();
      region.close();
      wal.close();
    }
  }

  /**
   * Returns the lowest unflushed sequence id tracked by the WAL for the given region, or
   * {@link HConstants#NO_SEQNUM} if the WAL is null or not an {@link AbstractFSWAL}.
   */
  public static long getEarliestMemStoreSeqNum(WAL wal, byte[] encodedRegionName) {
    if (wal != null) {
      if (wal instanceof AbstractFSWAL) {
        return ((AbstractFSWAL<?>) wal).getSequenceIdAccounting()
          .getLowestSequenceId(encodedRegionName);
      }
    }
    return HConstants.NO_SEQNUM;
  }

  // Stores that FlushSpecificStoresPolicy should select; empty means "flush all stores".
  private static final Set<byte[]> STORES_TO_FLUSH =
    Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));

  // Testcase for HBASE-23157
  @Test
  public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
    byte[] a = Bytes.toBytes("a");
    byte[] b = Bytes.toBytes("b");
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();

    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
    CountDownLatch holdAppend = new CountDownLatch(1);
    CountDownLatch flushFinished = new CountDownLatch(1);
    ExecutorService exec = Executors.newFixedThreadPool(2);
    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
      FlushPolicy.class);
    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
    // open a new region which uses this WAL
    HRegion region = createHoldingHRegion(conf, htd, wal);
    try {
      Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
      doPutWithAsyncWAL(exec, region, put, () -> {
        try {
          HRegion.FlushResult flushResult = region.flush(true);
          LOG.info("Flush result:" + flushResult.getResult());
          LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
          flushFinished.countDown();
        } catch (IOException e) {
          LOG.error(e.toString(), e);
        }
      }, startHoldingForAppend, flushFinished, holdAppend);

      // get the max flushed sequence id after the first flush
      long maxFlushedSeqId1 = region.getMaxFlushedSeqId();

      region.put(put);
      // this time we only flush family a
      STORES_TO_FLUSH.add(a);
      region.flush(false);

      // get the max flushed sequence id after the second flush
      long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
      // make sure that the maxFlushedSequenceId does not go backwards
      assertTrue(maxFlushedSeqId1 <= maxFlushedSeqId2, "maxFlushedSeqId1(" + maxFlushedSeqId1
        + ") is not greater than or equal to maxFlushedSeqId2(" + maxFlushedSeqId2 + ")");
    } finally {
      exec.shutdownNow();
      region.close();
      wal.close();
    }
  }

  /**
   * Flush policy that flushes only the stores named in {@link #STORES_TO_FLUSH}, falling back to
   * all stores when that set is empty.
   */
  public static final class FlushSpecificStoresPolicy extends FlushPolicy {

    @Override
    public Collection<HStore> selectStoresToFlush() {
      if (STORES_TO_FLUSH.isEmpty()) {
        return region.getStores();
      } else {
        return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
      }
    }
  }
}