001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.when; 027 028import java.io.File; 029import java.io.IOException; 030import java.nio.ByteBuffer; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.locks.ReentrantReadWriteLock; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseConfiguration; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.io.ByteBuffAllocator; 045import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 046import org.apache.hadoop.hbase.io.hfile.BlockType; 047import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 048import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 049import org.apache.hadoop.hbase.io.hfile.Cacheable; 050import org.apache.hadoop.hbase.io.hfile.HFileBlock; 051import org.apache.hadoop.hbase.io.hfile.HFileContext; 052import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 053import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; 054import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; 055import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; 056import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 057import org.apache.hadoop.hbase.nio.ByteBuff; 058import org.apache.hadoop.hbase.testclassification.IOTests; 059import org.apache.hadoop.hbase.testclassification.LargeTests; 060import org.apache.hadoop.hbase.util.Pair; 061import org.junit.After; 062import org.junit.Assert; 063import org.junit.Before; 064import org.junit.ClassRule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.junit.runner.RunWith; 068import org.junit.runners.Parameterized; 069import org.mockito.Mockito; 070 071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 072 073/** 074 * Basic test of BucketCache.Puts and gets. 075 * <p> 076 * Tests will ensure that blocks' data correctness under several threads concurrency 077 */ 078@RunWith(Parameterized.class) 079@Category({ IOTests.class, LargeTests.class }) 080public class TestBucketCache { 081 082 @ClassRule 083 public static final HBaseClassTestRule CLASS_RULE = 084 HBaseClassTestRule.forClass(TestBucketCache.class); 085 086 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 087 public static Iterable<Object[]> data() { 088 return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize 089 // for these tests? 090 { 16 * 1024, 091 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 092 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 093 128 * 1024 + 1024 } } }); 094 } 095 096 @Parameterized.Parameter(0) 097 public int constructedBlockSize; 098 099 @Parameterized.Parameter(1) 100 public int[] constructedBlockSizes; 101 102 BucketCache cache; 103 final int CACHE_SIZE = 1000000; 104 final int NUM_BLOCKS = 100; 105 final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; 106 final int NUM_THREADS = 100; 107 final int NUM_QUERIES = 10000; 108 109 final long capacitySize = 32 * 1024 * 1024; 110 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 111 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 112 String ioEngineName = "offheap"; 113 String persistencePath = null; 114 115 private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); 116 117 private static class MockedBucketCache extends BucketCache { 118 119 public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 120 int writerThreads, int writerQLen, String persistencePath) throws IOException { 121 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, 122 persistencePath); 123 super.wait_when_cache = true; 124 } 125 126 @Override 127 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 128 super.cacheBlock(cacheKey, buf, inMemory); 129 } 130 131 @Override 132 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 133 super.cacheBlock(cacheKey, buf); 134 } 135 } 136 137 @Before 138 public void setup() throws IOException { 139 cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, 140 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 141 } 142 143 @After 144 public void tearDown() { 145 cache.shutdown(); 146 } 147 148 /** 149 * Test Utility to create test dir and return name 150 * @return return name of created dir 151 * @throws IOException throws IOException 152 */ 153 private Path createAndGetTestDir() throws IOException { 154 final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir(); 155 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir); 156 return testDir; 157 } 158 159 /** 160 * Return a random element from {@code a}. 161 */ 162 private static <T> T randFrom(List<T> a) { 163 return a.get(ThreadLocalRandom.current().nextInt(a.size())); 164 } 165 166 @Test 167 public void testBucketAllocator() throws BucketAllocatorException { 168 BucketAllocator mAllocator = cache.getAllocator(); 169 /* 170 * Test the allocator first 171 */ 172 final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024); 173 174 boolean full = false; 175 ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>(); 176 // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until 177 // the cache is completely filled. 178 List<Integer> tmp = new ArrayList<>(BLOCKSIZES); 179 while (!full) { 180 Integer blockSize = null; 181 try { 182 blockSize = randFrom(tmp); 183 allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize)); 184 } catch (CacheFullException cfe) { 185 tmp.remove(blockSize); 186 if (tmp.isEmpty()) full = true; 187 } 188 } 189 190 for (Integer blockSize : BLOCKSIZES) { 191 BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize); 192 IndexStatistics indexStatistics = bucketSizeInfo.statistics(); 193 assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount()); 194 195 // we know the block sizes above are multiples of 1024, but default bucket sizes give an 196 // additional 1024 on top of that so this counts towards fragmentation in our test 197 // real life may have worse fragmentation because blocks may not be perfectly sized to block 198 // size, given encoding/compression and large rows 199 assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes()); 200 } 201 202 mAllocator.logDebugStatistics(); 203 204 for (Pair<Long, Integer> allocation : allocations) { 205 assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()), 206 mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond())); 207 } 208 assertEquals(0, mAllocator.getUsedSize()); 209 } 210 211 @Test 212 public void testCacheSimple() throws Exception { 213 CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); 214 } 215 216 @Test 217 public void testCacheMultiThreadedSingleKey() throws Exception { 218 CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES); 219 } 220 221 @Test 222 public void testHeapSizeChanges() throws Exception { 223 cache.stopWriterThreads(); 224 CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); 225 } 226 227 public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 228 throws InterruptedException { 229 while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { 230 Thread.sleep(100); 231 } 232 Thread.sleep(1000); 233 } 234 235 public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { 236 while (!cache.ramCache.isEmpty()) { 237 Thread.sleep(100); 238 } 239 Thread.sleep(1000); 240 } 241 242 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 243 // threads will flush it to the bucket and put reference entry in backingMap. 244 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 245 Cacheable block) throws InterruptedException { 246 cache.cacheBlock(cacheKey, block); 247 waitUntilFlushedToBucket(cache, cacheKey); 248 } 249 250 @Test 251 public void testMemoryLeak() throws Exception { 252 final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L); 253 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 254 new CacheTestUtils.ByteArrayCacheable(new byte[10])); 255 long lockId = cache.backingMap.get(cacheKey).offset(); 256 ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId); 257 lock.writeLock().lock(); 258 Thread evictThread = new Thread("evict-block") { 259 @Override 260 public void run() { 261 cache.evictBlock(cacheKey); 262 } 263 }; 264 evictThread.start(); 265 cache.offsetLock.waitForWaiters(lockId, 1); 266 cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true); 267 assertEquals(0, cache.getBlockCount()); 268 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 269 new CacheTestUtils.ByteArrayCacheable(new byte[10])); 270 assertEquals(1, cache.getBlockCount()); 271 lock.writeLock().unlock(); 272 evictThread.join(); 273 /** 274 * <pre> 275 * The asserts here before HBASE-21957 are: 276 * assertEquals(1L, cache.getBlockCount()); 277 * assertTrue(cache.getCurrentSize() > 0L); 278 * assertTrue("We should have a block!", cache.iterator().hasNext()); 279 * 280 * The asserts here after HBASE-21957 are: 281 * assertEquals(0, cache.getBlockCount()); 282 * assertEquals(cache.getCurrentSize(), 0L); 283 * 284 * I think the asserts before HBASE-21957 is more reasonable,because 285 * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry} 286 * it had seen, and newly added Block after the {@link BucketEntry} 287 * it had seen should not be evicted. 288 * </pre> 289 */ 290 assertEquals(1L, cache.getBlockCount()); 291 assertTrue(cache.getCurrentSize() > 0L); 292 assertTrue("We should have a block!", cache.iterator().hasNext()); 293 } 294 295 @Test 296 public void testRetrieveFromFile() throws Exception { 297 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 298 Path testDir = TEST_UTIL.getDataTestDir(); 299 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 300 301 String ioEngineName = "file:" + testDir + "/bucket.cache"; 302 String persistencePath = testDir + "/bucket.persistence"; 303 304 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 305 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 306 long usedSize = bucketCache.getAllocator().getUsedSize(); 307 assertEquals(0, usedSize); 308 309 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 310 // Add blocks 311 for (HFileBlockPair block : blocks) { 312 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 313 } 314 for (HFileBlockPair block : blocks) { 315 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 316 } 317 usedSize = bucketCache.getAllocator().getUsedSize(); 318 assertNotEquals(0, usedSize); 319 // persist cache to file 320 bucketCache.shutdown(); 321 assertTrue(new File(persistencePath).exists()); 322 323 // restore cache from file 324 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 325 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 326 assertFalse(new File(persistencePath).exists()); 327 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 328 // persist cache to file 329 bucketCache.shutdown(); 330 assertTrue(new File(persistencePath).exists()); 331 332 // reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k) 333 // so it can't restore cache from file 334 int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 }; 335 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 336 smallBucketSizes, writeThreads, writerQLen, persistencePath); 337 assertFalse(new File(persistencePath).exists()); 338 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 339 assertEquals(0, bucketCache.backingMap.size()); 340 341 TEST_UTIL.cleanupTestDir(); 342 } 343 344 @Test 345 public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException { 346 long availableSpace = 20 * 1024L * 1024 * 1024; 347 int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 }; 348 BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes); 349 assertTrue(allocator.getBuckets().length > 0); 350 } 351 352 @Test 353 public void testGetPartitionSize() throws IOException { 354 // Test default values 355 validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, 356 BucketCache.DEFAULT_MIN_FACTOR); 357 358 Configuration conf = HBaseConfiguration.create(); 359 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 360 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 361 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 362 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 363 364 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 365 constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf); 366 367 validateGetPartitionSize(cache, 0.1f, 0.5f); 368 validateGetPartitionSize(cache, 0.7f, 0.5f); 369 validateGetPartitionSize(cache, 0.2f, 0.5f); 370 } 371 372 @Test 373 public void testValidBucketCacheConfigs() throws IOException { 374 Configuration conf = HBaseConfiguration.create(); 375 conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 376 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 377 conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f); 378 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 379 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 380 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 381 382 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 383 constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf); 384 385 assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, 386 cache.getAcceptableFactor(), 0); 387 assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, 388 cache.getMinFactor(), 0); 389 assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, 390 cache.getExtraFreeFactor(), 0); 391 assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, 392 cache.getSingleFactor(), 0); 393 assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, 394 cache.getMultiFactor(), 0); 395 assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, 396 cache.getMemoryFactor(), 0); 397 } 398 399 @Test 400 public void testInvalidAcceptFactorConfig() throws IOException { 401 float[] configValues = { -1f, 0.2f, 0.86f, 1.05f }; 402 boolean[] expectedOutcomes = { false, false, true, false }; 403 Map<String, float[]> configMappings = 404 ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues); 405 Configuration conf = HBaseConfiguration.create(); 406 checkConfigValues(conf, configMappings, expectedOutcomes); 407 } 408 409 @Test 410 public void testInvalidMinFactorConfig() throws IOException { 411 float[] configValues = { -1f, 0f, 0.96f, 1.05f }; 412 // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0 413 boolean[] expectedOutcomes = { false, true, false, false }; 414 Map<String, float[]> configMappings = 415 ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues); 416 Configuration conf = HBaseConfiguration.create(); 417 checkConfigValues(conf, configMappings, expectedOutcomes); 418 } 419 420 @Test 421 public void testInvalidExtraFreeFactorConfig() throws IOException { 422 float[] configValues = { -1f, 0f, 0.2f, 1.05f }; 423 // throws due to <0, in expected range, in expected range, config can be > 1.0 424 boolean[] expectedOutcomes = { false, true, true, true }; 425 Map<String, float[]> configMappings = 426 ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues); 427 Configuration conf = HBaseConfiguration.create(); 428 checkConfigValues(conf, configMappings, expectedOutcomes); 429 } 430 431 @Test 432 public void testInvalidCacheSplitFactorConfig() throws IOException { 433 float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f }; 434 float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f }; 435 float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f }; 436 // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't 437 // be negative, configs don't add to 1.0 438 boolean[] expectedOutcomes = { true, false, false, false }; 439 Map<String, 440 float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 441 singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, 442 BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues); 443 Configuration conf = HBaseConfiguration.create(); 444 checkConfigValues(conf, configMappings, expectedOutcomes); 445 } 446 447 private void checkConfigValues(Configuration conf, Map<String, float[]> configMap, 448 boolean[] expectSuccess) throws IOException { 449 Set<String> configNames = configMap.keySet(); 450 for (int i = 0; i < expectSuccess.length; i++) { 451 try { 452 for (String configName : configNames) { 453 conf.setFloat(configName, configMap.get(configName)[i]); 454 } 455 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 456 constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf); 457 assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] 458 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 459 } catch (IllegalArgumentException e) { 460 assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] 461 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 462 } 463 } 464 } 465 466 private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor, 467 float minFactor) { 468 long expectedOutput = 469 (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor); 470 assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor)); 471 } 472 473 @Test 474 public void testOffsetProducesPositiveOutput() { 475 // This number is picked because it produces negative output if the values isn't ensured to be 476 // positive. See HBASE-18757 for more information. 477 long testValue = 549888460800L; 478 BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true, (entry) -> { 479 return ByteBuffAllocator.NONE; 480 }, ByteBuffAllocator.HEAP); 481 assertEquals(testValue, bucketEntry.offset()); 482 } 483 484 @Test 485 public void testEvictionCount() throws InterruptedException { 486 int size = 100; 487 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 488 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 489 HFileContext meta = new HFileContextBuilder().build(); 490 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 491 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 492 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 493 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 494 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 495 496 BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0); 497 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 498 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 499 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 500 blockWithNextBlockMetadata.serialize(block1Buffer, true); 501 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 502 503 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 504 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 505 block1Buffer); 506 507 waitUntilFlushedToBucket(cache, key); 508 509 assertEquals(0, cache.getStats().getEvictionCount()); 510 511 // evict call should return 1, but then eviction count be 0 512 assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount")); 513 assertEquals(0, cache.getStats().getEvictionCount()); 514 515 // add back 516 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 517 block1Buffer); 518 waitUntilFlushedToBucket(cache, key); 519 520 // should not increment 521 assertTrue(cache.evictBlock(key)); 522 assertEquals(0, cache.getStats().getEvictionCount()); 523 524 // add back 525 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 526 block1Buffer); 527 waitUntilFlushedToBucket(cache, key); 528 529 // should finally increment eviction count 530 cache.freeSpace("testing"); 531 assertEquals(1, cache.getStats().getEvictionCount()); 532 } 533 534 @Test 535 public void testCacheBlockNextBlockMetadataMissing() throws Exception { 536 int size = 100; 537 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 538 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 539 HFileContext meta = new HFileContextBuilder().build(); 540 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 541 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 542 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 543 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 544 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 545 546 BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0); 547 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 548 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 549 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 550 blockWithNextBlockMetadata.serialize(block1Buffer, true); 551 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 552 553 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 554 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 555 block1Buffer); 556 557 waitUntilFlushedToBucket(cache, key); 558 assertNotNull(cache.backingMap.get(key)); 559 assertEquals(1, cache.backingMap.get(key).refCnt()); 560 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 561 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 562 563 // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. 564 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 565 block1Buffer); 566 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 567 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 568 assertEquals(1, cache.backingMap.get(key).refCnt()); 569 570 // Clear and add blockWithoutNextBlockMetadata 571 assertTrue(cache.evictBlock(key)); 572 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 573 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 574 575 assertNull(cache.getBlock(key, false, false, false)); 576 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 577 block2Buffer); 578 579 waitUntilFlushedToBucket(cache, key); 580 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 581 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 582 583 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. 584 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 585 block1Buffer); 586 587 waitUntilFlushedToBucket(cache, key); 588 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 589 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 590 } 591 592 @Test 593 public void testRAMCache() { 594 int size = 100; 595 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 596 byte[] byteArr = new byte[length]; 597 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 598 HFileContext meta = new HFileContextBuilder().build(); 599 600 RAMCache cache = new RAMCache(); 601 BlockCacheKey key1 = new BlockCacheKey("file-1", 1); 602 BlockCacheKey key2 = new BlockCacheKey("file-2", 2); 603 HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 604 HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); 605 HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 606 HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); 607 RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false); 608 RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false); 609 610 assertFalse(cache.containsKey(key1)); 611 assertNull(cache.putIfAbsent(key1, re1)); 612 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 613 614 assertNotNull(cache.putIfAbsent(key1, re2)); 615 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 616 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 617 618 assertNull(cache.putIfAbsent(key2, re2)); 619 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 620 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 621 622 cache.remove(key1); 623 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 624 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 625 626 cache.clear(); 627 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 628 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 629 } 630 631 @Test 632 public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { 633 // initialize an block. 634 int size = 100, offset = 20; 635 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 636 ByteBuffer buf = ByteBuffer.allocate(length); 637 HFileContext meta = new HFileContextBuilder().build(); 638 HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 639 HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP); 640 641 // initialize an mocked ioengine. 642 IOEngine ioEngine = Mockito.mock(IOEngine.class); 643 when(ioEngine.usesSharedMemory()).thenReturn(false); 644 // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong()); 645 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class), 646 Mockito.anyLong()); 647 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class), 648 Mockito.anyLong()); 649 650 // create an bucket allocator. 651 long availableSpace = 1024 * 1024 * 1024L; 652 BucketAllocator allocator = new BucketAllocator(availableSpace, null); 653 654 BlockCacheKey key = new BlockCacheKey("dummy", 1L); 655 RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true); 656 657 Assert.assertEquals(0, allocator.getUsedSize()); 658 try { 659 re.writeToCache(ioEngine, allocator, null, null, 660 ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); 661 Assert.fail(); 662 } catch (Exception e) { 663 } 664 Assert.assertEquals(0, allocator.getUsedSize()); 665 } 666 667 /** 668 * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file 669 * could not be freed even if corresponding {@link HFileBlock} is evicted from 670 * {@link BucketCache}. 671 */ 672 @Test 673 public void testFreeBucketEntryRestoredFromFile() throws Exception { 674 try { 675 final Path dataTestDir = createAndGetTestDir(); 676 677 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 678 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 679 680 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 681 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 682 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 683 assertEquals(0, usedByteSize); 684 685 HFileBlockPair[] hfileBlockPairs = 686 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 687 // Add blocks 688 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 689 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock()); 690 } 691 692 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 693 cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), 694 hfileBlockPair.getBlock()); 695 } 696 usedByteSize = bucketCache.getAllocator().getUsedSize(); 697 assertNotEquals(0, usedByteSize); 698 // persist cache to file 699 bucketCache.shutdown(); 700 assertTrue(new File(persistencePath).exists()); 701 702 // restore cache from file 703 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 704 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 705 assertFalse(new File(persistencePath).exists()); 706 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 707 708 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 709 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 710 bucketCache.evictBlock(blockCacheKey); 711 } 712 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 713 assertEquals(0, bucketCache.backingMap.size()); 714 } finally { 715 HBASE_TESTING_UTILITY.cleanupTestDir(); 716 } 717 } 718 719}