001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME; 022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD; 024import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 025import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR; 026import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR; 027import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME; 028import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME; 029import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME; 030import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME; 031import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME; 032import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME; 033import static org.junit.jupiter.api.Assertions.assertEquals; 034import static org.junit.jupiter.api.Assertions.assertFalse; 035import static org.junit.jupiter.api.Assertions.assertNotEquals; 036import static org.junit.jupiter.api.Assertions.assertNotNull; 037import static org.junit.jupiter.api.Assertions.assertNull; 038import static org.junit.jupiter.api.Assertions.assertTrue; 039import static org.junit.jupiter.api.Assertions.fail; 040import static org.mockito.Mockito.mock; 041import static org.mockito.Mockito.when; 042 043import java.io.File; 044import java.io.IOException; 045import java.lang.reflect.Field; 046import java.nio.ByteBuffer; 047import java.util.ArrayList; 048import java.util.Arrays; 049import java.util.Collection; 050import java.util.HashMap; 051import java.util.List; 052import java.util.Map; 053import java.util.Set; 054import java.util.concurrent.ThreadLocalRandom; 055import java.util.concurrent.atomic.LongAdder; 056import java.util.concurrent.locks.ReentrantReadWriteLock; 057import java.util.stream.Stream; 058import org.apache.hadoop.conf.Configuration; 059import org.apache.hadoop.fs.Path; 060import org.apache.hadoop.hbase.HBaseConfiguration; 061import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 062import org.apache.hadoop.hbase.HBaseTestingUtil; 063import org.apache.hadoop.hbase.HConstants; 064import org.apache.hadoop.hbase.Waiter; 065import org.apache.hadoop.hbase.io.ByteBuffAllocator; 066import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 067import org.apache.hadoop.hbase.io.hfile.BlockPriority; 068import org.apache.hadoop.hbase.io.hfile.BlockType; 069import org.apache.hadoop.hbase.io.hfile.CacheStats; 070import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 071import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 072import org.apache.hadoop.hbase.io.hfile.Cacheable; 073import org.apache.hadoop.hbase.io.hfile.HFileBlock; 074import org.apache.hadoop.hbase.io.hfile.HFileContext; 075import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 076import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; 077import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; 078import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; 079import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 080import org.apache.hadoop.hbase.nio.ByteBuff; 081import org.apache.hadoop.hbase.regionserver.HRegion; 082import org.apache.hadoop.hbase.regionserver.HStore; 083import org.apache.hadoop.hbase.regionserver.HStoreFile; 084import org.apache.hadoop.hbase.testclassification.IOTests; 085import org.apache.hadoop.hbase.testclassification.LargeTests; 086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 087import org.apache.hadoop.hbase.util.Pair; 088import org.apache.hadoop.hbase.util.Threads; 089import org.junit.jupiter.api.AfterEach; 090import org.junit.jupiter.api.BeforeEach; 091import org.junit.jupiter.api.Tag; 092import org.junit.jupiter.api.TestTemplate; 093import org.junit.jupiter.params.provider.Arguments; 094import org.mockito.Mockito; 095import org.slf4j.Logger; 096import org.slf4j.LoggerFactory; 097 098import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 099 100/** 101 * Basic test of BucketCache.Puts and gets. 102 * <p> 103 * Tests will ensure that blocks' data correctness under several threads concurrency 104 */ 105@Tag(IOTests.TAG) 106@Tag(LargeTests.TAG) 107@HBaseParameterizedTestTemplate(name = "{index}: blockSize={0}, bucketSizes={1}") 108public class TestBucketCache { 109 110 private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class); 111 112 public static Stream<Arguments> parameters() { 113 // TODO: why is 8k the default blocksize for these tests? 114 return Stream.of(Arguments.of(8192, null), 115 Arguments.of(16 * 1024, 116 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 117 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 118 128 * 1024 + 1024 })); 119 } 120 121 private final int constructedBlockSize; 122 private final int[] constructedBlockSizes; 123 124 public TestBucketCache(int constructedBlockSize, int[] constructedBlockSizes) { 125 this.constructedBlockSize = constructedBlockSize; 126 this.constructedBlockSizes = constructedBlockSizes; 127 } 128 129 BucketCache cache; 130 final int CACHE_SIZE = 1000000; 131 final int NUM_BLOCKS = 100; 132 final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; 133 final int NUM_THREADS = 100; 134 final int NUM_QUERIES = 10000; 135 136 final long capacitySize = 32 * 1024 * 1024; 137 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 138 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 139 private String ioEngineName = "offheap"; 140 141 private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil(); 142 143 private static class MockedBucketCache extends BucketCache { 144 145 public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 146 int writerThreads, int writerQLen, String persistencePath) throws IOException { 147 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, 148 persistencePath); 149 } 150 151 @Override 152 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 153 super.cacheBlock(cacheKey, buf, inMemory); 154 } 155 156 @Override 157 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 158 super.cacheBlock(cacheKey, buf); 159 } 160 } 161 162 @BeforeEach 163 public void setup() throws IOException { 164 cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, 165 constructedBlockSizes, writeThreads, writerQLen, null); 166 } 167 168 @AfterEach 169 public void tearDown() { 170 cache.shutdown(); 171 } 172 173 /** 174 * Test Utility to create test dir and return name 175 * @return return name of created dir 176 * @throws IOException throws IOException 177 */ 178 private Path createAndGetTestDir() throws IOException { 179 final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir(); 180 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir); 181 return testDir; 182 } 183 184 /** 185 * Return a random element from {@code a}. 186 */ 187 private static <T> T randFrom(List<T> a) { 188 return a.get(ThreadLocalRandom.current().nextInt(a.size())); 189 } 190 191 @TestTemplate 192 public void testBucketAllocator() throws BucketAllocatorException { 193 BucketAllocator mAllocator = cache.getAllocator(); 194 /* 195 * Test the allocator first 196 */ 197 final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024); 198 199 boolean full = false; 200 ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>(); 201 // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until 202 // the cache is completely filled. 203 List<Integer> tmp = new ArrayList<>(BLOCKSIZES); 204 while (!full) { 205 Integer blockSize = null; 206 try { 207 blockSize = randFrom(tmp); 208 allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize)); 209 } catch (CacheFullException cfe) { 210 tmp.remove(blockSize); 211 if (tmp.isEmpty()) full = true; 212 } 213 } 214 215 for (Integer blockSize : BLOCKSIZES) { 216 BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize); 217 IndexStatistics indexStatistics = bucketSizeInfo.statistics(); 218 assertEquals(0, indexStatistics.freeCount(), "unexpected freeCount for " + bucketSizeInfo); 219 220 // we know the block sizes above are multiples of 1024, but default bucket sizes give an 221 // additional 1024 on top of that so this counts towards fragmentation in our test 222 // real life may have worse fragmentation because blocks may not be perfectly sized to block 223 // size, given encoding/compression and large rows 224 assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes()); 225 } 226 227 mAllocator.logDebugStatistics(); 228 229 for (Pair<Long, Integer> allocation : allocations) { 230 assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()), 231 mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond())); 232 } 233 assertEquals(0, mAllocator.getUsedSize()); 234 } 235 236 @TestTemplate 237 public void testCacheSimple() throws Exception { 238 CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); 239 } 240 241 @TestTemplate 242 public void testCacheMultiThreadedSingleKey() throws Exception { 243 CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES); 244 } 245 246 @TestTemplate 247 public void testHeapSizeChanges() throws Exception { 248 cache.stopWriterThreads(); 249 CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); 250 } 251 252 public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 253 throws InterruptedException { 254 Waiter.waitFor(HBaseConfiguration.create(), 10000, 255 () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); 256 } 257 258 public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { 259 while (!cache.ramCache.isEmpty()) { 260 Thread.sleep(100); 261 } 262 Thread.sleep(1000); 263 } 264 265 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 266 // threads will flush it to the bucket and put reference entry in backingMap. 267 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 268 Cacheable block, boolean waitWhenCache) throws InterruptedException { 269 cache.cacheBlock(cacheKey, block, false, waitWhenCache); 270 waitUntilFlushedToBucket(cache, cacheKey); 271 } 272 273 @TestTemplate 274 public void testMemoryLeak() throws Exception { 275 final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L); 276 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 277 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 278 long lockId = cache.backingMap.get(cacheKey).offset(); 279 ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId); 280 lock.writeLock().lock(); 281 Thread evictThread = new Thread("evict-block") { 282 @Override 283 public void run() { 284 cache.evictBlock(cacheKey); 285 } 286 }; 287 evictThread.start(); 288 cache.offsetLock.waitForWaiters(lockId, 1); 289 cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true); 290 assertEquals(0, cache.getBlockCount()); 291 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 292 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 293 assertEquals(1, cache.getBlockCount()); 294 lock.writeLock().unlock(); 295 evictThread.join(); 296 /** 297 * <pre> 298 * The asserts here before HBASE-21957 are: 299 * assertEquals(1L, cache.getBlockCount()); 300 * assertTrue(cache.getCurrentSize() > 0L); 301 * assertTrue("We should have a block!", cache.iterator().hasNext()); 302 * 303 * The asserts here after HBASE-21957 are: 304 * assertEquals(0, cache.getBlockCount()); 305 * assertEquals(cache.getCurrentSize(), 0L); 306 * 307 * I think the asserts before HBASE-21957 is more reasonable,because 308 * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry} 309 * it had seen, and newly added Block after the {@link BucketEntry} 310 * it had seen should not be evicted. 311 * </pre> 312 */ 313 assertEquals(1L, cache.getBlockCount()); 314 assertTrue(cache.getCurrentSize() > 0L); 315 assertTrue(cache.iterator().hasNext(), "We should have a block!"); 316 } 317 318 @TestTemplate 319 public void testRetrieveFromFile() throws Exception { 320 Path testDir = createAndGetTestDir(); 321 String ioEngineName = "file:" + testDir + "/bucket.cache"; 322 testRetrievalUtils(testDir, ioEngineName); 323 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 324 String persistencePath = testDir + "/bucket.persistence"; 325 BucketCache bucketCache = null; 326 try { 327 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 328 smallBucketSizes, writeThreads, writerQLen, persistencePath); 329 assertTrue(bucketCache.waitForCacheInitialization(10000)); 330 assertFalse(new File(persistencePath).exists()); 331 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 332 assertEquals(0, bucketCache.backingMap.size()); 333 } finally { 334 bucketCache.shutdown(); 335 HBASE_TESTING_UTILITY.cleanupTestDir(); 336 } 337 } 338 339 @TestTemplate 340 public void testRetrieveFromMMap() throws Exception { 341 final Path testDir = createAndGetTestDir(); 342 final String ioEngineName = "mmap:" + testDir + "/bucket.cache"; 343 testRetrievalUtils(testDir, ioEngineName); 344 } 345 346 @TestTemplate 347 public void testRetrieveFromPMem() throws Exception { 348 final Path testDir = createAndGetTestDir(); 349 final String ioEngineName = "pmem:" + testDir + "/bucket.cache"; 350 testRetrievalUtils(testDir, ioEngineName); 351 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 352 String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 353 BucketCache bucketCache = null; 354 try { 355 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 356 smallBucketSizes, writeThreads, writerQLen, persistencePath); 357 assertTrue(bucketCache.waitForCacheInitialization(10000)); 358 assertFalse(new File(persistencePath).exists()); 359 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 360 assertEquals(0, bucketCache.backingMap.size()); 361 } finally { 362 bucketCache.shutdown(); 363 HBASE_TESTING_UTILITY.cleanupTestDir(); 364 } 365 } 366 367 private void testRetrievalUtils(Path testDir, String ioEngineName) 368 throws IOException, InterruptedException { 369 final String persistencePath = 370 testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 371 BucketCache bucketCache = null; 372 try { 373 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 374 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 375 assertTrue(bucketCache.waitForCacheInitialization(10000)); 376 long usedSize = bucketCache.getAllocator().getUsedSize(); 377 assertEquals(0, usedSize); 378 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 379 for (HFileBlockPair block : blocks) { 380 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 381 } 382 for (HFileBlockPair block : blocks) { 383 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 384 false); 385 } 386 usedSize = bucketCache.getAllocator().getUsedSize(); 387 assertNotEquals(0, usedSize); 388 bucketCache.shutdown(); 389 assertTrue(new File(persistencePath).exists()); 390 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 391 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 392 assertTrue(bucketCache.waitForCacheInitialization(10000)); 393 394 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 395 } finally { 396 if (bucketCache != null) { 397 bucketCache.shutdown(); 398 } 399 } 400 assertTrue(new File(persistencePath).exists()); 401 } 402 403 @TestTemplate 404 public void testRetrieveUnsupportedIOE() throws Exception { 405 try { 406 final Path testDir = createAndGetTestDir(); 407 final String ioEngineName = testDir + "/bucket.cache"; 408 testRetrievalUtils(testDir, ioEngineName); 409 fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!"); 410 } catch (IllegalArgumentException e) { 411 assertEquals("Don't understand io engine name for cache- prefix with file:, " 412 + "files:, mmap: or offheap", e.getMessage()); 413 } 414 } 415 416 @TestTemplate 417 public void testRetrieveFromMultipleFiles() throws Exception { 418 final Path testDirInitial = createAndGetTestDir(); 419 final Path newTestDir = new HBaseTestingUtil().getDataTestDir(); 420 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir); 421 String ioEngineName = 422 new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache") 423 .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString(); 424 testRetrievalUtils(testDirInitial, ioEngineName); 425 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 426 String persistencePath = testDirInitial + "/bucket.persistence"; 427 BucketCache bucketCache = null; 428 try { 429 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 430 smallBucketSizes, writeThreads, writerQLen, persistencePath); 431 assertTrue(bucketCache.waitForCacheInitialization(10000)); 432 assertFalse(new File(persistencePath).exists()); 433 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 434 assertEquals(0, bucketCache.backingMap.size()); 435 } finally { 436 bucketCache.shutdown(); 437 HBASE_TESTING_UTILITY.cleanupTestDir(); 438 } 439 } 440 441 @TestTemplate 442 public void testRetrieveFromFileWithoutPersistence() throws Exception { 443 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 444 constructedBlockSizes, writeThreads, writerQLen, null); 445 assertTrue(bucketCache.waitForCacheInitialization(10000)); 446 try { 447 final Path testDir = createAndGetTestDir(); 448 String ioEngineName = "file:" + testDir + "/bucket.cache"; 449 long usedSize = bucketCache.getAllocator().getUsedSize(); 450 assertEquals(0, usedSize); 451 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 452 for (HFileBlockPair block : blocks) { 453 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 454 } 455 for (HFileBlockPair block : blocks) { 456 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 457 false); 458 } 459 usedSize = bucketCache.getAllocator().getUsedSize(); 460 assertNotEquals(0, usedSize); 461 bucketCache.shutdown(); 462 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 463 constructedBlockSizes, writeThreads, writerQLen, null); 464 assertTrue(bucketCache.waitForCacheInitialization(10000)); 465 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 466 } finally { 467 bucketCache.shutdown(); 468 HBASE_TESTING_UTILITY.cleanupTestDir(); 469 } 470 } 471 472 @TestTemplate 473 public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException { 474 long availableSpace = 20 * 1024L * 1024 * 1024; 475 int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 }; 476 BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes); 477 assertTrue(allocator.getBuckets().length > 0); 478 } 479 480 @TestTemplate 481 public void testGetPartitionSize() throws IOException { 482 // Test default values 483 validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR); 484 485 Configuration conf = HBaseConfiguration.create(); 486 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 487 conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f); 488 conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f); 489 conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 490 491 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 492 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 493 assertTrue(cache.waitForCacheInitialization(10000)); 494 495 validateGetPartitionSize(cache, 0.1f, 0.5f); 496 validateGetPartitionSize(cache, 0.7f, 0.5f); 497 validateGetPartitionSize(cache, 0.2f, 0.5f); 498 } 499 500 @TestTemplate 501 public void testCacheSizeCapacity() throws IOException { 502 // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE 503 validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR); 504 Configuration conf = HBaseConfiguration.create(); 505 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 506 conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f); 507 conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f); 508 conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 509 try { 510 new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads, 511 writerQLen, null, 100, conf); 512 fail("Should have thrown IllegalArgumentException because of large cache capacity!"); 513 } catch (IllegalArgumentException e) { 514 assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage()); 515 } 516 } 517 518 @TestTemplate 519 public void testValidBucketCacheConfigs() throws IOException { 520 Configuration conf = HBaseConfiguration.create(); 521 conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 522 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 523 conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f); 524 conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f); 525 conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f); 526 conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 527 528 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 529 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 530 assertTrue(cache.waitForCacheInitialization(10000)); 531 532 assertEquals(0.9f, cache.getAcceptableFactor(), 0, 533 ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate."); 534 assertEquals(0.5f, cache.getMinFactor(), 0, MIN_FACTOR_CONFIG_NAME + " failed to propagate."); 535 assertEquals(0.5f, cache.getExtraFreeFactor(), 0, 536 EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate."); 537 assertEquals(0.1f, cache.getSingleFactor(), 0, 538 SINGLE_FACTOR_CONFIG_NAME + " failed to propagate."); 539 assertEquals(0.7f, cache.getMultiFactor(), 0, 540 MULTI_FACTOR_CONFIG_NAME + " failed to propagate."); 541 assertEquals(0.2f, cache.getMemoryFactor(), 0, 542 MEMORY_FACTOR_CONFIG_NAME + " failed to propagate."); 543 } 544 545 @TestTemplate 546 public void testInvalidAcceptFactorConfig() throws IOException { 547 float[] configValues = { -1f, 0.2f, 0.86f, 1.05f }; 548 boolean[] expectedOutcomes = { false, false, true, false }; 549 Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues); 550 Configuration conf = HBaseConfiguration.create(); 551 checkConfigValues(conf, configMappings, expectedOutcomes); 552 } 553 554 @TestTemplate 555 public void testInvalidMinFactorConfig() throws IOException { 556 float[] configValues = { -1f, 0f, 0.96f, 1.05f }; 557 // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0 558 boolean[] expectedOutcomes = { false, true, false, false }; 559 Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues); 560 Configuration conf = HBaseConfiguration.create(); 561 checkConfigValues(conf, configMappings, expectedOutcomes); 562 } 563 564 @TestTemplate 565 public void testInvalidExtraFreeFactorConfig() throws IOException { 566 float[] configValues = { -1f, 0f, 0.2f, 1.05f }; 567 // throws due to <0, in expected range, in expected range, config can be > 1.0 568 boolean[] expectedOutcomes = { false, true, true, true }; 569 Map<String, float[]> configMappings = 570 ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues); 571 Configuration conf = HBaseConfiguration.create(); 572 checkConfigValues(conf, configMappings, expectedOutcomes); 573 } 574 575 @TestTemplate 576 public void testInvalidCacheSplitFactorConfig() throws IOException { 577 float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f }; 578 float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f }; 579 float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f }; 580 // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't 581 // be negative, configs don't add to 1.0 582 boolean[] expectedOutcomes = { true, false, false, false }; 583 Map<String, 584 float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues, 585 MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME, 586 memoryFactorConfigValues); 587 Configuration conf = HBaseConfiguration.create(); 588 checkConfigValues(conf, configMappings, expectedOutcomes); 589 } 590 591 private void checkConfigValues(Configuration conf, Map<String, float[]> configMap, 592 boolean[] expectSuccess) throws IOException { 593 Set<String> configNames = configMap.keySet(); 594 for (int i = 0; i < expectSuccess.length; i++) { 595 try { 596 for (String configName : configNames) { 597 conf.setFloat(configName, configMap.get(configName)[i]); 598 } 599 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 600 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 601 assertTrue(cache.waitForCacheInitialization(10000)); 602 assertTrue(expectSuccess[i], "Created BucketCache and expected it to succeed: " 603 + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i]); 604 } catch (IllegalArgumentException e) { 605 assertFalse(expectSuccess[i], "Created BucketCache and expected it to succeed: " 606 + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i]); 607 } 608 } 609 } 610 611 private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor, 612 float minFactor) { 613 long expectedOutput = 614 (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor); 615 assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor)); 616 } 617 618 @TestTemplate 619 public void testOffsetProducesPositiveOutput() { 620 // This number is picked because it produces negative output if the values isn't ensured to be 621 // positive. See HBASE-18757 for more information. 622 long testValue = 549888460800L; 623 BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> { 624 return ByteBuffAllocator.NONE; 625 }, ByteBuffAllocator.HEAP); 626 assertEquals(testValue, bucketEntry.offset()); 627 } 628 629 @TestTemplate 630 public void testEvictionCount() throws InterruptedException { 631 int size = 100; 632 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 633 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 634 HFileContext meta = new HFileContextBuilder().build(); 635 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 636 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 637 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 638 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 639 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 640 641 BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0); 642 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 643 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 644 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 645 blockWithNextBlockMetadata.serialize(block1Buffer, true); 646 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 647 648 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 649 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 650 block1Buffer); 651 652 waitUntilFlushedToBucket(cache, key); 653 654 assertEquals(0, cache.getStats().getEvictionCount()); 655 656 // evict call should return 1, but then eviction count be 0 657 assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount")); 658 assertEquals(0, cache.getStats().getEvictionCount()); 659 660 // add back 661 key = new BlockCacheKey("testEvictionCount", 0); 662 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 663 block1Buffer); 664 waitUntilFlushedToBucket(cache, key); 665 666 // should not increment 667 assertTrue(cache.evictBlock(key)); 668 assertEquals(0, cache.getStats().getEvictionCount()); 669 670 // add back 671 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 672 block1Buffer); 673 waitUntilFlushedToBucket(cache, key); 674 675 // should finally increment eviction count 676 cache.freeSpace("testing"); 677 assertEquals(1, cache.getStats().getEvictionCount()); 678 } 679 680 @TestTemplate 681 public void testCacheBlockNextBlockMetadataMissing() throws Exception { 682 int size = 100; 683 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 684 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 685 HFileContext meta = new HFileContextBuilder().build(); 686 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 687 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 688 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 689 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 690 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 691 692 BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0); 693 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 694 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 695 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 696 blockWithNextBlockMetadata.serialize(block1Buffer, true); 697 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 698 699 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 700 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 701 block1Buffer); 702 703 waitUntilFlushedToBucket(cache, key); 704 assertNotNull(cache.backingMap.get(key)); 705 assertEquals(1, cache.backingMap.get(key).refCnt()); 706 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 707 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 708 709 // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. 710 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 711 block1Buffer); 712 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 713 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 714 assertEquals(1, cache.backingMap.get(key).refCnt()); 715 716 // Clear and add blockWithoutNextBlockMetadata 717 assertTrue(cache.evictBlock(key)); 718 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 719 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 720 721 assertNull(cache.getBlock(key, false, false, false)); 722 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 723 block2Buffer); 724 725 waitUntilFlushedToBucket(cache, key); 726 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 727 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 728 729 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. 730 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 731 block1Buffer); 732 733 waitUntilFlushedToBucket(cache, key); 734 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 735 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 736 } 737 738 @TestTemplate 739 public void testRAMCache() { 740 int size = 100; 741 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 742 byte[] byteArr = new byte[length]; 743 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 744 HFileContext meta = new HFileContextBuilder().build(); 745 746 RAMCache cache = new RAMCache(); 747 BlockCacheKey key1 = new BlockCacheKey("file-1", 1); 748 BlockCacheKey key2 = new BlockCacheKey("file-2", 2); 749 HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 750 HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); 751 HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 752 HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); 753 RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false); 754 RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false); 755 756 assertFalse(cache.containsKey(key1)); 757 assertNull(cache.putIfAbsent(key1, re1)); 758 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 759 760 assertNotNull(cache.putIfAbsent(key1, re2)); 761 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 762 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 763 764 assertNull(cache.putIfAbsent(key2, re2)); 765 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 766 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 767 768 cache.remove(key1); 769 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 770 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 771 772 cache.clear(); 773 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 774 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 775 } 776 777 @TestTemplate 778 public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { 779 // initialize an block. 780 int size = 100, offset = 20; 781 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 782 ByteBuffer buf = ByteBuffer.allocate(length); 783 HFileContext meta = new HFileContextBuilder().build(); 784 HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 785 HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP); 786 787 // initialize an mocked ioengine. 788 IOEngine ioEngine = Mockito.mock(IOEngine.class); 789 when(ioEngine.usesSharedMemory()).thenReturn(false); 790 // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong()); 791 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class), 792 Mockito.anyLong()); 793 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class), 794 Mockito.anyLong()); 795 796 // create an bucket allocator. 797 long availableSpace = 1024 * 1024 * 1024L; 798 BucketAllocator allocator = new BucketAllocator(availableSpace, null); 799 800 BlockCacheKey key = new BlockCacheKey("dummy", 1L); 801 RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false); 802 803 assertEquals(0, allocator.getUsedSize()); 804 try { 805 re.writeToCache(ioEngine, allocator, null, null, 806 ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE); 807 fail(); 808 } catch (Exception e) { 809 } 810 assertEquals(0, allocator.getUsedSize()); 811 } 812 813 /** 814 * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file 815 * could not be freed even if corresponding {@link HFileBlock} is evicted from 816 * {@link BucketCache}. 817 */ 818 @TestTemplate 819 public void testFreeBucketEntryRestoredFromFile() throws Exception { 820 BucketCache bucketCache = null; 821 try { 822 final Path dataTestDir = createAndGetTestDir(); 823 824 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 825 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 826 827 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 828 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 829 assertTrue(bucketCache.waitForCacheInitialization(10000)); 830 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 831 assertEquals(0, usedByteSize); 832 833 HFileBlockPair[] hfileBlockPairs = 834 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 835 // Add blocks 836 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 837 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock()); 838 } 839 840 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 841 cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), 842 hfileBlockPair.getBlock(), false); 843 } 844 usedByteSize = bucketCache.getAllocator().getUsedSize(); 845 assertNotEquals(0, usedByteSize); 846 // persist cache to file 847 bucketCache.shutdown(); 848 assertTrue(new File(persistencePath).exists()); 849 850 // restore cache from file 851 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 852 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 853 assertTrue(bucketCache.waitForCacheInitialization(10000)); 854 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 855 856 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 857 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 858 bucketCache.evictBlock(blockCacheKey); 859 } 860 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 861 assertEquals(0, bucketCache.backingMap.size()); 862 } finally { 863 bucketCache.shutdown(); 864 HBASE_TESTING_UTILITY.cleanupTestDir(); 865 } 866 } 867 868 @TestTemplate 869 public void testBlockAdditionWaitWhenCache() throws Exception { 870 BucketCache bucketCache = null; 871 try { 872 final Path dataTestDir = createAndGetTestDir(); 873 874 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 875 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 876 877 Configuration config = HBASE_TESTING_UTILITY.getConfiguration(); 878 config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000); 879 880 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 881 constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config); 882 assertTrue(bucketCache.waitForCacheInitialization(10000)); 883 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 884 assertEquals(0, usedByteSize); 885 886 HFileBlockPair[] hfileBlockPairs = 887 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10); 888 String[] names = CacheTestUtils.getHFileNames(hfileBlockPairs); 889 // Add blocks 890 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 891 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, 892 true); 893 } 894 895 // Max wait for 10 seconds. 896 long timeout = 10000; 897 // Wait for blocks size to match the number of blocks. 898 while (bucketCache.backingMap.size() != 10) { 899 if (timeout <= 0) break; 900 Threads.sleep(100); 901 timeout -= 100; 902 } 903 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 904 assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName())); 905 } 906 usedByteSize = bucketCache.getAllocator().getUsedSize(); 907 assertNotEquals(0, usedByteSize); 908 // persist cache to file 909 bucketCache.shutdown(); 910 assertTrue(new File(persistencePath).exists()); 911 912 // restore cache from file 913 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 914 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 915 assertTrue(bucketCache.waitForCacheInitialization(10000)); 916 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 917 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(hfileBlockPairs, names); 918 for (BlockCacheKey key : newKeys) { 919 bucketCache.evictBlock(key); 920 } 921 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 922 assertEquals(0, bucketCache.backingMap.size()); 923 } finally { 924 if (bucketCache != null) { 925 bucketCache.shutdown(); 926 } 927 HBASE_TESTING_UTILITY.cleanupTestDir(); 928 } 929 } 930 931 @TestTemplate 932 public void testOnConfigurationChange() throws Exception { 933 BucketCache bucketCache = null; 934 try { 935 final Path dataTestDir = createAndGetTestDir(); 936 937 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 938 939 Configuration config = HBASE_TESTING_UTILITY.getConfiguration(); 940 941 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 942 constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config); 943 944 assertTrue(bucketCache.waitForCacheInitialization(10000)); 945 946 config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 947 config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f); 948 config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f); 949 config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f); 950 config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f); 951 config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 952 config.setLong(QUEUE_ADDITION_WAIT_TIME, 100); 953 config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500); 954 config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000); 955 956 bucketCache.onConfigurationChange(config); 957 958 assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01); 959 assertEquals(0.8f, bucketCache.getMinFactor(), 0.01); 960 assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01); 961 assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01); 962 assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01); 963 assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01); 964 assertEquals(100L, bucketCache.getQueueAdditionWaitTime()); 965 assertEquals(500L, bucketCache.getBucketcachePersistInterval()); 966 assertEquals(1000L, bucketCache.getPersistenceChunkSize()); 967 968 } finally { 969 if (bucketCache != null) { 970 bucketCache.shutdown(); 971 } 972 HBASE_TESTING_UTILITY.cleanupTestDir(); 973 } 974 } 975 976 @TestTemplate 977 public void testNotifyFileCachingCompletedSuccess() throws Exception { 978 BucketCache bucketCache = null; 979 try { 980 Path filePath = 981 new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess"); 982 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false); 983 if (bucketCache.getStats().getFailedInserts() > 0) { 984 LOG.info("There were {} fail inserts, " 985 + "will assert if total blocks in backingMap equals (10 - failInserts) " 986 + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts()); 987 assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size()); 988 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 989 } else { 990 assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 991 } 992 } finally { 993 if (bucketCache != null) { 994 bucketCache.shutdown(); 995 } 996 HBASE_TESTING_UTILITY.cleanupTestDir(); 997 } 998 } 999 1000 @TestTemplate 1001 public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception { 1002 BucketCache bucketCache = null; 1003 try { 1004 Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), 1005 "testNotifyFileCachingCompletedForEncodedDataSuccess"); 1006 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true); 1007 if (bucketCache.getStats().getFailedInserts() > 0) { 1008 LOG.info("There were {} fail inserts, " 1009 + "will assert if total blocks in backingMap equals (10 - failInserts) " 1010 + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts()); 1011 assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size()); 1012 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1013 } else { 1014 assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1015 } 1016 } finally { 1017 if (bucketCache != null) { 1018 bucketCache.shutdown(); 1019 } 1020 HBASE_TESTING_UTILITY.cleanupTestDir(); 1021 } 1022 } 1023 1024 @TestTemplate 1025 public void testNotifyFileCachingCompletedNotAllCached() throws Exception { 1026 BucketCache bucketCache = null; 1027 try { 1028 Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), 1029 "testNotifyFileCachingCompletedNotAllCached"); 1030 // Deliberately passing more blocks than we have created to test that 1031 // notifyFileCachingCompleted will not consider the file fully cached 1032 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false); 1033 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1034 } finally { 1035 if (bucketCache != null) { 1036 bucketCache.shutdown(); 1037 } 1038 HBASE_TESTING_UTILITY.cleanupTestDir(); 1039 } 1040 } 1041 1042 private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath, 1043 int totalBlocksToCheck, boolean encoded) throws Exception { 1044 final Path dataTestDir = createAndGetTestDir(); 1045 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 1046 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 1047 constructedBlockSizes, 1, 1, null); 1048 assertTrue(bucketCache.waitForCacheInitialization(10000)); 1049 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 1050 assertEquals(0, usedByteSize); 1051 HFileBlockPair[] hfileBlockPairs = 1052 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded); 1053 // Add blocks 1054 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 1055 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true); 1056 } 1057 bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck, 1058 totalBlocksToCheck * constructedBlockSize); 1059 return bucketCache; 1060 } 1061 1062 @TestTemplate 1063 public void testEvictOrphansOutOfGracePeriod() throws Exception { 1064 BucketCache bucketCache = testEvictOrphans(0); 1065 assertEquals(10, bucketCache.getBackingMap().size()); 1066 assertEquals(0, bucketCache.blocksByHFile.stream() 1067 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count()); 1068 } 1069 1070 @TestTemplate 1071 public void testEvictOrphansWithinGracePeriod() throws Exception { 1072 BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L); 1073 assertEquals(18, bucketCache.getBackingMap().size()); 1074 assertTrue(bucketCache.blocksByHFile.stream() 1075 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0); 1076 } 1077 1078 private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception { 1079 Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid"); 1080 Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan"); 1081 Map<String, HRegion> onlineRegions = new HashMap<>(); 1082 List<HStore> stores = new ArrayList<>(); 1083 Collection<HStoreFile> storeFiles = new ArrayList<>(); 1084 HRegion mockedRegion = mock(HRegion.class); 1085 HStore mockedStore = mock(HStore.class); 1086 HStoreFile mockedStoreFile = mock(HStoreFile.class); 1087 when(mockedStoreFile.getPath()).thenReturn(validFile); 1088 storeFiles.add(mockedStoreFile); 1089 when(mockedStore.getStorefiles()).thenReturn(storeFiles); 1090 stores.add(mockedStore); 1091 when(mockedRegion.getStores()).thenReturn(stores); 1092 onlineRegions.put("mocked_region", mockedRegion); 1093 HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99); 1094 HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1); 1095 HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01); 1096 HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD, 1097 orphanEvictionGracePeriod); 1098 BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21, 1099 constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000, 1100 HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions); 1101 HFileBlockPair[] validBlockPairs = 1102 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile, false); 1103 HFileBlockPair[] orphanBlockPairs = 1104 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile, false); 1105 for (HFileBlockPair pair : validBlockPairs) { 1106 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1107 } 1108 waitUntilAllFlushedToBucket(bucketCache); 1109 assertEquals(10, bucketCache.getBackingMap().size()); 1110 bucketCache.freeSpace("test"); 1111 assertEquals(10, bucketCache.getBackingMap().size()); 1112 for (HFileBlockPair pair : orphanBlockPairs) { 1113 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1114 } 1115 waitUntilAllFlushedToBucket(bucketCache); 1116 assertEquals(20, bucketCache.getBackingMap().size()); 1117 bucketCache.freeSpace("test"); 1118 return bucketCache; 1119 } 1120 1121 @TestTemplate 1122 public void testBlockPriority() throws Exception { 1123 HFileBlockPair block = CacheTestUtils.generateHFileBlocks(BLOCK_SIZE, 1)[0]; 1124 cacheAndWaitUntilFlushedToBucket(cache, block.getBlockName(), block.getBlock(), true); 1125 assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.SINGLE); 1126 cache.getBlock(block.getBlockName(), true, false, true); 1127 assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.MULTI); 1128 } 1129 1130 @TestTemplate 1131 public void testIOTimePerHitReturnsZeroWhenNoHits() 1132 throws NoSuchFieldException, IllegalAccessException { 1133 CacheStats cacheStats = cache.getStats(); 1134 assertTrue(cacheStats instanceof BucketCacheStats); 1135 BucketCacheStats bucketCacheStats = (BucketCacheStats) cacheStats; 1136 1137 Field field = BucketCacheStats.class.getDeclaredField("ioHitCount"); 1138 field.setAccessible(true); 1139 LongAdder ioHitCount = (LongAdder) field.get(bucketCacheStats); 1140 1141 assertEquals(0, ioHitCount.sum()); 1142 double ioTimePerHit = bucketCacheStats.getIOTimePerHit(); 1143 assertEquals(0, ioTimePerHit, 0.0); 1144 } 1145}