/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Basic test of BucketCache. Puts and gets.
 * <p>
 * Tests ensure the correctness of block data under concurrent access by several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCache.class);

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
                                                          // for these tests?
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Test utility to create the test dir and return its name.
   * @return the name of the created dir
   * @throws IOException throws IOException
   */
  private Path createAndGetTestDir() throws IOException {
    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
    return testDir;
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
  }

  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize. Continue selecting blocks until
    // the cache is completely filled.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());

      // We know the block sizes above are multiples of 1024, but the default bucket sizes give an
      // additional 1024 on top of that, so this counts towards fragmentation in our test.
      // Real life may have worse fragmentation because blocks may not be perfectly sized to the
      // block size, given encoding/compression and large rows.
      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
    }

    mAllocator.logDebugStatistics();

    for (Pair<Long, Integer> allocation : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }
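  /*
   * Worked example for the fragmentation assertion above (an editorial sketch, not part of the
   * original test logic): the requested sizes (4K, 8K, 64K, 96K) are exact multiples of 1024,
   * while the matching default bucket item sizes are each 1024 bytes larger, e.g. a 4 * 1024
   * request is served from a 4 * 1024 + 1024 slot. Every allocation therefore strands exactly
   * 1024 bytes, so a size class with totalCount() allocations reports fragmentationBytes() equal
   * to 1024 * totalCount().
   */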

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 10000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  // BucketCache.cacheBlock is async: it first adds the block to ramCache and the writeQueue;
  // writer threads then flush it to the bucket and put a reference entry in backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block, boolean waitWhenCache) throws InterruptedException {
    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 were:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 were:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 are more reasonable, because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it has seen, and a block newly added after the {@link BucketEntry}
     * it has seen should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    Path testDir = createAndGetTestDir();
    String ioEngineName = "file:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromMMap() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
  }

  @Test
  public void testRetrieveFromPMem() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }
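  /*
   * Note on the persist/restore cycle exercised by testRetrievalUtils below (added commentary,
   * assuming a file-backed IOEngine): shutdown() writes the backing map out to persistencePath,
   * and a new BucketCache constructed with the same engine and persistence path restores the
   * cached blocks, which is why the allocator's used size is expected to survive the restart.
   */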
  private void testRetrievalUtils(Path testDir, String ioEngineName)
    throws IOException, InterruptedException {
    final String persistencePath =
      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    assertTrue(new File(persistencePath).exists());
  }

  @Test
  public void testRetrieveUnsupportedIOE() throws Exception {
    try {
      final Path testDir = createAndGetTestDir();
      final String ioEngineName = testDir + "/bucket.cache";
      testRetrievalUtils(testDir, ioEngineName);
      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
        + "files:, mmap: or offheap", e.getMessage());
    }
  }

  @Test
  public void testRetrieveFromMultipleFiles() throws Exception {
    final Path testDirInitial = createAndGetTestDir();
    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
    String ioEngineName =
      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
    testRetrievalUtils(testDirInitial, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDirInitial + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    try {
      final Path testDir = createAndGetTestDir();
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }
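  /*
   * Added note: the test above is the negative counterpart of testRetrievalUtils. With a null
   * persistencePath there is no backing-map snapshot to recover on restart, so even though the
   * file engine still holds the previously written bytes, the new cache instance cannot interpret
   * them and starts out empty (used size 0).
   */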

  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testCacheSizeCapacity() throws IOException {
    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
    try {
      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
        writerQLen, null, 100, conf);
      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
    }
  }

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
      cache.getAcceptableFactor(), 0);
    assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
    assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getExtraFreeFactor(), 0);
    assertEquals(SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, cache.getSingleFactor(),
      0);
    assertEquals(MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, cache.getMultiFactor(),
      0);
    assertEquals(MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, cache.getMemoryFactor(),
      0);
  }
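  /*
   * The next four tests probe factor validation. Summarizing the expectations encoded in the
   * test vectors (added commentary, not an exhaustive statement of the validation code): factors
   * must be non-negative, minFactor must not exceed acceptableFactor or 1.0, extraFreeFactor may
   * exceed 1.0, and the single/multi/memory split factors must add up to 1.0.
   */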
  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
    // throws due to < 0, 0.2 invalid (likely because it falls below the default min factor),
    // 0.86 in expected range, > 1.0
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to < 0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to < 0, in expected range, in expected range, config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // All configs add up to 1.0 and are between 0 and 1.0; configs don't add to 1.0; configs
    // can't be negative; configs don't add to 1.0
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String,
      float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
        MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME,
        memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
    boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
        assertTrue(cache.waitForCacheInitialization(10000));
        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      }
    }
  }

  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
    float minFactor) {
    long expectedOutput =
      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }
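  /*
   * Added sketch of the arithmetic behind the test below, assuming the HBASE-18757 layout where a
   * BucketEntry packs its 256-byte-aligned offset into a 4-byte int plus one extra byte:
   * 549888460800L >> 8 == 2148001800, which exceeds Integer.MAX_VALUE (2147483647), so the stored
   * int is negative and offset() must mask it back to an unsigned value; a sign-extending
   * reconstruction would return a negative offset.
   */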
  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the value isn't ensured to be
    // positive. See HBASE-18757 for more information.
    long testValue = 549888460800L;
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
      return ByteBuffAllocator.NONE;
    }, ByteBuffAllocator.HEAP);
    assertEquals(testValue, bucketEntry.offset());
  }

  @Test
  public void testEvictionCount() throws InterruptedException {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);

    assertEquals(0, cache.getStats().getEvictionCount());

    // the evict call should return 1, but the eviction count should still be 0
    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    key = new BlockCacheKey("testEvictionCount", 0);
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should not increment
    assertTrue(cache.evictBlock(key));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should finally increment the eviction count
    cache.freeSpace("testing");
    assertEquals(1, cache.getStats().getEvictionCount());
  }

  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }
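  /*
   * Added note on the reference-counting contract verified below: RAMCache.putIfAbsent retains
   * the block's buffer (refCnt 1 -> 2) only when it actually inserts the entry, while a losing
   * putIfAbsent leaves the caller's refCnt untouched; remove() and clear() release the cache's
   * reference (refCnt back to 1), returning sole ownership to the caller.
   */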
  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false);

    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }

  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // initialize a mocked IOEngine whose writes always fail.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    when(ioEngine.usesSharedMemory()).thenReturn(false);
    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false);

    Assert.assertEquals(0, allocator.getUsedSize());
    try {
      re.writeToCache(ioEngine, allocator, null, null,
        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE);
      Assert.fail();
    } catch (Exception e) {
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }
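  /*
   * Added note on the invariant checked above: writeToCache presumably allocates space from the
   * BucketAllocator before asking the IOEngine to write, so when the write throws, the
   * just-allocated bucket must be freed again; that is why the allocator's used size is asserted
   * to be 0 both before and after the failed write.
   */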
  /**
   * This test is for HBASE-26295: a {@link BucketEntry} restored from a persistence file could
   * not be freed even after the corresponding {@link HFileBlock} was evicted from the
   * {@link BucketCache}.
   */
  @Test
  public void testFreeBucketEntryRestoredFromFile() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
      }

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
          hfileBlockPair.getBlock(), false);
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }
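  /*
   * Added note for the test below: QUEUE_ADDITION_WAIT_TIME lets cacheBlock(..., wait=true) block
   * for up to the configured time (1 second here) when the writer queue is full. The cache is
   * deliberately created with a single writer thread and a queue length of 1 so that the wait
   * path is actually exercised while ten blocks are cached.
   */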
  @Test
  public void testBlockAdditionWaitWhenCache() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
      config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
      String[] names = CacheTestUtils.getHFileNames(hfileBlockPairs);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
          true);
      }

      // Max wait of 10 seconds.
      long timeout = 10000;
      // Wait for the backing map size to match the number of blocks.
      while (bucketCache.backingMap.size() != 10) {
        if (timeout <= 0) break;
        Threads.sleep(100);
        timeout -= 100;
      }
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(hfileBlockPairs, names);
      for (BlockCacheKey key : newKeys) {
        bucketCache.evictBlock(key);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testOnConfigurationChange() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";

      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config);

      assertTrue(bucketCache.waitForCacheInitialization(10000));

      config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
      config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f);
      config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f);
      config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f);
      config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f);
      config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
      config.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
      config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500);
      config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000);

      bucketCache.onConfigurationChange(config);

      assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01);
      assertEquals(0.8f, bucketCache.getMinFactor(), 0.01);
      assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01);
      assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01);
      assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01);
      assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01);
      assertEquals(100L, bucketCache.getQueueAdditionWaitTime());
      assertEquals(500L, bucketCache.getBucketcachePersistInterval());
      assertEquals(1000L, bucketCache.getPersistenceChunkSize());

    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedSuccess() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath =
        new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false);
      if (bucketCache.getStats().getFailedInserts() > 0) {
        LOG.info("There were {} failed inserts; "
          + "will assert that total blocks in backingMap equals (10 - failedInserts) "
          + "and that the file isn't listed as fully cached.",
          bucketCache.getStats().getFailedInserts());
        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      } else {
        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
        "testNotifyFileCachingCompletedForEncodedDataSuccess");
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true);
      if (bucketCache.getStats().getFailedInserts() > 0) {
        LOG.info("There were {} failed inserts; "
          + "will assert that total blocks in backingMap equals (10 - failedInserts) "
          + "and that the file isn't listed as fully cached.",
          bucketCache.getStats().getFailedInserts());
        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      } else {
        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
        "testNotifyFileCachingCompletedNotAllCached");
      // Deliberately passing more blocks than we have created to test that
      // notifyFileCachingCompleted will not consider the file fully cached
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false);
      assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
    int totalBlocksToCheck, boolean encoded) throws Exception {
    final Path dataTestDir = createAndGetTestDir();
    String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, 1, 1, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    long usedByteSize = bucketCache.getAllocator().getUsedSize();
    assertEquals(0, usedByteSize);
    HFileBlockPair[] hfileBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded);
    // Add blocks
    for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
      bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
    }
    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
      totalBlocksToCheck * constructedBlockSize);
    return bucketCache;
  }
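  /*
   * Added note for the two orphan-eviction tests below: a cached block counts as an "orphan" when
   * its hfile no longer belongs to any store file of the online regions known to the cache. The
   * testEvictOrphans helper caches 10 valid and 10 orphan blocks; freeSpace() may evict orphans
   * once they are older than BLOCK_ORPHAN_GRACE_PERIOD, but is expected to keep them while they
   * are still within the grace period.
   */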
  @Test
  public void testEvictOrphansOutOfGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(0);
    assertEquals(10, bucketCache.getBackingMap().size());
    assertEquals(0, bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
  }

  @Test
  public void testEvictOrphansWithinGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
    assertEquals(18, bucketCache.getBackingMap().size());
    assertTrue(bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
  }

  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
    Map<String, HRegion> onlineRegions = new HashMap<>();
    List<HStore> stores = new ArrayList<>();
    Collection<HStoreFile> storeFiles = new ArrayList<>();
    HRegion mockedRegion = mock(HRegion.class);
    HStore mockedStore = mock(HStore.class);
    HStoreFile mockedStoreFile = mock(HStoreFile.class);
    when(mockedStoreFile.getPath()).thenReturn(validFile);
    storeFiles.add(mockedStoreFile);
    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
    stores.add(mockedStore);
    when(mockedRegion.getStores()).thenReturn(stores);
    onlineRegions.put("mocked_region", mockedRegion);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
      orphanEvictionGracePeriod);
    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
    HFileBlockPair[] validBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile, false);
    HFileBlockPair[] orphanBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile, false);
    for (HFileBlockPair pair : validBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(10, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    assertEquals(10, bucketCache.getBackingMap().size());
    for (HFileBlockPair pair : orphanBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(20, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    return bucketCache;
  }

  @Test
  public void testBlockPriority() throws Exception {
    HFileBlockPair block = CacheTestUtils.generateHFileBlocks(BLOCK_SIZE, 1)[0];
    cacheAndWaitUntilFlushedToBucket(cache, block.getBlockName(), block.getBlock(), true);
    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.SINGLE);
    cache.getBlock(block.getBlockName(), true, false, true);
    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.MULTI);
  }
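  /*
   * Added note on the test above: a freshly cached block lands in the bucket with SINGLE
   * priority, and a subsequent cache hit promotes it to MULTI, mirroring the single-access and
   * multi-access partitions that the single/multi/memory factors divide the cache into.
   */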
  @Test
  public void testIOTimePerHitReturnsZeroWhenNoHits()
    throws NoSuchFieldException, IllegalAccessException {
    CacheStats cacheStats = cache.getStats();
    assertTrue(cacheStats instanceof BucketCacheStats);
    BucketCacheStats bucketCacheStats = (BucketCacheStats) cacheStats;

    Field field = BucketCacheStats.class.getDeclaredField("ioHitCount");
    field.setAccessible(true);
    LongAdder ioHitCount = (LongAdder) field.get(bucketCacheStats);

    assertEquals(0, ioHitCount.sum());
    double ioTimePerHit = bucketCacheStats.getIOTimePerHit();
    assertEquals(0, ioTimePerHit, 0.0);
  }
}