/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Random;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies the reference counts of the {@link HFileBlock}s handed out while seeking, shipping and
 * closing an {@link HFileScannerImpl}. Each test is run once per bucket cache IO engine returned by
 * {@link #parameters()}; two extra tests cover an LruBlockCache-only setup and a disabled block
 * cache.
 */
@Tag(IOTests.TAG)
@Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: ioengine={0}")
public class TestHFileScannerImplReferenceCount {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final Random RNG = new Random(9713312); // Just a fixed seed.
  private static final byte[] FAMILY = Bytes.toBytes("f");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] SUFFIX = randLongBytes();
  private static final int CELL_COUNT = 1000;

  /**
   * IO engine under test. {@link #setUp(TestInfo)} rewrites it to {@code <engine>:<path>} for every
   * engine other than "offheap", so it cannot be final.
   */
  private String ioengine;

  // It's a deep copy of configuration of UTIL, DON'T use shallow copy.
  private Configuration conf;
  private Path workDir;
  private FileSystem fs;
  private Path hfilePath;
  // The first and second cells appended by writeHFile(); used as seek targets by the tests.
  private ExtendedCell firstCell = null;
  private ExtendedCell secondCell = null;
  private ByteBuffAllocator allocator;

  /** Returns the bucket cache IO engine names every test template is executed with. */
  public static Stream<Arguments> parameters() {
    return Stream.of(Arguments.of("file"), Arguments.of("offheap"), Arguments.of("mmap"),
      Arguments.of("pmem"));
  }

  public TestHFileScannerImplReferenceCount(String ioengine) {
    this.ioengine = ioengine;
  }

  /** Returns 30 random bytes, used as the common suffix of every generated row key. */
  private static byte[] randLongBytes() {
    byte[] keys = new byte[30];
    Bytes.random(keys);
    return keys;
  }

  @BeforeAll
  public static void setUpBeforeClass() {
    Configuration conf = UTIL.getConfiguration();
    // Set the max chunk size and min entries key to be very small for index block, so that we can
    // create an index block tree with level >= 2.
    conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
    conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
    // Create a bucket cache with 32MB.
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
    conf.setInt(BUFFER_SIZE_KEY, 1024);
    conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
    // All allocated ByteBuff are pooled ByteBuff.
    conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
  }

  /**
   * Prepares a per-test work directory, points the shared configuration at the IO engine under
   * test, creates the allocator and takes a deep copy of the configuration for the test to mutate.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) throws IOException {
    String caseName = testInfo.getDisplayName().replaceAll("[^a-zA-Z0-9]", "_");
    this.workDir = UTIL.getDataTestDir(caseName);
    if (!"offheap".equals(ioengine)) {
      // Every engine except "offheap" is configured as "<engine>:<path of the cache file>".
      ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
    }
    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
    this.firstCell = null;
    this.secondCell = null;
    this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
    this.conf = new Configuration(UTIL.getConfiguration());
    this.fs = this.workDir.getFileSystem(conf);
    this.hfilePath = new Path(this.workDir, caseName + EnvironmentEdgeManager.currentTime());
    LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
  }

  @AfterEach
  public void tearDown() throws IOException {
    this.allocator.clean();
    this.fs.delete(this.workDir, true);
  }

  /**
   * Asserts that {@code cache} is a {@link CombinedBlockCache} whose second member is a
   * {@link BucketCache}, then waits until that bucket cache has flushed all pending blocks.
   */
  private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
    assertTrue(cache instanceof CombinedBlockCache);
    BlockCache[] blockCaches = cache.getBlockCaches();
    assertEquals(2, blockCaches.length);
    assertTrue(blockCaches[1] instanceof BucketCache);
    TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
  }

  /**
   * Writes {@code cellCount} cells with random values into a new HFile at {@code hfilePath} and
   * remembers the first two cells in {@link #firstCell} and {@link #secondCell}.
   * @param conf        configuration used to create the writer and its {@link CacheConfig}
   * @param fs          file system the HFile is written to
   * @param hfilePath   location of the HFile
   * @param compression compression algorithm of the HFile
   * @param encoding    data block encoding of the HFile
   * @param cellCount   number of cells to append
   * @throws IOException if creating the writer or appending a cell fails
   */
  private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
    DataBlockEncoding encoding, int cellCount) throws IOException {
    // A block size of 1 byte keeps the first two cells in different data blocks; the tests rely
    // on that (they assert the two loaded blocks are distinct).
    HFileContext context = new HFileContextBuilder().withBlockSize(1).withCompression(compression)
      .withDataBlockEncoding(encoding).build();
    try (HFile.Writer writer = new HFile.WriterFactory(conf, new CacheConfig(conf))
      .withPath(fs, hfilePath).withFileContext(context).create()) {
      for (int i = 0; i < cellCount; ++i) {
        byte[] keyBytes = Bytes.add(Bytes.toBytes(i), SUFFIX);
        // A random-length random value.
        byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG);
        KeyValue keyValue =
          new KeyValue(keyBytes, FAMILY, QUALIFIER, HConstants.LATEST_TIMESTAMP, valueBytes);
        if (firstCell == null) {
          firstCell = keyValue;
        } else if (secondCell == null) {
          secondCell = keyValue;
        }
        writer.append(keyValue);
      }
    }
  }

  /**
   * Loads, through the reader's data block index, the data block that {@code cell} falls into.
   * The flags passed to {@code loadDataBlockWithScanInfo} are the ones every test in this class
   * uses.
   */
  private HFileBlock loadDataBlock(HFile.Reader reader, ExtendedCell cell) throws IOException {
    return reader.getDataBlockIndexReader()
      .loadDataBlockWithScanInfo(cell, null, true, true, false, DataBlockEncoding.NONE, reader)
      .getHFileBlock();
  }

  /**
   * A careful UT for validating the reference count mechanism. If you want to change this UT,
   * please read the design doc in HBASE-21879 first and make sure you understand the refCnt
   * design.
   */
  private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
    throws Exception {
    writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
    HFileBlock curBlock, prevBlock;
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    assertNotNull(defaultBC);
    assertTrue(cacheConfig.isCombinedBlockCache());
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile whose data index tree has 16 levels.
    assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
    HFileBlock block1 = loadDataBlock(reader, firstCell);
    waitBucketCacheFlushed(defaultBC);
    assertTrue(block1.getBlockType().isData());
    assertFalse(block1 instanceof ExclusiveMemHFileBlock);

    HFileBlock block2 = loadDataBlock(reader, secondCell);
    waitBucketCacheFlushed(defaultBC);
    assertTrue(block2.getBlockType().isData());
    assertFalse(block2 instanceof ExclusiveMemHFileBlock);
    // Only one refCnt for RPC path.
    assertEquals(1, block1.refCnt());
    assertEquals(1, block2.refCnt());
    assertFalse(block1 == block2);

    scanner.seekTo(firstCell);
    curBlock = scanner.curBlock;
    this.assertRefCnt(curBlock, 2);

    // Seek to the block again, the curBlock won't change and won't read from BlockCache. so
    // refCnt should be unchanged.
    scanner.seekTo(firstCell);
    assertTrue(curBlock == scanner.curBlock);
    this.assertRefCnt(curBlock, 2);
    prevBlock = curBlock;

    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    this.assertRefCnt(prevBlock, 2);
    this.assertRefCnt(curBlock, 2);

    // After shipped, the prevBlock will be released, but curBlock is still referenced by the
    // scanner.
    scanner.shipped();
    this.assertRefCnt(prevBlock, 1);
    this.assertRefCnt(curBlock, 2);

    // Try to ship again, though with nothing to client.
    scanner.shipped();
    this.assertRefCnt(prevBlock, 1);
    this.assertRefCnt(curBlock, 2);

    // The curBlock will also be released.
    scanner.close();
    this.assertRefCnt(curBlock, 1);

    // Finish the block1 & block2 RPC path
    assertTrue(block1.release());
    assertTrue(block2.release());

    // Evict every cached block of this HFile from the block cache.
    assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
    assertEquals(0, prevBlock.refCnt());
    assertEquals(0, curBlock.refCnt());

    // A full scan after the eviction must still return every cell.
    int count = 0;
    assertTrue(scanner.seekTo());
    ++count;
    while (scanner.next()) {
      count++;
    }
    assertEquals(CELL_COUNT, count);
  }

  /**
   * See HBASE-22480
   */
  @TestTemplate
  public void testSeekBefore() throws Exception {
    HFileBlock curBlock, prevBlock;
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    assertNotNull(defaultBC);
    assertTrue(cacheConfig.isCombinedBlockCache());
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile whose data index tree has 16 levels.
    assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
    HFileBlock block1 = loadDataBlock(reader, firstCell);
    assertTrue(block1.getBlockType().isData());
    assertFalse(block1 instanceof ExclusiveMemHFileBlock);
    HFileBlock block2 = loadDataBlock(reader, secondCell);
    assertTrue(block2.getBlockType().isData());
    assertFalse(block2 instanceof ExclusiveMemHFileBlock);
    // Wait until flushed to IOEngine;
    waitBucketCacheFlushed(defaultBC);
    // One RPC reference path.
    assertEquals(1, block1.refCnt());
    assertEquals(1, block2.refCnt());

    // Let the curBlock refer to the data block of secondCell; it is a different object than block2.
    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    assertFalse(curBlock == block2);
    assertEquals(1, block2.refCnt());
    this.assertRefCnt(curBlock, 2);
    prevBlock = scanner.curBlock;

    // Release the block1, no other reference.
    assertTrue(block1.release());
    assertEquals(0, block1.refCnt());
    // Release the block2, no other reference.
    assertTrue(block2.release());
    assertEquals(0, block2.refCnt());

    // Do the seekBefore: the newBlock will be the previous block of curBlock.
    assertTrue(scanner.seekBefore(secondCell));
    assertEquals(1, scanner.prevBlocks.size());
    assertTrue(scanner.prevBlocks.get(0) == prevBlock);
    curBlock = scanner.curBlock;
    // the curBlock is read from IOEngine, so a different block.
    assertFalse(curBlock == block1);
    // Two reference for curBlock: 1. scanner; 2. blockCache.
    this.assertRefCnt(curBlock, 2);
    // Reference count of prevBlock must be unchanged because we haven't shipped.
    this.assertRefCnt(prevBlock, 2);

    // Do the shipped
    scanner.shipped();
    assertEquals(0, scanner.prevBlocks.size());
    assertNotNull(scanner.curBlock);
    this.assertRefCnt(curBlock, 2);
    this.assertRefCnt(prevBlock, 1);

    // Do the close
    scanner.close();
    assertNull(scanner.curBlock);
    this.assertRefCnt(curBlock, 1);
    this.assertRefCnt(prevBlock, 1);

    assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
    assertEquals(0, curBlock.refCnt());
    assertEquals(0, prevBlock.refCnt());

    // Reload the block1 again.
    block1 = loadDataBlock(reader, firstCell);
    // Wait until flushed to IOEngine;
    waitBucketCacheFlushed(defaultBC);
    assertTrue(block1.getBlockType().isData());
    assertFalse(block1 instanceof ExclusiveMemHFileBlock);
    assertTrue(block1.release());
    assertEquals(0, block1.refCnt());
    // Re-seek to the begin.
    assertTrue(scanner.seekTo());
    curBlock = scanner.curBlock;
    assertFalse(curBlock == block1);
    this.assertRefCnt(curBlock, 2);
    // Return false because firstCell <= c[0]
    assertFalse(scanner.seekBefore(firstCell));
    // The curBlock must keep its references because we haven't done the shipped or close yet.
    this.assertRefCnt(curBlock, 2);

    scanner.close();
    this.assertRefCnt(curBlock, 1);
    assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
    assertEquals(0, curBlock.refCnt());
  }

  /**
   * Asserts the reference count of {@code block}, adjusted for the IO engine under test: for the
   * "offheap" and "pmem" engines the count must equal {@code value}, for every other engine it
   * must equal {@code value - 1}.
   * <p>
   * NOTE(review): presumably the other engines hand the scanner a copy that does not carry the
   * block cache's own reference -- confirm against the HBASE-21879 design doc.
   */
  private void assertRefCnt(HFileBlock block, int value) {
    if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
      assertEquals(value, block.refCnt());
    } else {
      assertEquals(value - 1, block.refCnt());
    }
  }

  @TestTemplate
  public void testDefault() throws Exception {
    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
  }

  @TestTemplate
  public void testCompression() throws Exception {
    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.NONE);
  }

  @TestTemplate
  public void testDataBlockEncoding() throws Exception {
    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.ROW_INDEX_V1);
  }

  @TestTemplate
  public void testDataBlockEncodingAndCompression() throws Exception {
    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
  }

  /**
   * With the bucket cache IO engine cleared, the blocks handed out are expected to be
   * {@link ExclusiveMemHFileBlock}s whose refCnt stays 0 and which never show up in the scanner's
   * prevBlocks.
   */
  @TestTemplate
  public void testWithLruBlockCache() throws Exception {
    HFileBlock curBlock;
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    // Clear the bucket cache IO engine so that only the LruBlockCache is created.
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    assertNotNull(defaultBC);
    assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile whose data index tree has 16 levels.
    assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
    HFileBlock block1 = loadDataBlock(reader, firstCell);
    assertTrue(block1.getBlockType().isData());
    assertTrue(block1 instanceof ExclusiveMemHFileBlock);
    HFileBlock block2 = loadDataBlock(reader, secondCell);
    assertTrue(block2.getBlockType().isData());
    assertTrue(block2 instanceof ExclusiveMemHFileBlock);
    // The refCnt of an ExclusiveMemHFileBlock is expected to be 0 here.
    assertEquals(0, block1.refCnt());
    assertEquals(0, block2.refCnt());

    scanner.seekTo(firstCell);
    curBlock = scanner.curBlock;
    assertTrue(curBlock == block1);
    assertEquals(0, curBlock.refCnt());
    assertTrue(scanner.prevBlocks.isEmpty());

    // Switch to next block
    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    assertTrue(curBlock == block2);
    assertEquals(0, curBlock.refCnt());
    // Even an explicit retain() must leave the refCnt at 0.
    assertEquals(0, curBlock.retain().refCnt());
    // Only pooled HFileBlock will be kept in prevBlocks and ExclusiveMemHFileBlock will never be
    // kept in prevBlocks.
    assertTrue(scanner.prevBlocks.isEmpty());

    // close the scanner
    scanner.close();
    assertNull(scanner.curBlock);
    assertTrue(scanner.prevBlocks.isEmpty());
  }

  /**
   * With the block cache disabled, a loaded data block is expected to be a
   * {@link SharedMemHFileBlock} holding exactly one reference, which the caller releases.
   */
  @TestTemplate
  public void testDisabledBlockCache() throws Exception {
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    // Disable the block cache by setting its size to 0; the factory then returns null.
    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    assertNull(defaultBC);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    assertFalse(cacheConfig.isCombinedBlockCache()); // No block cache at all.
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile whose data index tree has 16 levels.
    assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileBlock block1 = loadDataBlock(reader, firstCell);

    assertTrue(block1.isSharedMem());
    assertTrue(block1 instanceof SharedMemHFileBlock);
    assertEquals(1, block1.refCnt());
    assertTrue(block1.release());
  }
}