/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestHFileScannerImplReferenceCount {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);

  @Rule
  public TestName CASE = new TestName();
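  // Run the whole suite once per bucket cache IOEngine. The "file", "mmap" and "pmem" engines
  // are rooted under the per-case work dir in setUp(); "offheap" needs no backing path.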
  @Parameters(name = "{index}: ioengine={0}")
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
      new Object[] { "mmap" }, new Object[] { "pmem" });
  }

  @Parameter
  public String ioengine;

  private static final Logger LOG =
      LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final byte[] FAMILY = Bytes.toBytes("f");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] SUFFIX = randLongBytes();
  private static final int CELL_COUNT = 1000;

  private static byte[] randLongBytes() {
    Random rand = new Random();
    byte[] keys = new byte[30];
    rand.nextBytes(keys);
    return keys;
  }

  // This is a deep copy of UTIL's configuration; DON'T replace it with a shallow copy.
  private Configuration conf;
  private Path workDir;
  private FileSystem fs;
  private Path hfilePath;
  private Cell firstCell = null;
  private Cell secondCell = null;
  private ByteBuffAllocator allocator;

  @BeforeClass
  public static void setUpBeforeClass() {
    Configuration conf = UTIL.getConfiguration();
    // Set the max chunk size and the min number of index entries very small for index blocks,
    // so that we can create an index block tree with level >= 2.
    conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
    conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
    // Create a 32MB bucket cache.
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
    conf.setInt(BUFFER_SIZE_KEY, 1024);
    conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
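    // 32K pooled buffers of 1KB each caps the allocator's off-heap pool at 32MB.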
    // All allocated ByteBuffs are pooled ByteBuffs.
    conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
  }

  @Before
  public void setUp() throws IOException {
    String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
    this.workDir = UTIL.getDataTestDir(caseName);
    if (!"offheap".equals(ioengine)) {
      ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
    }
    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
    this.firstCell = null;
    this.secondCell = null;
    this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
    this.conf = new Configuration(UTIL.getConfiguration());
    this.fs = this.workDir.getFileSystem(conf);
    this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
    LOG.info("Start to write {} cells into hfile: {}, case: {}", CELL_COUNT, hfilePath, caseName);
  }

  @After
  public void tearDown() throws IOException {
    this.allocator.clean();
    this.fs.delete(this.workDir, true);
  }

  private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
    Assert.assertTrue(cache instanceof CombinedBlockCache);
    BlockCache[] blockCaches = cache.getBlockCaches();
    Assert.assertEquals(2, blockCaches.length);
    Assert.assertTrue(blockCaches[1] instanceof BucketCache);
    TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
  }

  private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
      DataBlockEncoding encoding, int cellCount) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(1)
        .withCompression(compression).withDataBlockEncoding(encoding).build();
    try (HFile.Writer writer =
        new HFile.WriterFactory(conf, new CacheConfig(conf)).withPath(fs, hfilePath)
            .withFileContext(context).create()) {
      Random rand = new Random(9713312); // Just a fixed seed.
      for (int i = 0; i < cellCount; ++i) {
        byte[] keyBytes = Bytes.add(Bytes.toBytes(i), SUFFIX);

        // A random-length random value.
        byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
        KeyValue keyValue =
            new KeyValue(keyBytes, FAMILY, QUALIFIER, HConstants.LATEST_TIMESTAMP, valueBytes);
        if (firstCell == null) {
          firstCell = keyValue;
        } else if (secondCell == null) {
          secondCell = keyValue;
        }
        writer.append(keyValue);
      }
    }
  }

  /**
   * A careful UT for validating the reference-count mechanism. If you want to change this UT,
   * please read the design doc in HBASE-21879 first and make sure you understand the refCnt
   * design.
   */
  private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
      throws Exception {
    writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
    HFileBlock curBlock, prevBlock;
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertNotNull(defaultBC);
    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
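    // The plan: pin two data blocks through the index reader (simulating the RPC read path),
    // then walk a scanner across them, checking refCnt at every transition below.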
    // We've built an HFile tree with 16 levels of data block index.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
    HFileBlock block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
          DataBlockEncoding.NONE, reader).getHFileBlock();
    waitBucketCacheFlushed(defaultBC);
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);

    HFileBlock block2 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
          DataBlockEncoding.NONE, reader).getHFileBlock();
    waitBucketCacheFlushed(defaultBC);
    Assert.assertTrue(block2.getBlockType().isData());
    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
    // Only one refCnt each, held by the RPC path.
    Assert.assertEquals(1, block1.refCnt());
    Assert.assertEquals(1, block2.refCnt());
    Assert.assertFalse(block1 == block2);

    scanner.seekTo(firstCell);
    curBlock = scanner.curBlock;
    this.assertRefCnt(curBlock, 2);

    // Seek to the same block again: curBlock won't change and won't be re-read from the
    // BlockCache, so the refCnt should be unchanged.
    scanner.seekTo(firstCell);
    Assert.assertTrue(curBlock == scanner.curBlock);
    this.assertRefCnt(curBlock, 2);
    prevBlock = curBlock;

    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    this.assertRefCnt(prevBlock, 2);
    this.assertRefCnt(curBlock, 2);

    // After shipped(), prevBlock is released, but curBlock is still referenced by the scanner.
    scanner.shipped();
    this.assertRefCnt(prevBlock, 1);
    this.assertRefCnt(curBlock, 2);

    // Ship again, though nothing new goes to the client; refCnts are unchanged.
    scanner.shipped();
    this.assertRefCnt(prevBlock, 1);
    this.assertRefCnt(curBlock, 2);

    // Closing the scanner releases curBlock as well.
    scanner.close();
    this.assertRefCnt(curBlock, 1);

    // Finish the block1 & block2 RPC paths.
    Assert.assertTrue(block1.release());
    Assert.assertTrue(block2.release());

    // Evict this file's blocks from the block cache; the cache drops its last references.
    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
    Assert.assertEquals(0, prevBlock.refCnt());
    Assert.assertEquals(0, curBlock.refCnt());

    // Scan the whole file to verify every cell is still readable after all the releases.
    int count = 0;
    Assert.assertTrue(scanner.seekTo());
    ++count;
    while (scanner.next()) {
      count++;
    }
    assertEquals(CELL_COUNT, count);
  }

  /**
   * Covers the seekBefore() reference-count path. See HBASE-22480.
   */
  @Test
  public void testSeekBefore() throws Exception {
    HFileBlock curBlock, prevBlock;
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertNotNull(defaultBC);
    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
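    // Since writeHFile() uses withBlockSize(1), every data block holds a single cell, so a
    // successful seekBefore() always moves the scanner to the preceding block.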
    // We've built an HFile tree with 16 levels of data block index.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
    HFileBlock block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
          DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
    HFileBlock block2 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
          DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block2.getBlockType().isData());
    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
    // Wait until flushed to the IOEngine.
    waitBucketCacheFlushed(defaultBC);
    // One RPC reference path.
    Assert.assertEquals(1, block1.refCnt());
    Assert.assertEquals(1, block2.refCnt());

    // Let curBlock point at block2's data (a distinct instance read from the cache).
    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    Assert.assertFalse(curBlock == block2);
    Assert.assertEquals(1, block2.refCnt());
    this.assertRefCnt(curBlock, 2);
    prevBlock = scanner.curBlock;

    // Release block1; no other reference remains.
    Assert.assertTrue(block1.release());
    Assert.assertEquals(0, block1.refCnt());
    // Release block2; no other reference remains.
    Assert.assertTrue(block2.release());
    Assert.assertEquals(0, block2.refCnt());

    // Do the seekBefore: the new curBlock will be the block preceding the old one.
    Assert.assertTrue(scanner.seekBefore(secondCell));
    Assert.assertEquals(1, scanner.prevBlocks.size());
    Assert.assertTrue(scanner.prevBlocks.get(0) == prevBlock);
    curBlock = scanner.curBlock;
    // curBlock is re-read from the IOEngine, so it is a different instance than block1.
    Assert.assertFalse(curBlock == block1);
    // Two references for curBlock: 1. the scanner; 2. the block cache.
    this.assertRefCnt(curBlock, 2);
    // prevBlock's reference count must be unchanged because we haven't shipped yet.
    this.assertRefCnt(prevBlock, 2);

    // Do the shipped.
    scanner.shipped();
    Assert.assertEquals(0, scanner.prevBlocks.size());
    Assert.assertNotNull(scanner.curBlock);
    this.assertRefCnt(curBlock, 2);
    this.assertRefCnt(prevBlock, 1);

    // Do the close.
    scanner.close();
    Assert.assertNull(scanner.curBlock);
    this.assertRefCnt(curBlock, 1);
    this.assertRefCnt(prevBlock, 1);

    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
    Assert.assertEquals(0, curBlock.refCnt());
    Assert.assertEquals(0, prevBlock.refCnt());

    // Reload block1 again.
    block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
          DataBlockEncoding.NONE, reader).getHFileBlock();
    // Wait until flushed to the IOEngine.
    waitBucketCacheFlushed(defaultBC);
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
    Assert.assertTrue(block1.release());
    Assert.assertEquals(0, block1.refCnt());
    // Re-seek to the beginning.
    Assert.assertTrue(scanner.seekTo());
    curBlock = scanner.curBlock;
    Assert.assertFalse(curBlock == block1);
    this.assertRefCnt(curBlock, 2);
    // Returns false because firstCell <= c[0].
    Assert.assertFalse(scanner.seekBefore(firstCell));
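    // A failed seekBefore() must leave the scanner's current block alone.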
    // curBlock shouldn't be released because we haven't called shipped() or close() yet.
    this.assertRefCnt(curBlock, 2);

    scanner.close();
    this.assertRefCnt(curBlock, 1);
    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
    Assert.assertEquals(0, curBlock.refCnt());
  }

  /**
   * For the file and mmap IOEngines the returned block carries one fewer reference than for
   * offheap/pmem in the same scanner state (no counted cache-side reference).
   */
  private void assertRefCnt(HFileBlock block, int value) {
    if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
      Assert.assertEquals(value, block.refCnt());
    } else {
      Assert.assertEquals(value - 1, block.refCnt());
    }
  }

  @Test
  public void testDefault() throws Exception {
    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
  }

  @Test
  public void testCompression() throws Exception {
    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.NONE);
  }

  @Test
  public void testDataBlockEncoding() throws Exception {
    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.ROW_INDEX_V1);
  }

  @Test
  public void testDataBlockEncodingAndCompression() throws Exception {
    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
  }

  @Test
  public void testWithLruBlockCache() throws Exception {
    HFileBlock curBlock;
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    // Use an LruBlockCache only: clear the bucket cache IOEngine.
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertNotNull(defaultBC);
    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be an LruBlockCache.
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile tree with 16 levels of data block index.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
    HFileBlock block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
          DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
    HFileBlock block2 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(secondCell, null, true, true, false,
          DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block2.getBlockType().isData());
    Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
    // ExclusiveMemHFileBlocks are on-heap and unpooled, so their reference counting is a no-op
    // and refCnt() always reads 0.
    Assert.assertEquals(0, block1.refCnt());
    Assert.assertEquals(0, block2.refCnt());

    scanner.seekTo(firstCell);
    curBlock = scanner.curBlock;
    Assert.assertTrue(curBlock == block1);
    Assert.assertEquals(0, curBlock.refCnt());
    Assert.assertTrue(scanner.prevBlocks.isEmpty());

    // Switch to the next block.
    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    Assert.assertTrue(curBlock == block2);
    Assert.assertEquals(0, curBlock.refCnt());
    Assert.assertEquals(0, curBlock.retain().refCnt());
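    // There is nothing for the scanner to track here: the LRU cache's on-heap blocks are
    // reclaimed by the GC rather than by reference counting.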
    // Only pooled HFileBlocks are kept in prevBlocks; an ExclusiveMemHFileBlock never is.
    Assert.assertTrue(scanner.prevBlocks.isEmpty());

    // Close the scanner.
    scanner.close();
    Assert.assertNull(scanner.curBlock);
    Assert.assertTrue(scanner.prevBlocks.isEmpty());
  }

  @Test
  public void testDisabledBlockCache() throws Exception {
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    // Disable the block cache entirely.
    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    Assert.assertNull(defaultBC);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // No block cache at all.
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile tree with 16 levels of data block index.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileBlock block1 = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false,
          DataBlockEncoding.NONE, reader).getHFileBlock();

    // With no cache, the block is read straight into pooled ByteBuffs: a SharedMemHFileBlock
    // with a single reference held by the RPC path.
    Assert.assertTrue(block1.isSharedMem());
    Assert.assertTrue(block1 instanceof SharedMemHFileBlock);
    Assert.assertEquals(1, block1.refCnt());
    Assert.assertTrue(block1.release());
  }
}