001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME; 022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD; 024import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 025import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR; 026import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR; 027import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME; 028import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME; 029import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME; 030import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME; 031import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME; 032import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME; 033import static org.junit.Assert.assertEquals; 034import static org.junit.Assert.assertFalse; 035import static org.junit.Assert.assertNotEquals; 036import static org.junit.Assert.assertNotNull; 037import static org.junit.Assert.assertNull; 038import static org.junit.Assert.assertTrue; 039import static org.mockito.Mockito.mock; 040import static org.mockito.Mockito.when; 041 042import java.io.File; 043import java.io.IOException; 044import java.lang.reflect.Field; 045import java.nio.ByteBuffer; 046import java.util.ArrayList; 047import java.util.Arrays; 048import java.util.Collection; 049import java.util.HashMap; 050import java.util.List; 051import java.util.Map; 052import java.util.Random; 053import java.util.Set; 054import java.util.concurrent.ThreadLocalRandom; 055import java.util.concurrent.atomic.LongAdder; 056import java.util.concurrent.locks.ReentrantReadWriteLock; 057import org.apache.hadoop.conf.Configuration; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.hbase.HBaseClassTestRule; 060import org.apache.hadoop.hbase.HBaseConfiguration; 061import org.apache.hadoop.hbase.HBaseTestingUtil; 062import org.apache.hadoop.hbase.HConstants; 063import org.apache.hadoop.hbase.Waiter; 064import org.apache.hadoop.hbase.io.ByteBuffAllocator; 065import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 066import org.apache.hadoop.hbase.io.hfile.BlockPriority; 067import org.apache.hadoop.hbase.io.hfile.BlockType; 068import org.apache.hadoop.hbase.io.hfile.CacheStats; 069import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 070import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 071import org.apache.hadoop.hbase.io.hfile.Cacheable; 072import org.apache.hadoop.hbase.io.hfile.HFileBlock; 073import org.apache.hadoop.hbase.io.hfile.HFileContext; 074import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 075import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; 076import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; 077import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; 078import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 079import org.apache.hadoop.hbase.nio.ByteBuff; 080import org.apache.hadoop.hbase.regionserver.HRegion; 081import org.apache.hadoop.hbase.regionserver.HStore; 082import org.apache.hadoop.hbase.regionserver.HStoreFile; 083import org.apache.hadoop.hbase.testclassification.IOTests; 084import org.apache.hadoop.hbase.testclassification.LargeTests; 085import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 086import org.apache.hadoop.hbase.util.Pair; 087import org.apache.hadoop.hbase.util.Threads; 088import org.junit.After; 089import org.junit.Assert; 090import org.junit.Before; 091import org.junit.ClassRule; 092import org.junit.Test; 093import org.junit.experimental.categories.Category; 094import org.junit.runner.RunWith; 095import org.junit.runners.Parameterized; 096import org.mockito.Mockito; 097import org.slf4j.Logger; 098import org.slf4j.LoggerFactory; 099 100import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 101 102/** 103 * Basic test of BucketCache.Puts and gets. 104 * <p> 105 * Tests will ensure that blocks' data correctness under several threads concurrency 106 */ 107@RunWith(Parameterized.class) 108@Category({ IOTests.class, LargeTests.class }) 109public class TestBucketCache { 110 111 private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class); 112 113 @ClassRule 114 public static final HBaseClassTestRule CLASS_RULE = 115 HBaseClassTestRule.forClass(TestBucketCache.class); 116 117 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 118 public static Iterable<Object[]> data() { 119 return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize 120 // for these tests? 121 { 16 * 1024, 122 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 123 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 124 128 * 1024 + 1024 } } }); 125 } 126 127 @Parameterized.Parameter(0) 128 public int constructedBlockSize; 129 130 @Parameterized.Parameter(1) 131 public int[] constructedBlockSizes; 132 133 BucketCache cache; 134 final int CACHE_SIZE = 1000000; 135 final int NUM_BLOCKS = 100; 136 final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; 137 final int NUM_THREADS = 100; 138 final int NUM_QUERIES = 10000; 139 140 final long capacitySize = 32 * 1024 * 1024; 141 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 142 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 143 private String ioEngineName = "offheap"; 144 145 private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil(); 146 147 private static class MockedBucketCache extends BucketCache { 148 149 public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 150 int writerThreads, int writerQLen, String persistencePath) throws IOException { 151 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, 152 persistencePath); 153 } 154 155 @Override 156 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 157 super.cacheBlock(cacheKey, buf, inMemory); 158 } 159 160 @Override 161 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 162 super.cacheBlock(cacheKey, buf); 163 } 164 } 165 166 @Before 167 public void setup() throws IOException { 168 cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, 169 constructedBlockSizes, writeThreads, writerQLen, null); 170 } 171 172 @After 173 public void tearDown() { 174 cache.shutdown(); 175 } 176 177 /** 178 * Test Utility to create test dir and return name 179 * @return return name of created dir 180 * @throws IOException throws IOException 181 */ 182 private Path createAndGetTestDir() throws IOException { 183 final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir(); 184 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir); 185 return testDir; 186 } 187 188 /** 189 * Return a random element from {@code a}. 190 */ 191 private static <T> T randFrom(List<T> a) { 192 return a.get(ThreadLocalRandom.current().nextInt(a.size())); 193 } 194 195 @Test 196 public void testBucketAllocator() throws BucketAllocatorException { 197 BucketAllocator mAllocator = cache.getAllocator(); 198 /* 199 * Test the allocator first 200 */ 201 final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024); 202 203 boolean full = false; 204 ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>(); 205 // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until 206 // the cache is completely filled. 207 List<Integer> tmp = new ArrayList<>(BLOCKSIZES); 208 while (!full) { 209 Integer blockSize = null; 210 try { 211 blockSize = randFrom(tmp); 212 allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize)); 213 } catch (CacheFullException cfe) { 214 tmp.remove(blockSize); 215 if (tmp.isEmpty()) full = true; 216 } 217 } 218 219 for (Integer blockSize : BLOCKSIZES) { 220 BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize); 221 IndexStatistics indexStatistics = bucketSizeInfo.statistics(); 222 assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount()); 223 224 // we know the block sizes above are multiples of 1024, but default bucket sizes give an 225 // additional 1024 on top of that so this counts towards fragmentation in our test 226 // real life may have worse fragmentation because blocks may not be perfectly sized to block 227 // size, given encoding/compression and large rows 228 assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes()); 229 } 230 231 mAllocator.logDebugStatistics(); 232 233 for (Pair<Long, Integer> allocation : allocations) { 234 assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()), 235 mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond())); 236 } 237 assertEquals(0, mAllocator.getUsedSize()); 238 } 239 240 @Test 241 public void testCacheSimple() throws Exception { 242 CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); 243 } 244 245 @Test 246 public void testCacheMultiThreadedSingleKey() throws Exception { 247 CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES); 248 } 249 250 @Test 251 public void testHeapSizeChanges() throws Exception { 252 cache.stopWriterThreads(); 253 CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); 254 } 255 256 public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 257 throws InterruptedException { 258 Waiter.waitFor(HBaseConfiguration.create(), 10000, 259 () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); 260 } 261 262 public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { 263 while (!cache.ramCache.isEmpty()) { 264 Thread.sleep(100); 265 } 266 Thread.sleep(1000); 267 } 268 269 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 270 // threads will flush it to the bucket and put reference entry in backingMap. 271 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 272 Cacheable block, boolean waitWhenCache) throws InterruptedException { 273 cache.cacheBlock(cacheKey, block, false, waitWhenCache); 274 waitUntilFlushedToBucket(cache, cacheKey); 275 } 276 277 @Test 278 public void testMemoryLeak() throws Exception { 279 final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L); 280 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 281 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 282 long lockId = cache.backingMap.get(cacheKey).offset(); 283 ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId); 284 lock.writeLock().lock(); 285 Thread evictThread = new Thread("evict-block") { 286 @Override 287 public void run() { 288 cache.evictBlock(cacheKey); 289 } 290 }; 291 evictThread.start(); 292 cache.offsetLock.waitForWaiters(lockId, 1); 293 cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true); 294 assertEquals(0, cache.getBlockCount()); 295 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 296 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 297 assertEquals(1, cache.getBlockCount()); 298 lock.writeLock().unlock(); 299 evictThread.join(); 300 /** 301 * <pre> 302 * The asserts here before HBASE-21957 are: 303 * assertEquals(1L, cache.getBlockCount()); 304 * assertTrue(cache.getCurrentSize() > 0L); 305 * assertTrue("We should have a block!", cache.iterator().hasNext()); 306 * 307 * The asserts here after HBASE-21957 are: 308 * assertEquals(0, cache.getBlockCount()); 309 * assertEquals(cache.getCurrentSize(), 0L); 310 * 311 * I think the asserts before HBASE-21957 is more reasonable,because 312 * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry} 313 * it had seen, and newly added Block after the {@link BucketEntry} 314 * it had seen should not be evicted. 315 * </pre> 316 */ 317 assertEquals(1L, cache.getBlockCount()); 318 assertTrue(cache.getCurrentSize() > 0L); 319 assertTrue("We should have a block!", cache.iterator().hasNext()); 320 } 321 322 @Test 323 public void testRetrieveFromFile() throws Exception { 324 Path testDir = createAndGetTestDir(); 325 String ioEngineName = "file:" + testDir + "/bucket.cache"; 326 testRetrievalUtils(testDir, ioEngineName); 327 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 328 String persistencePath = testDir + "/bucket.persistence"; 329 BucketCache bucketCache = null; 330 try { 331 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 332 smallBucketSizes, writeThreads, writerQLen, persistencePath); 333 assertTrue(bucketCache.waitForCacheInitialization(10000)); 334 assertFalse(new File(persistencePath).exists()); 335 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 336 assertEquals(0, bucketCache.backingMap.size()); 337 } finally { 338 bucketCache.shutdown(); 339 HBASE_TESTING_UTILITY.cleanupTestDir(); 340 } 341 } 342 343 @Test 344 public void testRetrieveFromMMap() throws Exception { 345 final Path testDir = createAndGetTestDir(); 346 final String ioEngineName = "mmap:" + testDir + "/bucket.cache"; 347 testRetrievalUtils(testDir, ioEngineName); 348 } 349 350 @Test 351 public void testRetrieveFromPMem() throws Exception { 352 final Path testDir = createAndGetTestDir(); 353 final String ioEngineName = "pmem:" + testDir + "/bucket.cache"; 354 testRetrievalUtils(testDir, ioEngineName); 355 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 356 String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 357 BucketCache bucketCache = null; 358 try { 359 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 360 smallBucketSizes, writeThreads, writerQLen, persistencePath); 361 assertTrue(bucketCache.waitForCacheInitialization(10000)); 362 assertFalse(new File(persistencePath).exists()); 363 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 364 assertEquals(0, bucketCache.backingMap.size()); 365 } finally { 366 bucketCache.shutdown(); 367 HBASE_TESTING_UTILITY.cleanupTestDir(); 368 } 369 } 370 371 private void testRetrievalUtils(Path testDir, String ioEngineName) 372 throws IOException, InterruptedException { 373 final String persistencePath = 374 testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 375 BucketCache bucketCache = null; 376 try { 377 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 378 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 379 assertTrue(bucketCache.waitForCacheInitialization(10000)); 380 long usedSize = bucketCache.getAllocator().getUsedSize(); 381 assertEquals(0, usedSize); 382 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 383 for (HFileBlockPair block : blocks) { 384 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 385 } 386 for (HFileBlockPair block : blocks) { 387 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 388 false); 389 } 390 usedSize = bucketCache.getAllocator().getUsedSize(); 391 assertNotEquals(0, usedSize); 392 bucketCache.shutdown(); 393 assertTrue(new File(persistencePath).exists()); 394 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 395 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 396 assertTrue(bucketCache.waitForCacheInitialization(10000)); 397 398 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 399 } finally { 400 if (bucketCache != null) { 401 bucketCache.shutdown(); 402 } 403 } 404 assertTrue(new File(persistencePath).exists()); 405 } 406 407 @Test 408 public void testRetrieveUnsupportedIOE() throws Exception { 409 try { 410 final Path testDir = createAndGetTestDir(); 411 final String ioEngineName = testDir + "/bucket.cache"; 412 testRetrievalUtils(testDir, ioEngineName); 413 Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!"); 414 } catch (IllegalArgumentException e) { 415 Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " 416 + "files:, mmap: or offheap", e.getMessage()); 417 } 418 } 419 420 @Test 421 public void testRetrieveFromMultipleFiles() throws Exception { 422 final Path testDirInitial = createAndGetTestDir(); 423 final Path newTestDir = new HBaseTestingUtil().getDataTestDir(); 424 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir); 425 String ioEngineName = 426 new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache") 427 .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString(); 428 testRetrievalUtils(testDirInitial, ioEngineName); 429 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 430 String persistencePath = testDirInitial + "/bucket.persistence"; 431 BucketCache bucketCache = null; 432 try { 433 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 434 smallBucketSizes, writeThreads, writerQLen, persistencePath); 435 assertTrue(bucketCache.waitForCacheInitialization(10000)); 436 assertFalse(new File(persistencePath).exists()); 437 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 438 assertEquals(0, bucketCache.backingMap.size()); 439 } finally { 440 bucketCache.shutdown(); 441 HBASE_TESTING_UTILITY.cleanupTestDir(); 442 } 443 } 444 445 @Test 446 public void testRetrieveFromFileWithoutPersistence() throws Exception { 447 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 448 constructedBlockSizes, writeThreads, writerQLen, null); 449 assertTrue(bucketCache.waitForCacheInitialization(10000)); 450 try { 451 final Path testDir = createAndGetTestDir(); 452 String ioEngineName = "file:" + testDir + "/bucket.cache"; 453 long usedSize = bucketCache.getAllocator().getUsedSize(); 454 assertEquals(0, usedSize); 455 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 456 for (HFileBlockPair block : blocks) { 457 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 458 } 459 for (HFileBlockPair block : blocks) { 460 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 461 false); 462 } 463 usedSize = bucketCache.getAllocator().getUsedSize(); 464 assertNotEquals(0, usedSize); 465 bucketCache.shutdown(); 466 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 467 constructedBlockSizes, writeThreads, writerQLen, null); 468 assertTrue(bucketCache.waitForCacheInitialization(10000)); 469 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 470 } finally { 471 bucketCache.shutdown(); 472 HBASE_TESTING_UTILITY.cleanupTestDir(); 473 } 474 } 475 476 @Test 477 public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException { 478 long availableSpace = 20 * 1024L * 1024 * 1024; 479 int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 }; 480 BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes); 481 assertTrue(allocator.getBuckets().length > 0); 482 } 483 484 @Test 485 public void testGetPartitionSize() throws IOException { 486 // Test default values 487 validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR); 488 489 Configuration conf = HBaseConfiguration.create(); 490 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 491 conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f); 492 conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f); 493 conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 494 495 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 496 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 497 assertTrue(cache.waitForCacheInitialization(10000)); 498 499 validateGetPartitionSize(cache, 0.1f, 0.5f); 500 validateGetPartitionSize(cache, 0.7f, 0.5f); 501 validateGetPartitionSize(cache, 0.2f, 0.5f); 502 } 503 504 @Test 505 public void testCacheSizeCapacity() throws IOException { 506 // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE 507 validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR); 508 Configuration conf = HBaseConfiguration.create(); 509 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 510 conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f); 511 conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f); 512 conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 513 try { 514 new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads, 515 writerQLen, null, 100, conf); 516 Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!"); 517 } catch (IllegalArgumentException e) { 518 Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage()); 519 } 520 } 521 522 @Test 523 public void testValidBucketCacheConfigs() throws IOException { 524 Configuration conf = HBaseConfiguration.create(); 525 conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 526 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 527 conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f); 528 conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f); 529 conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f); 530 conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 531 532 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 533 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 534 assertTrue(cache.waitForCacheInitialization(10000)); 535 536 assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, 537 cache.getAcceptableFactor(), 0); 538 assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0); 539 assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, 540 cache.getExtraFreeFactor(), 0); 541 assertEquals(SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, cache.getSingleFactor(), 542 0); 543 assertEquals(MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, cache.getMultiFactor(), 544 0); 545 assertEquals(MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, cache.getMemoryFactor(), 546 0); 547 } 548 549 @Test 550 public void testInvalidAcceptFactorConfig() throws IOException { 551 float[] configValues = { -1f, 0.2f, 0.86f, 1.05f }; 552 boolean[] expectedOutcomes = { false, false, true, false }; 553 Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues); 554 Configuration conf = HBaseConfiguration.create(); 555 checkConfigValues(conf, configMappings, expectedOutcomes); 556 } 557 558 @Test 559 public void testInvalidMinFactorConfig() throws IOException { 560 float[] configValues = { -1f, 0f, 0.96f, 1.05f }; 561 // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0 562 boolean[] expectedOutcomes = { false, true, false, false }; 563 Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues); 564 Configuration conf = HBaseConfiguration.create(); 565 checkConfigValues(conf, configMappings, expectedOutcomes); 566 } 567 568 @Test 569 public void testInvalidExtraFreeFactorConfig() throws IOException { 570 float[] configValues = { -1f, 0f, 0.2f, 1.05f }; 571 // throws due to <0, in expected range, in expected range, config can be > 1.0 572 boolean[] expectedOutcomes = { false, true, true, true }; 573 Map<String, float[]> configMappings = 574 ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues); 575 Configuration conf = HBaseConfiguration.create(); 576 checkConfigValues(conf, configMappings, expectedOutcomes); 577 } 578 579 @Test 580 public void testInvalidCacheSplitFactorConfig() throws IOException { 581 float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f }; 582 float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f }; 583 float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f }; 584 // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't 585 // be negative, configs don't add to 1.0 586 boolean[] expectedOutcomes = { true, false, false, false }; 587 Map<String, 588 float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues, 589 MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME, 590 memoryFactorConfigValues); 591 Configuration conf = HBaseConfiguration.create(); 592 checkConfigValues(conf, configMappings, expectedOutcomes); 593 } 594 595 private void checkConfigValues(Configuration conf, Map<String, float[]> configMap, 596 boolean[] expectSuccess) throws IOException { 597 Set<String> configNames = configMap.keySet(); 598 for (int i = 0; i < expectSuccess.length; i++) { 599 try { 600 for (String configName : configNames) { 601 conf.setFloat(configName, configMap.get(configName)[i]); 602 } 603 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 604 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 605 assertTrue(cache.waitForCacheInitialization(10000)); 606 assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] 607 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 608 } catch (IllegalArgumentException e) { 609 assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] 610 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 611 } 612 } 613 } 614 615 private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor, 616 float minFactor) { 617 long expectedOutput = 618 (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor); 619 assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor)); 620 } 621 622 @Test 623 public void testOffsetProducesPositiveOutput() { 624 // This number is picked because it produces negative output if the values isn't ensured to be 625 // positive. See HBASE-18757 for more information. 626 long testValue = 549888460800L; 627 BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> { 628 return ByteBuffAllocator.NONE; 629 }, ByteBuffAllocator.HEAP); 630 assertEquals(testValue, bucketEntry.offset()); 631 } 632 633 @Test 634 public void testEvictionCount() throws InterruptedException { 635 int size = 100; 636 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 637 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 638 HFileContext meta = new HFileContextBuilder().build(); 639 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 640 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 641 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 642 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 643 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 644 645 BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0); 646 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 647 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 648 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 649 blockWithNextBlockMetadata.serialize(block1Buffer, true); 650 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 651 652 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 653 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 654 block1Buffer); 655 656 waitUntilFlushedToBucket(cache, key); 657 658 assertEquals(0, cache.getStats().getEvictionCount()); 659 660 // evict call should return 1, but then eviction count be 0 661 assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount")); 662 assertEquals(0, cache.getStats().getEvictionCount()); 663 664 // add back 665 key = new BlockCacheKey("testEvictionCount", 0); 666 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 667 block1Buffer); 668 waitUntilFlushedToBucket(cache, key); 669 670 // should not increment 671 assertTrue(cache.evictBlock(key)); 672 assertEquals(0, cache.getStats().getEvictionCount()); 673 674 // add back 675 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 676 block1Buffer); 677 waitUntilFlushedToBucket(cache, key); 678 679 // should finally increment eviction count 680 cache.freeSpace("testing"); 681 assertEquals(1, cache.getStats().getEvictionCount()); 682 } 683 684 @Test 685 public void testStringPool() throws Exception { 686 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 687 Path testDir = TEST_UTIL.getDataTestDir(); 688 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 689 BucketCache bucketCache = 690 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 691 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 692 assertTrue(bucketCache.waitForCacheInitialization(10000)); 693 long usedSize = bucketCache.getAllocator().getUsedSize(); 694 assertEquals(0, usedSize); 695 Random rand = ThreadLocalRandom.current(); 696 Path filePath = new Path(testDir, Long.toString(rand.nextLong())); 697 CacheTestUtils.HFileBlockPair[] blocks = 698 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 1, filePath, false); 699 String name = blocks[0].getBlockName().getHfileName(); 700 assertEquals(name, filePath.getName()); 701 assertNotNull(blocks[0].getBlockName().getRegionName()); 702 bucketCache.cacheBlock(blocks[0].getBlockName(), blocks[0].getBlock()); 703 waitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName()); 704 assertTrue(FilePathStringPool.getInstance().size() > 0); 705 bucketCache.evictBlock(blocks[0].getBlockName()); 706 assertTrue(FilePathStringPool.getInstance().size() > 0); 707 bucketCache.cacheBlock(blocks[0].getBlockName(), blocks[0].getBlock()); 708 waitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName()); 709 bucketCache.fileCacheCompleted(filePath, 710 bucketCache.backingMap.get(blocks[0].getBlockName()).getLength()); 711 bucketCache.evictBlocksByHfileName(name); 712 assertEquals(1, FilePathStringPool.getInstance().size()); 713 } 714 715 @Test 716 public void testCacheBlockNextBlockMetadataMissing() throws Exception { 717 int size = 100; 718 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 719 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 720 HFileContext meta = new HFileContextBuilder().build(); 721 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 722 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 723 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 724 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 725 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 726 727 BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0); 728 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 729 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 730 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 731 blockWithNextBlockMetadata.serialize(block1Buffer, true); 732 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 733 734 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 735 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 736 block1Buffer); 737 738 waitUntilFlushedToBucket(cache, key); 739 assertNotNull(cache.backingMap.get(key)); 740 assertEquals(1, cache.backingMap.get(key).refCnt()); 741 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 742 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 743 744 // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. 745 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 746 block1Buffer); 747 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 748 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 749 assertEquals(1, cache.backingMap.get(key).refCnt()); 750 751 // Clear and add blockWithoutNextBlockMetadata 752 assertTrue(cache.evictBlock(key)); 753 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 754 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 755 756 assertNull(cache.getBlock(key, false, false, false)); 757 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 758 block2Buffer); 759 760 waitUntilFlushedToBucket(cache, key); 761 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 762 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 763 764 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. 765 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 766 block1Buffer); 767 768 waitUntilFlushedToBucket(cache, key); 769 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 770 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 771 } 772 773 @Test 774 public void testRAMCache() { 775 int size = 100; 776 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 777 byte[] byteArr = new byte[length]; 778 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 779 HFileContext meta = new HFileContextBuilder().build(); 780 781 RAMCache cache = new RAMCache(); 782 BlockCacheKey key1 = new BlockCacheKey("file-1", 1); 783 BlockCacheKey key2 = new BlockCacheKey("file-2", 2); 784 HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 785 HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); 786 HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 787 HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); 788 RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false); 789 RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false); 790 791 assertFalse(cache.containsKey(key1)); 792 assertNull(cache.putIfAbsent(key1, re1)); 793 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 794 795 assertNotNull(cache.putIfAbsent(key1, re2)); 796 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 797 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 798 799 assertNull(cache.putIfAbsent(key2, re2)); 800 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 801 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 802 803 cache.remove(key1); 804 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 805 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 806 807 cache.clear(); 808 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 809 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 810 } 811 812 @Test 813 public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { 814 // initialize an block. 815 int size = 100, offset = 20; 816 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 817 ByteBuffer buf = ByteBuffer.allocate(length); 818 HFileContext meta = new HFileContextBuilder().build(); 819 HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 820 HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP); 821 822 // initialize an mocked ioengine. 823 IOEngine ioEngine = Mockito.mock(IOEngine.class); 824 when(ioEngine.usesSharedMemory()).thenReturn(false); 825 // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong()); 826 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class), 827 Mockito.anyLong()); 828 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class), 829 Mockito.anyLong()); 830 831 // create an bucket allocator. 832 long availableSpace = 1024 * 1024 * 1024L; 833 BucketAllocator allocator = new BucketAllocator(availableSpace, null); 834 835 BlockCacheKey key = new BlockCacheKey("dummy", 1L); 836 RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false); 837 838 Assert.assertEquals(0, allocator.getUsedSize()); 839 try { 840 re.writeToCache(ioEngine, allocator, null, null, 841 ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE); 842 Assert.fail(); 843 } catch (Exception e) { 844 } 845 Assert.assertEquals(0, allocator.getUsedSize()); 846 } 847 848 /** 849 * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file 850 * could not be freed even if corresponding {@link HFileBlock} is evicted from 851 * {@link BucketCache}. 852 */ 853 @Test 854 public void testFreeBucketEntryRestoredFromFile() throws Exception { 855 BucketCache bucketCache = null; 856 try { 857 final Path dataTestDir = createAndGetTestDir(); 858 859 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 860 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 861 862 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 863 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 864 assertTrue(bucketCache.waitForCacheInitialization(10000)); 865 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 866 assertEquals(0, usedByteSize); 867 868 HFileBlockPair[] hfileBlockPairs = 869 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 870 // Add blocks 871 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 872 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock()); 873 } 874 875 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 876 cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), 877 hfileBlockPair.getBlock(), false); 878 } 879 usedByteSize = bucketCache.getAllocator().getUsedSize(); 880 assertNotEquals(0, usedByteSize); 881 // persist cache to file 882 bucketCache.shutdown(); 883 assertTrue(new File(persistencePath).exists()); 884 885 // restore cache from file 886 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 887 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 888 assertTrue(bucketCache.waitForCacheInitialization(10000)); 889 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 890 891 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 892 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 893 bucketCache.evictBlock(blockCacheKey); 894 } 895 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 896 assertEquals(0, bucketCache.backingMap.size()); 897 } finally { 898 bucketCache.shutdown(); 899 HBASE_TESTING_UTILITY.cleanupTestDir(); 900 } 901 } 902 903 @Test 904 public void testBlockAdditionWaitWhenCache() throws Exception { 905 BucketCache bucketCache = null; 906 try { 907 final Path dataTestDir = createAndGetTestDir(); 908 909 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 910 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 911 912 Configuration config = HBASE_TESTING_UTILITY.getConfiguration(); 913 config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000); 914 915 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 916 constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config); 917 assertTrue(bucketCache.waitForCacheInitialization(10000)); 918 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 919 assertEquals(0, usedByteSize); 920 921 HFileBlockPair[] hfileBlockPairs = 922 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10); 923 String[] names = CacheTestUtils.getHFileNames(hfileBlockPairs); 924 // Add blocks 925 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 926 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, 927 true); 928 } 929 930 // Max wait for 10 seconds. 931 long timeout = 10000; 932 // Wait for blocks size to match the number of blocks. 933 while (bucketCache.backingMap.size() != 10) { 934 if (timeout <= 0) break; 935 Threads.sleep(100); 936 timeout -= 100; 937 } 938 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 939 assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName())); 940 } 941 usedByteSize = bucketCache.getAllocator().getUsedSize(); 942 assertNotEquals(0, usedByteSize); 943 // persist cache to file 944 bucketCache.shutdown(); 945 assertTrue(new File(persistencePath).exists()); 946 947 // restore cache from file 948 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 949 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 950 assertTrue(bucketCache.waitForCacheInitialization(10000)); 951 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 952 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(hfileBlockPairs, names); 953 for (BlockCacheKey key : newKeys) { 954 bucketCache.evictBlock(key); 955 } 956 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 957 assertEquals(0, bucketCache.backingMap.size()); 958 } finally { 959 if (bucketCache != null) { 960 bucketCache.shutdown(); 961 } 962 HBASE_TESTING_UTILITY.cleanupTestDir(); 963 } 964 } 965 966 @Test 967 public void testOnConfigurationChange() throws Exception { 968 BucketCache bucketCache = null; 969 try { 970 final Path dataTestDir = createAndGetTestDir(); 971 972 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 973 974 Configuration config = HBASE_TESTING_UTILITY.getConfiguration(); 975 976 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 977 constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config); 978 979 assertTrue(bucketCache.waitForCacheInitialization(10000)); 980 981 config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 982 config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f); 983 config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f); 984 config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f); 985 config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f); 986 config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 987 config.setLong(QUEUE_ADDITION_WAIT_TIME, 100); 988 config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500); 989 config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000); 990 991 bucketCache.onConfigurationChange(config); 992 993 assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01); 994 assertEquals(0.8f, bucketCache.getMinFactor(), 0.01); 995 assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01); 996 assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01); 997 assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01); 998 assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01); 999 assertEquals(100L, bucketCache.getQueueAdditionWaitTime()); 1000 assertEquals(500L, bucketCache.getBucketcachePersistInterval()); 1001 assertEquals(1000L, bucketCache.getPersistenceChunkSize()); 1002 1003 } finally { 1004 if (bucketCache != null) { 1005 bucketCache.shutdown(); 1006 } 1007 HBASE_TESTING_UTILITY.cleanupTestDir(); 1008 } 1009 } 1010 1011 @Test 1012 public void testNotifyFileCachingCompletedSuccess() throws Exception { 1013 BucketCache bucketCache = null; 1014 try { 1015 Path filePath = 1016 new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess"); 1017 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false); 1018 if (bucketCache.getStats().getFailedInserts() > 0) { 1019 LOG.info("There were {} fail inserts, " 1020 + "will assert if total blocks in backingMap equals (10 - failInserts) " 1021 + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts()); 1022 assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size()); 1023 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1024 } else { 1025 assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1026 } 1027 } finally { 1028 if (bucketCache != null) { 1029 bucketCache.shutdown(); 1030 } 1031 HBASE_TESTING_UTILITY.cleanupTestDir(); 1032 } 1033 } 1034 1035 @Test 1036 public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception { 1037 BucketCache bucketCache = null; 1038 try { 1039 Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), 1040 "testNotifyFileCachingCompletedForEncodedDataSuccess"); 1041 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true); 1042 if (bucketCache.getStats().getFailedInserts() > 0) { 1043 LOG.info("There were {} fail inserts, " 1044 + "will assert if total blocks in backingMap equals (10 - failInserts) " 1045 + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts()); 1046 assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size()); 1047 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1048 } else { 1049 assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1050 } 1051 } finally { 1052 if (bucketCache != null) { 1053 bucketCache.shutdown(); 1054 } 1055 HBASE_TESTING_UTILITY.cleanupTestDir(); 1056 } 1057 } 1058 1059 @Test 1060 public void testNotifyFileCachingCompletedNotAllCached() throws Exception { 1061 BucketCache bucketCache = null; 1062 try { 1063 Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), 1064 "testNotifyFileCachingCompletedNotAllCached"); 1065 // Deliberately passing more blocks than we have created to test that 1066 // notifyFileCachingCompleted will not consider the file fully cached 1067 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false); 1068 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1069 } finally { 1070 if (bucketCache != null) { 1071 bucketCache.shutdown(); 1072 } 1073 HBASE_TESTING_UTILITY.cleanupTestDir(); 1074 } 1075 } 1076 1077 private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath, 1078 int totalBlocksToCheck, boolean encoded) throws Exception { 1079 final Path dataTestDir = createAndGetTestDir(); 1080 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 1081 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 1082 constructedBlockSizes, 1, 1, null); 1083 assertTrue(bucketCache.waitForCacheInitialization(10000)); 1084 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 1085 assertEquals(0, usedByteSize); 1086 HFileBlockPair[] hfileBlockPairs = 1087 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded); 1088 // Add blocks 1089 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 1090 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true); 1091 } 1092 bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck, 1093 totalBlocksToCheck * constructedBlockSize); 1094 return bucketCache; 1095 } 1096 1097 @Test 1098 public void testEvictOrphansOutOfGracePeriod() throws Exception { 1099 BucketCache bucketCache = testEvictOrphans(0); 1100 assertEquals(10, bucketCache.getBackingMap().size()); 1101 assertEquals(0, bucketCache.blocksByHFile.stream() 1102 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count()); 1103 } 1104 1105 @Test 1106 public void testEvictOrphansWithinGracePeriod() throws Exception { 1107 BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L); 1108 assertEquals(18, bucketCache.getBackingMap().size()); 1109 assertTrue(bucketCache.blocksByHFile.stream() 1110 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0); 1111 } 1112 1113 private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception { 1114 Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid"); 1115 Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan"); 1116 Map<String, HRegion> onlineRegions = new HashMap<>(); 1117 List<HStore> stores = new ArrayList<>(); 1118 Collection<HStoreFile> storeFiles = new ArrayList<>(); 1119 HRegion mockedRegion = mock(HRegion.class); 1120 HStore mockedStore = mock(HStore.class); 1121 HStoreFile mockedStoreFile = mock(HStoreFile.class); 1122 when(mockedStoreFile.getPath()).thenReturn(validFile); 1123 storeFiles.add(mockedStoreFile); 1124 when(mockedStore.getStorefiles()).thenReturn(storeFiles); 1125 stores.add(mockedStore); 1126 when(mockedRegion.getStores()).thenReturn(stores); 1127 onlineRegions.put("mocked_region", mockedRegion); 1128 HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99); 1129 HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1); 1130 HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01); 1131 HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD, 1132 orphanEvictionGracePeriod); 1133 BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21, 1134 constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000, 1135 HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions); 1136 HFileBlockPair[] validBlockPairs = 1137 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile, false); 1138 HFileBlockPair[] orphanBlockPairs = 1139 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile, false); 1140 for (HFileBlockPair pair : validBlockPairs) { 1141 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1142 } 1143 waitUntilAllFlushedToBucket(bucketCache); 1144 assertEquals(10, bucketCache.getBackingMap().size()); 1145 bucketCache.freeSpace("test"); 1146 assertEquals(10, bucketCache.getBackingMap().size()); 1147 for (HFileBlockPair pair : orphanBlockPairs) { 1148 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1149 } 1150 waitUntilAllFlushedToBucket(bucketCache); 1151 assertEquals(20, bucketCache.getBackingMap().size()); 1152 bucketCache.freeSpace("test"); 1153 return bucketCache; 1154 } 1155 1156 @Test 1157 public void testBlockPriority() throws Exception { 1158 HFileBlockPair block = CacheTestUtils.generateHFileBlocks(BLOCK_SIZE, 1)[0]; 1159 cacheAndWaitUntilFlushedToBucket(cache, block.getBlockName(), block.getBlock(), true); 1160 assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.SINGLE); 1161 cache.getBlock(block.getBlockName(), true, false, true); 1162 assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.MULTI); 1163 } 1164 1165 @Test 1166 public void testIOTimePerHitReturnsZeroWhenNoHits() 1167 throws NoSuchFieldException, IllegalAccessException { 1168 CacheStats cacheStats = cache.getStats(); 1169 assertTrue(cacheStats instanceof BucketCacheStats); 1170 BucketCacheStats bucketCacheStats = (BucketCacheStats) cacheStats; 1171 1172 Field field = BucketCacheStats.class.getDeclaredField("ioHitCount"); 1173 field.setAccessible(true); 1174 LongAdder ioHitCount = (LongAdder) field.get(bucketCacheStats); 1175 1176 assertEquals(0, ioHitCount.sum()); 1177 double ioTimePerHit = bucketCacheStats.getIOTimePerHit(); 1178 assertEquals(0, ioTimePerHit, 0.0); 1179 } 1180}