001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD; 022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME; 024import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME; 025import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME; 026import static org.junit.Assert.assertEquals; 027import static org.junit.Assert.assertFalse; 028import static org.junit.Assert.assertNotEquals; 029import static org.junit.Assert.assertNotNull; 030import static org.junit.Assert.assertNull; 031import static org.junit.Assert.assertTrue; 032import static org.mockito.Mockito.mock; 033import static org.mockito.Mockito.when; 034 035import java.io.File; 036import java.io.IOException; 037import java.nio.ByteBuffer; 038import java.util.ArrayList; 039import java.util.Arrays; 040import java.util.Collection; 041import java.util.HashMap; 042import java.util.List; 043import java.util.Map; 044import java.util.Set; 045import java.util.concurrent.ThreadLocalRandom; 046import java.util.concurrent.locks.ReentrantReadWriteLock; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.Path; 049import org.apache.hadoop.hbase.HBaseClassTestRule; 050import org.apache.hadoop.hbase.HBaseConfiguration; 051import org.apache.hadoop.hbase.HBaseTestingUtil; 052import org.apache.hadoop.hbase.HConstants; 053import org.apache.hadoop.hbase.Waiter; 054import org.apache.hadoop.hbase.io.ByteBuffAllocator; 055import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 056import org.apache.hadoop.hbase.io.hfile.BlockType; 057import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 058import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 059import org.apache.hadoop.hbase.io.hfile.Cacheable; 060import org.apache.hadoop.hbase.io.hfile.HFileBlock; 061import org.apache.hadoop.hbase.io.hfile.HFileContext; 062import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 063import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; 064import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; 065import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; 066import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 067import org.apache.hadoop.hbase.nio.ByteBuff; 068import org.apache.hadoop.hbase.regionserver.HRegion; 069import org.apache.hadoop.hbase.regionserver.HStore; 070import org.apache.hadoop.hbase.regionserver.HStoreFile; 071import org.apache.hadoop.hbase.testclassification.IOTests; 072import org.apache.hadoop.hbase.testclassification.LargeTests; 073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 074import org.apache.hadoop.hbase.util.Pair; 075import org.apache.hadoop.hbase.util.Threads; 076import org.junit.After; 077import org.junit.Assert; 078import org.junit.Before; 079import org.junit.ClassRule; 080import org.junit.Test; 081import org.junit.experimental.categories.Category; 082import org.junit.runner.RunWith; 083import org.junit.runners.Parameterized; 084import org.mockito.Mockito; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 089 090/** 091 * Basic test of BucketCache.Puts and gets. 092 * <p> 093 * Tests will ensure that blocks' data correctness under several threads concurrency 094 */ 095@RunWith(Parameterized.class) 096@Category({ IOTests.class, LargeTests.class }) 097public class TestBucketCache { 098 099 private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class); 100 101 @ClassRule 102 public static final HBaseClassTestRule CLASS_RULE = 103 HBaseClassTestRule.forClass(TestBucketCache.class); 104 105 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 106 public static Iterable<Object[]> data() { 107 return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize 108 // for these tests? 109 { 16 * 1024, 110 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 111 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 112 128 * 1024 + 1024 } } }); 113 } 114 115 @Parameterized.Parameter(0) 116 public int constructedBlockSize; 117 118 @Parameterized.Parameter(1) 119 public int[] constructedBlockSizes; 120 121 BucketCache cache; 122 final int CACHE_SIZE = 1000000; 123 final int NUM_BLOCKS = 100; 124 final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; 125 final int NUM_THREADS = 100; 126 final int NUM_QUERIES = 10000; 127 128 final long capacitySize = 32 * 1024 * 1024; 129 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 130 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 131 private String ioEngineName = "offheap"; 132 133 private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil(); 134 135 private static class MockedBucketCache extends BucketCache { 136 137 public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 138 int writerThreads, int writerQLen, String persistencePath) throws IOException { 139 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, 140 persistencePath); 141 } 142 143 @Override 144 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 145 super.cacheBlock(cacheKey, buf, inMemory); 146 } 147 148 @Override 149 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 150 super.cacheBlock(cacheKey, buf); 151 } 152 } 153 154 @Before 155 public void setup() throws IOException { 156 cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, 157 constructedBlockSizes, writeThreads, writerQLen, null); 158 } 159 160 @After 161 public void tearDown() { 162 cache.shutdown(); 163 } 164 165 /** 166 * Test Utility to create test dir and return name 167 * @return return name of created dir 168 * @throws IOException throws IOException 169 */ 170 private Path createAndGetTestDir() throws IOException { 171 final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir(); 172 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir); 173 return testDir; 174 } 175 176 /** 177 * Return a random element from {@code a}. 178 */ 179 private static <T> T randFrom(List<T> a) { 180 return a.get(ThreadLocalRandom.current().nextInt(a.size())); 181 } 182 183 @Test 184 public void testBucketAllocator() throws BucketAllocatorException { 185 BucketAllocator mAllocator = cache.getAllocator(); 186 /* 187 * Test the allocator first 188 */ 189 final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024); 190 191 boolean full = false; 192 ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>(); 193 // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until 194 // the cache is completely filled. 195 List<Integer> tmp = new ArrayList<>(BLOCKSIZES); 196 while (!full) { 197 Integer blockSize = null; 198 try { 199 blockSize = randFrom(tmp); 200 allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize)); 201 } catch (CacheFullException cfe) { 202 tmp.remove(blockSize); 203 if (tmp.isEmpty()) full = true; 204 } 205 } 206 207 for (Integer blockSize : BLOCKSIZES) { 208 BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize); 209 IndexStatistics indexStatistics = bucketSizeInfo.statistics(); 210 assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount()); 211 212 // we know the block sizes above are multiples of 1024, but default bucket sizes give an 213 // additional 1024 on top of that so this counts towards fragmentation in our test 214 // real life may have worse fragmentation because blocks may not be perfectly sized to block 215 // size, given encoding/compression and large rows 216 assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes()); 217 } 218 219 mAllocator.logDebugStatistics(); 220 221 for (Pair<Long, Integer> allocation : allocations) { 222 assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()), 223 mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond())); 224 } 225 assertEquals(0, mAllocator.getUsedSize()); 226 } 227 228 @Test 229 public void testCacheSimple() throws Exception { 230 CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); 231 } 232 233 @Test 234 public void testCacheMultiThreadedSingleKey() throws Exception { 235 CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES); 236 } 237 238 @Test 239 public void testHeapSizeChanges() throws Exception { 240 cache.stopWriterThreads(); 241 CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); 242 } 243 244 public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 245 throws InterruptedException { 246 Waiter.waitFor(HBaseConfiguration.create(), 10000, 247 () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); 248 } 249 250 public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { 251 while (!cache.ramCache.isEmpty()) { 252 Thread.sleep(100); 253 } 254 Thread.sleep(1000); 255 } 256 257 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 258 // threads will flush it to the bucket and put reference entry in backingMap. 259 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 260 Cacheable block, boolean waitWhenCache) throws InterruptedException { 261 cache.cacheBlock(cacheKey, block, false, waitWhenCache); 262 waitUntilFlushedToBucket(cache, cacheKey); 263 } 264 265 @Test 266 public void testMemoryLeak() throws Exception { 267 final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L); 268 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 269 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 270 long lockId = cache.backingMap.get(cacheKey).offset(); 271 ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId); 272 lock.writeLock().lock(); 273 Thread evictThread = new Thread("evict-block") { 274 @Override 275 public void run() { 276 cache.evictBlock(cacheKey); 277 } 278 }; 279 evictThread.start(); 280 cache.offsetLock.waitForWaiters(lockId, 1); 281 cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true); 282 assertEquals(0, cache.getBlockCount()); 283 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 284 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 285 assertEquals(1, cache.getBlockCount()); 286 lock.writeLock().unlock(); 287 evictThread.join(); 288 /** 289 * <pre> 290 * The asserts here before HBASE-21957 are: 291 * assertEquals(1L, cache.getBlockCount()); 292 * assertTrue(cache.getCurrentSize() > 0L); 293 * assertTrue("We should have a block!", cache.iterator().hasNext()); 294 * 295 * The asserts here after HBASE-21957 are: 296 * assertEquals(0, cache.getBlockCount()); 297 * assertEquals(cache.getCurrentSize(), 0L); 298 * 299 * I think the asserts before HBASE-21957 is more reasonable,because 300 * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry} 301 * it had seen, and newly added Block after the {@link BucketEntry} 302 * it had seen should not be evicted. 303 * </pre> 304 */ 305 assertEquals(1L, cache.getBlockCount()); 306 assertTrue(cache.getCurrentSize() > 0L); 307 assertTrue("We should have a block!", cache.iterator().hasNext()); 308 } 309 310 @Test 311 public void testRetrieveFromFile() throws Exception { 312 Path testDir = createAndGetTestDir(); 313 String ioEngineName = "file:" + testDir + "/bucket.cache"; 314 testRetrievalUtils(testDir, ioEngineName); 315 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 316 String persistencePath = testDir + "/bucket.persistence"; 317 BucketCache bucketCache = null; 318 try { 319 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 320 smallBucketSizes, writeThreads, writerQLen, persistencePath); 321 assertTrue(bucketCache.waitForCacheInitialization(10000)); 322 assertFalse(new File(persistencePath).exists()); 323 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 324 assertEquals(0, bucketCache.backingMap.size()); 325 } finally { 326 bucketCache.shutdown(); 327 HBASE_TESTING_UTILITY.cleanupTestDir(); 328 } 329 } 330 331 @Test 332 public void testRetrieveFromMMap() throws Exception { 333 final Path testDir = createAndGetTestDir(); 334 final String ioEngineName = "mmap:" + testDir + "/bucket.cache"; 335 testRetrievalUtils(testDir, ioEngineName); 336 } 337 338 @Test 339 public void testRetrieveFromPMem() throws Exception { 340 final Path testDir = createAndGetTestDir(); 341 final String ioEngineName = "pmem:" + testDir + "/bucket.cache"; 342 testRetrievalUtils(testDir, ioEngineName); 343 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 344 String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 345 BucketCache bucketCache = null; 346 try { 347 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 348 smallBucketSizes, writeThreads, writerQLen, persistencePath); 349 assertTrue(bucketCache.waitForCacheInitialization(10000)); 350 assertFalse(new File(persistencePath).exists()); 351 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 352 assertEquals(0, bucketCache.backingMap.size()); 353 } finally { 354 bucketCache.shutdown(); 355 HBASE_TESTING_UTILITY.cleanupTestDir(); 356 } 357 } 358 359 private void testRetrievalUtils(Path testDir, String ioEngineName) 360 throws IOException, InterruptedException { 361 final String persistencePath = 362 testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 363 BucketCache bucketCache = null; 364 try { 365 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 366 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 367 assertTrue(bucketCache.waitForCacheInitialization(10000)); 368 long usedSize = bucketCache.getAllocator().getUsedSize(); 369 assertEquals(0, usedSize); 370 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 371 for (HFileBlockPair block : blocks) { 372 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 373 } 374 for (HFileBlockPair block : blocks) { 375 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 376 false); 377 } 378 usedSize = bucketCache.getAllocator().getUsedSize(); 379 assertNotEquals(0, usedSize); 380 bucketCache.shutdown(); 381 assertTrue(new File(persistencePath).exists()); 382 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 383 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 384 assertTrue(bucketCache.waitForCacheInitialization(10000)); 385 386 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 387 } finally { 388 if (bucketCache != null) { 389 bucketCache.shutdown(); 390 } 391 } 392 assertTrue(new File(persistencePath).exists()); 393 } 394 395 @Test 396 public void testRetrieveUnsupportedIOE() throws Exception { 397 try { 398 final Path testDir = createAndGetTestDir(); 399 final String ioEngineName = testDir + "/bucket.cache"; 400 testRetrievalUtils(testDir, ioEngineName); 401 Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!"); 402 } catch (IllegalArgumentException e) { 403 Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " 404 + "files:, mmap: or offheap", e.getMessage()); 405 } 406 } 407 408 @Test 409 public void testRetrieveFromMultipleFiles() throws Exception { 410 final Path testDirInitial = createAndGetTestDir(); 411 final Path newTestDir = new HBaseTestingUtil().getDataTestDir(); 412 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir); 413 String ioEngineName = 414 new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache") 415 .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString(); 416 testRetrievalUtils(testDirInitial, ioEngineName); 417 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 418 String persistencePath = testDirInitial + "/bucket.persistence"; 419 BucketCache bucketCache = null; 420 try { 421 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 422 smallBucketSizes, writeThreads, writerQLen, persistencePath); 423 assertTrue(bucketCache.waitForCacheInitialization(10000)); 424 assertFalse(new File(persistencePath).exists()); 425 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 426 assertEquals(0, bucketCache.backingMap.size()); 427 } finally { 428 bucketCache.shutdown(); 429 HBASE_TESTING_UTILITY.cleanupTestDir(); 430 } 431 } 432 433 @Test 434 public void testRetrieveFromFileWithoutPersistence() throws Exception { 435 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 436 constructedBlockSizes, writeThreads, writerQLen, null); 437 assertTrue(bucketCache.waitForCacheInitialization(10000)); 438 try { 439 final Path testDir = createAndGetTestDir(); 440 String ioEngineName = "file:" + testDir + "/bucket.cache"; 441 long usedSize = bucketCache.getAllocator().getUsedSize(); 442 assertEquals(0, usedSize); 443 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 444 for (HFileBlockPair block : blocks) { 445 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 446 } 447 for (HFileBlockPair block : blocks) { 448 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 449 false); 450 } 451 usedSize = bucketCache.getAllocator().getUsedSize(); 452 assertNotEquals(0, usedSize); 453 bucketCache.shutdown(); 454 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 455 constructedBlockSizes, writeThreads, writerQLen, null); 456 assertTrue(bucketCache.waitForCacheInitialization(10000)); 457 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 458 } finally { 459 bucketCache.shutdown(); 460 HBASE_TESTING_UTILITY.cleanupTestDir(); 461 } 462 } 463 464 @Test 465 public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException { 466 long availableSpace = 20 * 1024L * 1024 * 1024; 467 int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 }; 468 BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes); 469 assertTrue(allocator.getBuckets().length > 0); 470 } 471 472 @Test 473 public void testGetPartitionSize() throws IOException { 474 // Test default values 475 validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, 476 BucketCache.DEFAULT_MIN_FACTOR); 477 478 Configuration conf = HBaseConfiguration.create(); 479 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 480 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 481 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 482 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 483 484 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 485 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 486 assertTrue(cache.waitForCacheInitialization(10000)); 487 488 validateGetPartitionSize(cache, 0.1f, 0.5f); 489 validateGetPartitionSize(cache, 0.7f, 0.5f); 490 validateGetPartitionSize(cache, 0.2f, 0.5f); 491 } 492 493 @Test 494 public void testCacheSizeCapacity() throws IOException { 495 // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE 496 validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, 497 BucketCache.DEFAULT_MIN_FACTOR); 498 Configuration conf = HBaseConfiguration.create(); 499 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 500 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 501 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 502 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 503 try { 504 new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads, 505 writerQLen, null, 100, conf); 506 Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!"); 507 } catch (IllegalArgumentException e) { 508 Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage()); 509 } 510 } 511 512 @Test 513 public void testValidBucketCacheConfigs() throws IOException { 514 Configuration conf = HBaseConfiguration.create(); 515 conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 516 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 517 conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f); 518 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 519 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 520 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 521 522 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 523 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 524 assertTrue(cache.waitForCacheInitialization(10000)); 525 526 assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, 527 cache.getAcceptableFactor(), 0); 528 assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0); 529 assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, 530 cache.getExtraFreeFactor(), 0); 531 assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, 532 cache.getSingleFactor(), 0); 533 assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, 534 cache.getMultiFactor(), 0); 535 assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, 536 cache.getMemoryFactor(), 0); 537 } 538 539 @Test 540 public void testInvalidAcceptFactorConfig() throws IOException { 541 float[] configValues = { -1f, 0.2f, 0.86f, 1.05f }; 542 boolean[] expectedOutcomes = { false, false, true, false }; 543 Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues); 544 Configuration conf = HBaseConfiguration.create(); 545 checkConfigValues(conf, configMappings, expectedOutcomes); 546 } 547 548 @Test 549 public void testInvalidMinFactorConfig() throws IOException { 550 float[] configValues = { -1f, 0f, 0.96f, 1.05f }; 551 // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0 552 boolean[] expectedOutcomes = { false, true, false, false }; 553 Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues); 554 Configuration conf = HBaseConfiguration.create(); 555 checkConfigValues(conf, configMappings, expectedOutcomes); 556 } 557 558 @Test 559 public void testInvalidExtraFreeFactorConfig() throws IOException { 560 float[] configValues = { -1f, 0f, 0.2f, 1.05f }; 561 // throws due to <0, in expected range, in expected range, config can be > 1.0 562 boolean[] expectedOutcomes = { false, true, true, true }; 563 Map<String, float[]> configMappings = 564 ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues); 565 Configuration conf = HBaseConfiguration.create(); 566 checkConfigValues(conf, configMappings, expectedOutcomes); 567 } 568 569 @Test 570 public void testInvalidCacheSplitFactorConfig() throws IOException { 571 float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f }; 572 float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f }; 573 float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f }; 574 // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't 575 // be negative, configs don't add to 1.0 576 boolean[] expectedOutcomes = { true, false, false, false }; 577 Map<String, 578 float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 579 singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, 580 BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues); 581 Configuration conf = HBaseConfiguration.create(); 582 checkConfigValues(conf, configMappings, expectedOutcomes); 583 } 584 585 private void checkConfigValues(Configuration conf, Map<String, float[]> configMap, 586 boolean[] expectSuccess) throws IOException { 587 Set<String> configNames = configMap.keySet(); 588 for (int i = 0; i < expectSuccess.length; i++) { 589 try { 590 for (String configName : configNames) { 591 conf.setFloat(configName, configMap.get(configName)[i]); 592 } 593 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 594 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 595 assertTrue(cache.waitForCacheInitialization(10000)); 596 assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] 597 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 598 } catch (IllegalArgumentException e) { 599 assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] 600 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 601 } 602 } 603 } 604 605 private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor, 606 float minFactor) { 607 long expectedOutput = 608 (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor); 609 assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor)); 610 } 611 612 @Test 613 public void testOffsetProducesPositiveOutput() { 614 // This number is picked because it produces negative output if the values isn't ensured to be 615 // positive. See HBASE-18757 for more information. 616 long testValue = 549888460800L; 617 BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> { 618 return ByteBuffAllocator.NONE; 619 }, ByteBuffAllocator.HEAP); 620 assertEquals(testValue, bucketEntry.offset()); 621 } 622 623 @Test 624 public void testEvictionCount() throws InterruptedException { 625 int size = 100; 626 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 627 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 628 HFileContext meta = new HFileContextBuilder().build(); 629 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 630 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 631 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 632 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 633 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 634 635 BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0); 636 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 637 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 638 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 639 blockWithNextBlockMetadata.serialize(block1Buffer, true); 640 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 641 642 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 643 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 644 block1Buffer); 645 646 waitUntilFlushedToBucket(cache, key); 647 648 assertEquals(0, cache.getStats().getEvictionCount()); 649 650 // evict call should return 1, but then eviction count be 0 651 assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount")); 652 assertEquals(0, cache.getStats().getEvictionCount()); 653 654 // add back 655 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 656 block1Buffer); 657 waitUntilFlushedToBucket(cache, key); 658 659 // should not increment 660 assertTrue(cache.evictBlock(key)); 661 assertEquals(0, cache.getStats().getEvictionCount()); 662 663 // add back 664 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 665 block1Buffer); 666 waitUntilFlushedToBucket(cache, key); 667 668 // should finally increment eviction count 669 cache.freeSpace("testing"); 670 assertEquals(1, cache.getStats().getEvictionCount()); 671 } 672 673 @Test 674 public void testCacheBlockNextBlockMetadataMissing() throws Exception { 675 int size = 100; 676 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 677 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 678 HFileContext meta = new HFileContextBuilder().build(); 679 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 680 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 681 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 682 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 683 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 684 685 BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0); 686 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 687 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 688 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 689 blockWithNextBlockMetadata.serialize(block1Buffer, true); 690 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 691 692 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 693 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 694 block1Buffer); 695 696 waitUntilFlushedToBucket(cache, key); 697 assertNotNull(cache.backingMap.get(key)); 698 assertEquals(1, cache.backingMap.get(key).refCnt()); 699 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 700 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 701 702 // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. 703 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 704 block1Buffer); 705 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 706 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 707 assertEquals(1, cache.backingMap.get(key).refCnt()); 708 709 // Clear and add blockWithoutNextBlockMetadata 710 assertTrue(cache.evictBlock(key)); 711 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 712 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 713 714 assertNull(cache.getBlock(key, false, false, false)); 715 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 716 block2Buffer); 717 718 waitUntilFlushedToBucket(cache, key); 719 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 720 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 721 722 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. 723 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 724 block1Buffer); 725 726 waitUntilFlushedToBucket(cache, key); 727 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 728 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 729 } 730 731 @Test 732 public void testRAMCache() { 733 int size = 100; 734 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 735 byte[] byteArr = new byte[length]; 736 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 737 HFileContext meta = new HFileContextBuilder().build(); 738 739 RAMCache cache = new RAMCache(); 740 BlockCacheKey key1 = new BlockCacheKey("file-1", 1); 741 BlockCacheKey key2 = new BlockCacheKey("file-2", 2); 742 HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 743 HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); 744 HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 745 HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); 746 RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false); 747 RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false); 748 749 assertFalse(cache.containsKey(key1)); 750 assertNull(cache.putIfAbsent(key1, re1)); 751 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 752 753 assertNotNull(cache.putIfAbsent(key1, re2)); 754 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 755 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 756 757 assertNull(cache.putIfAbsent(key2, re2)); 758 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 759 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 760 761 cache.remove(key1); 762 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 763 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 764 765 cache.clear(); 766 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 767 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 768 } 769 770 @Test 771 public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { 772 // initialize an block. 773 int size = 100, offset = 20; 774 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 775 ByteBuffer buf = ByteBuffer.allocate(length); 776 HFileContext meta = new HFileContextBuilder().build(); 777 HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 778 HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP); 779 780 // initialize an mocked ioengine. 781 IOEngine ioEngine = Mockito.mock(IOEngine.class); 782 when(ioEngine.usesSharedMemory()).thenReturn(false); 783 // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong()); 784 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class), 785 Mockito.anyLong()); 786 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class), 787 Mockito.anyLong()); 788 789 // create an bucket allocator. 790 long availableSpace = 1024 * 1024 * 1024L; 791 BucketAllocator allocator = new BucketAllocator(availableSpace, null); 792 793 BlockCacheKey key = new BlockCacheKey("dummy", 1L); 794 RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false); 795 796 Assert.assertEquals(0, allocator.getUsedSize()); 797 try { 798 re.writeToCache(ioEngine, allocator, null, null, 799 ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); 800 Assert.fail(); 801 } catch (Exception e) { 802 } 803 Assert.assertEquals(0, allocator.getUsedSize()); 804 } 805 806 /** 807 * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file 808 * could not be freed even if corresponding {@link HFileBlock} is evicted from 809 * {@link BucketCache}. 810 */ 811 @Test 812 public void testFreeBucketEntryRestoredFromFile() throws Exception { 813 BucketCache bucketCache = null; 814 try { 815 final Path dataTestDir = createAndGetTestDir(); 816 817 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 818 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 819 820 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 821 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 822 assertTrue(bucketCache.waitForCacheInitialization(10000)); 823 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 824 assertEquals(0, usedByteSize); 825 826 HFileBlockPair[] hfileBlockPairs = 827 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 828 // Add blocks 829 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 830 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock()); 831 } 832 833 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 834 cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), 835 hfileBlockPair.getBlock(), false); 836 } 837 usedByteSize = bucketCache.getAllocator().getUsedSize(); 838 assertNotEquals(0, usedByteSize); 839 // persist cache to file 840 bucketCache.shutdown(); 841 assertTrue(new File(persistencePath).exists()); 842 843 // restore cache from file 844 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 845 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 846 assertTrue(bucketCache.waitForCacheInitialization(10000)); 847 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 848 849 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 850 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 851 bucketCache.evictBlock(blockCacheKey); 852 } 853 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 854 assertEquals(0, bucketCache.backingMap.size()); 855 } finally { 856 bucketCache.shutdown(); 857 HBASE_TESTING_UTILITY.cleanupTestDir(); 858 } 859 } 860 861 @Test 862 public void testBlockAdditionWaitWhenCache() throws Exception { 863 BucketCache bucketCache = null; 864 try { 865 final Path dataTestDir = createAndGetTestDir(); 866 867 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 868 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 869 870 Configuration config = HBASE_TESTING_UTILITY.getConfiguration(); 871 config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000); 872 873 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 874 constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config); 875 assertTrue(bucketCache.waitForCacheInitialization(10000)); 876 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 877 assertEquals(0, usedByteSize); 878 879 HFileBlockPair[] hfileBlockPairs = 880 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10); 881 // Add blocks 882 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 883 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, 884 true); 885 } 886 887 // Max wait for 10 seconds. 888 long timeout = 10000; 889 // Wait for blocks size to match the number of blocks. 890 while (bucketCache.backingMap.size() != 10) { 891 if (timeout <= 0) break; 892 Threads.sleep(100); 893 timeout -= 100; 894 } 895 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 896 assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName())); 897 } 898 usedByteSize = bucketCache.getAllocator().getUsedSize(); 899 assertNotEquals(0, usedByteSize); 900 // persist cache to file 901 bucketCache.shutdown(); 902 assertTrue(new File(persistencePath).exists()); 903 904 // restore cache from file 905 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 906 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 907 assertTrue(bucketCache.waitForCacheInitialization(10000)); 908 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 909 910 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 911 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 912 bucketCache.evictBlock(blockCacheKey); 913 } 914 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 915 assertEquals(0, bucketCache.backingMap.size()); 916 } finally { 917 if (bucketCache != null) { 918 bucketCache.shutdown(); 919 } 920 HBASE_TESTING_UTILITY.cleanupTestDir(); 921 } 922 } 923 924 @Test 925 public void testNotifyFileCachingCompletedSuccess() throws Exception { 926 BucketCache bucketCache = null; 927 try { 928 Path filePath = 929 new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess"); 930 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false); 931 if (bucketCache.getStats().getFailedInserts() > 0) { 932 LOG.info("There were {} fail inserts, " 933 + "will assert if total blocks in backingMap equals (10 - failInserts) " 934 + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts()); 935 assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size()); 936 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 937 } else { 938 assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 939 } 940 } finally { 941 if (bucketCache != null) { 942 bucketCache.shutdown(); 943 } 944 HBASE_TESTING_UTILITY.cleanupTestDir(); 945 } 946 } 947 948 @Test 949 public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception { 950 BucketCache bucketCache = null; 951 try { 952 Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), 953 "testNotifyFileCachingCompletedForEncodedDataSuccess"); 954 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true); 955 if (bucketCache.getStats().getFailedInserts() > 0) { 956 LOG.info("There were {} fail inserts, " 957 + "will assert if total blocks in backingMap equals (10 - failInserts) " 958 + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts()); 959 assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size()); 960 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 961 } else { 962 assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 963 } 964 } finally { 965 if (bucketCache != null) { 966 bucketCache.shutdown(); 967 } 968 HBASE_TESTING_UTILITY.cleanupTestDir(); 969 } 970 } 971 972 @Test 973 public void testNotifyFileCachingCompletedNotAllCached() throws Exception { 974 BucketCache bucketCache = null; 975 try { 976 Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), 977 "testNotifyFileCachingCompletedNotAllCached"); 978 // Deliberately passing more blocks than we have created to test that 979 // notifyFileCachingCompleted will not consider the file fully cached 980 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false); 981 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 982 } finally { 983 if (bucketCache != null) { 984 bucketCache.shutdown(); 985 } 986 HBASE_TESTING_UTILITY.cleanupTestDir(); 987 } 988 } 989 990 private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath, 991 int totalBlocksToCheck, boolean encoded) throws Exception { 992 final Path dataTestDir = createAndGetTestDir(); 993 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 994 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 995 constructedBlockSizes, 1, 1, null); 996 assertTrue(bucketCache.waitForCacheInitialization(10000)); 997 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 998 assertEquals(0, usedByteSize); 999 HFileBlockPair[] hfileBlockPairs = 1000 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded); 1001 // Add blocks 1002 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 1003 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true); 1004 } 1005 bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck, 1006 totalBlocksToCheck * constructedBlockSize); 1007 return bucketCache; 1008 } 1009 1010 @Test 1011 public void testEvictOrphansOutOfGracePeriod() throws Exception { 1012 BucketCache bucketCache = testEvictOrphans(0); 1013 assertEquals(10, bucketCache.getBackingMap().size()); 1014 assertEquals(0, bucketCache.blocksByHFile.stream() 1015 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count()); 1016 } 1017 1018 @Test 1019 public void testEvictOrphansWithinGracePeriod() throws Exception { 1020 BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L); 1021 assertEquals(18, bucketCache.getBackingMap().size()); 1022 assertTrue(bucketCache.blocksByHFile.stream() 1023 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0); 1024 } 1025 1026 private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception { 1027 Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid"); 1028 Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan"); 1029 Map<String, HRegion> onlineRegions = new HashMap<>(); 1030 List<HStore> stores = new ArrayList<>(); 1031 Collection<HStoreFile> storeFiles = new ArrayList<>(); 1032 HRegion mockedRegion = mock(HRegion.class); 1033 HStore mockedStore = mock(HStore.class); 1034 HStoreFile mockedStoreFile = mock(HStoreFile.class); 1035 when(mockedStoreFile.getPath()).thenReturn(validFile); 1036 storeFiles.add(mockedStoreFile); 1037 when(mockedStore.getStorefiles()).thenReturn(storeFiles); 1038 stores.add(mockedStore); 1039 when(mockedRegion.getStores()).thenReturn(stores); 1040 onlineRegions.put("mocked_region", mockedRegion); 1041 HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99); 1042 HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1); 1043 HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01); 1044 HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD, 1045 orphanEvictionGracePeriod); 1046 BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21, 1047 constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000, 1048 HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions); 1049 HFileBlockPair[] validBlockPairs = 1050 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile); 1051 HFileBlockPair[] orphanBlockPairs = 1052 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile); 1053 for (HFileBlockPair pair : validBlockPairs) { 1054 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1055 } 1056 waitUntilAllFlushedToBucket(bucketCache); 1057 assertEquals(10, bucketCache.getBackingMap().size()); 1058 bucketCache.freeSpace("test"); 1059 assertEquals(10, bucketCache.getBackingMap().size()); 1060 for (HFileBlockPair pair : orphanBlockPairs) { 1061 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1062 } 1063 waitUntilAllFlushedToBucket(bucketCache); 1064 assertEquals(20, bucketCache.getBackingMap().size()); 1065 bucketCache.freeSpace("test"); 1066 return bucketCache; 1067 } 1068}