/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Basic test of BucketCache: puts and gets.
 * <p>
 * Tests verify the correctness of block data under concurrent access by several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCache.class);

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
                                                          // for these tests?
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Test utility to create the test dir and return it.
   * @return the created test dir
   * @throws IOException if the dir cannot be created
   */
  private Path createAndGetTestDir() throws IOException {
    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
    return testDir;
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
  }

  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize. Continue selecting blocks until
    // the cache is completely filled.
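    // A size is dropped from the candidate list once it throws CacheFullException; the loop ends
    // when every size has been exhausted.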
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());

      // we know the block sizes above are multiples of 1024, but default bucket sizes give an
      // additional 1024 on top of that, so this counts towards fragmentation in our test.
      // real life may have worse fragmentation because blocks may not be perfectly sized to block
      // size, given encoding/compression and large rows
      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
    }

    mAllocator.logDebugStatistics();

    for (Pair<Long, Integer> allocation : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 10000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and writeQueue, then
  // writer threads flush it to the bucket and put a reference entry in the backingMap.
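  // The helper below hides that asynchrony: it caches the block and then blocks until the writer
  // threads have moved it from the ramCache into the backingMap.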
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block, boolean waitWhenCache) throws InterruptedException {
    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 are:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 are:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 are more reasonable, because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it had seen, and a Block newly added after the {@link BucketEntry}
     * it had seen should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    Path testDir = createAndGetTestDir();
    String ioEngineName = "file:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromMMap() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
  }

  @Test
  public void testRetrieveFromPMem() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  private void testRetrievalUtils(Path testDir, String ioEngineName)
    throws IOException, InterruptedException {
    final String persistencePath =
      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
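      // Retrieval of the persisted backing map happens during initialization, so waiting here
      // guarantees the restored used size is visible before the assertion below.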
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    assertTrue(new File(persistencePath).exists());
  }

  @Test
  public void testRetrieveUnsupportedIOE() throws Exception {
    try {
      final Path testDir = createAndGetTestDir();
      final String ioEngineName = testDir + "/bucket.cache";
      testRetrievalUtils(testDir, ioEngineName);
      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
        + "files:, mmap: or offheap", e.getMessage());
    }
  }

  @Test
  public void testRetrieveFromMultipleFiles() throws Exception {
    final Path testDirInitial = createAndGetTestDir();
    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
    String ioEngineName =
      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
    testRetrievalUtils(testDirInitial, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDirInitial + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    try {
      final Path testDir = createAndGetTestDir();
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testCacheSizeCapacity() throws IOException {
    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
    try {
      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
        writerQLen, null, 100, conf);
      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
    }
  }

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
      cache.getAcceptableFactor(), 0);
    assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
    assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getExtraFreeFactor(), 0);
    assertEquals(SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, cache.getSingleFactor(),
      0);
    assertEquals(MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, cache.getMultiFactor(),
      0);
    assertEquals(MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, cache.getMemoryFactor(),
      0);
  }

  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to <0, in expected range, in expected range, config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // All configs add up to 1.0 and are between 0 and 1.0; configs don't add to 1.0; configs can't
    // be negative; configs don't add to 1.0
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String, float[]> configMappings =
      ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
        MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME,
        memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
    boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
        assertTrue(cache.waitForCacheInitialization(10000));
        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      }
    }
  }

  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
    float minFactor) {
    long expectedOutput =
      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }

  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the value isn't ensured to be
    // positive. See HBASE-18757 for more information.
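    // The value needs more than 32 bits: BucketEntry stores the offset split across an int and a
    // byte, and this value sets the high bit of the byte portion, so without unsigned masking the
    // reconstructed offset would be sign-extended to a negative number.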
    long testValue = 549888460800L;
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
      return ByteBuffAllocator.NONE;
    }, ByteBuffAllocator.HEAP);
    assertEquals(testValue, bucketEntry.offset());
  }

  @Test
  public void testEvictionCount() throws InterruptedException {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);

    assertEquals(0, cache.getStats().getEvictionCount());

    // the evict call should return 1, but the eviction count should still be 0
    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should not increment
    assertTrue(cache.evictBlock(key));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should finally increment eviction count
    cache.freeSpace("testing");
    assertEquals(1, cache.getStats().getEvictionCount());
  }

  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }

  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false);

    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
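
    // Removing key1 releases the RAMCache's reference on re1's buffer only; re2 stays cached
    // under key2 and keeps its extra refCnt until clear() below.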
    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }

  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // initialize a mocked ioengine.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    when(ioEngine.usesSharedMemory()).thenReturn(false);
    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false);

    Assert.assertEquals(0, allocator.getUsedSize());
    try {
      re.writeToCache(ioEngine, allocator, null, null,
        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE);
      Assert.fail();
    } catch (Exception e) {
      // expected: the mocked IOEngine throws on every write.
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }

  /**
   * This test is for HBASE-26295: a {@link BucketEntry} restored from a persistence file could not
   * be freed even after the corresponding {@link HFileBlock} was evicted from the
   * {@link BucketCache}.
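   * <p>
   * The test persists cached blocks, restores the cache from the persistence file, evicts every
   * restored block, and expects the allocator's used size to drop back to zero.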
   */
  @Test
  public void testFreeBucketEntryRestoredFromFile() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
      }

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
          hfileBlockPair.getBlock(), false);
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBlockAdditionWaitWhenCache() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
      config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
          true);
      }

      // Max wait for 10 seconds.
      long timeout = 10000;
      // Wait for the backingMap size to match the number of cached blocks.
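      // Only a single writer thread is configured here, so draining can take multiple passes;
      // poll in 100ms steps and give up once the 10-second budget is spent.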
      while (bucketCache.backingMap.size() != 10) {
        if (timeout <= 0) break;
        Threads.sleep(100);
        timeout -= 100;
      }
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testOnConfigurationChange() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";

      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config);

      assertTrue(bucketCache.waitForCacheInitialization(10000));

      config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
      config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f);
      config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f);
      config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f);
      config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f);
      config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
      config.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
      config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500);
      config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000);

      bucketCache.onConfigurationChange(config);

      assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01);
      assertEquals(0.8f, bucketCache.getMinFactor(), 0.01);
      assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01);
      assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01);
      assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01);
      assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01);
      assertEquals(100L, bucketCache.getQueueAdditionWaitTime());
      assertEquals(500L, bucketCache.getBucketcachePersistInterval());
      assertEquals(1000L, bucketCache.getPersistenceChunkSize());

    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedSuccess() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath =
        new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false);
      if (bucketCache.getStats().getFailedInserts() > 0) {
        LOG.info("There were {} failed inserts, "
          + "will assert that total blocks in backingMap equals (10 - failedInserts) "
          + "and the file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      } else {
        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
        "testNotifyFileCachingCompletedForEncodedDataSuccess");
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true);
      if (bucketCache.getStats().getFailedInserts() > 0) {
        LOG.info("There were {} failed inserts, "
          + "will assert that total blocks in backingMap equals (10 - failedInserts) "
          + "and the file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      } else {
        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
        "testNotifyFileCachingCompletedNotAllCached");
      // Deliberately passing more blocks than we have created to test that
      // notifyFileCachingCompleted will not consider the file fully cached
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false);
      assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
    int totalBlocksToCheck, boolean encoded) throws Exception {
    final Path dataTestDir = createAndGetTestDir();
    String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, 1, 1, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    long usedByteSize = bucketCache.getAllocator().getUsedSize();
    assertEquals(0, usedByteSize);
    HFileBlockPair[] hfileBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded);
    // Add blocks
    for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
      bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
    }
    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
      totalBlocksToCheck * constructedBlockSize);
    return bucketCache;
  }

  @Test
  public void testEvictOrphansOutOfGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(0);
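    // With a zero grace period the orphan blocks (no live store file references them) are
    // immediately eligible for eviction, so freeSpace should leave only the 10 valid blocks.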
    assertEquals(10, bucketCache.getBackingMap().size());
    assertEquals(0, bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
  }

  @Test
  public void testEvictOrphansWithinGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
    assertEquals(18, bucketCache.getBackingMap().size());
    assertTrue(bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
  }

  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
    Map<String, HRegion> onlineRegions = new HashMap<>();
    List<HStore> stores = new ArrayList<>();
    Collection<HStoreFile> storeFiles = new ArrayList<>();
    HRegion mockedRegion = mock(HRegion.class);
    HStore mockedStore = mock(HStore.class);
    HStoreFile mockedStoreFile = mock(HStoreFile.class);
    when(mockedStoreFile.getPath()).thenReturn(validFile);
    storeFiles.add(mockedStoreFile);
    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
    stores.add(mockedStore);
    when(mockedRegion.getStores()).thenReturn(stores);
    onlineRegions.put("mocked_region", mockedRegion);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
      orphanEvictionGracePeriod);
    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
    HFileBlockPair[] validBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile);
    HFileBlockPair[] orphanBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile);
    for (HFileBlockPair pair : validBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(10, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    assertEquals(10, bucketCache.getBackingMap().size());
    for (HFileBlockPair pair : orphanBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(20, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    return bucketCache;
  }

  @Test
  public void testBlockPriority() throws Exception {
    HFileBlockPair block = CacheTestUtils.generateHFileBlocks(BLOCK_SIZE, 1)[0];
    cacheAndWaitUntilFlushedToBucket(cache, block.getBlockName(), block.getBlock(), true);
    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.SINGLE);
    cache.getBlock(block.getBlockName(), true, false, true);
    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.MULTI);
  }
}