/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Basic test of BucketCache: puts and gets.
 * <p>
 * Tests ensure the data correctness of blocks under concurrent access from several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCache.class);

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
      // for these tests?
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Test utility to create the test dir and return its path.
   * @return path of the created dir
   * @throws IOException if the dir cannot be created
   */
  private Path createAndGetTestDir() throws IOException {
    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
    return testDir;
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
  }

  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize. Continue selecting block sizes
    // until the cache is completely filled.
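    // Each requested size is rounded up to a configured bucket size; with the default bucket
    // sizes (each a multiple of 1024 plus an extra 1024 bytes), every allocation below carries
    // some padding, which the fragmentation assertion further down accounts for.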
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());

      // We know the block sizes above are multiples of 1024, but default bucket sizes give an
      // additional 1024 on top of that, so this counts towards fragmentation in our test.
      // Real life may have worse fragmentation because blocks may not be perfectly sized to
      // block size, given encoding/compression and large rows.
      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
    }

    mAllocator.logDebugStatistics();

    for (Pair<Long, Integer> allocation : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }
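  // The two helpers above poll until the writer threads have drained the RAM cache; the trailing
  // one-second sleep is a grace period for in-flight writer bookkeeping rather than a strict
  // synchronization point.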

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and writeQueue,
  // then writer threads flush it to the bucket and put a reference entry in the backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block, boolean waitWhenCache) throws InterruptedException {
    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 are:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 are:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 are more reasonable, because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it has seen, and a Block newly added after that {@link BucketEntry}
     * should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    Path testDir = createAndGetTestDir();
    String ioEngineName = "file:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromMMap() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
  }

  @Test
  public void testRetrieveFromPMem() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath =
      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  private void testRetrievalUtils(Path testDir, String ioEngineName)
    throws IOException, InterruptedException {
    final String persistencePath =
      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    assertTrue(new File(persistencePath).exists());
  }
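
  // Note on the round trip above: shutdown() writes the persistence file, and the next
  // constructor consumes (and deletes) it while restoring the backingMap and allocator state,
  // which is why usedSize survives the restart while the file itself disappears until the final
  // shutdown() re-creates it.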

  @Test
  public void testRetrieveUnsupportedIOE() throws Exception {
    try {
      final Path testDir = createAndGetTestDir();
      final String ioEngineName = testDir + "/bucket.cache";
      testRetrievalUtils(testDir, ioEngineName);
      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
        + "files:, mmap: or offheap", e.getMessage());
    }
  }

  @Test
  public void testRetrieveFromMultipleFiles() throws Exception {
    final Path testDirInitial = createAndGetTestDir();
    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
    String ioEngineName =
      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache")
        .toString();
    testRetrievalUtils(testDirInitial, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDirInitial + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
    try {
      final Path testDir = createAndGetTestDir();
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      // No persistencePath was given, so the restarted file-backed cache must come up empty.
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
      BucketCache.DEFAULT_MIN_FACTOR);
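
    // A partition is a share of the usable (minFactor-scaled) capacity: getPartitionSize is
    // expected to return floor(totalSize * partitionFactor * minFactor), as encoded in
    // validateGetPartitionSize further down.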

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testCacheSizeCapacity() throws IOException {
    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
      BucketCache.DEFAULT_MIN_FACTOR);
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
    try {
      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
        writerQLen, null, 100, conf);
      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
    }
  }

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
      cache.getAcceptableFactor(), 0);
    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getMinFactor(), 0);
    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getExtraFreeFactor(), 0);
    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
      cache.getSingleFactor(), 0);
    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
      cache.getMultiFactor(), 0);
    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
      cache.getMemoryFactor(), 0);
  }
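
  // In the "invalid config" tests below, configValues[i] pairs with expectedOutcomes[i]:
  // checkConfigValues() applies each candidate value in turn and asserts that constructing the
  // BucketCache succeeds or throws IllegalArgumentException accordingly.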
  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to < 0; in expected range; minFactor > acceptableFactor; > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to < 0; in expected range; in expected range; config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // All configs add up to 1.0 and are between 0 and 1.0; configs don't add up to 1.0;
    // configs can't be negative; configs don't add up to 1.0
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
        BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
    boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
        assertTrue("Created BucketCache, but the expected outcome was: " + expectSuccess[i],
          expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("BucketCache creation threw, but the expected outcome was: "
          + expectSuccess[i], expectSuccess[i]);
      }
    }
  }

  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
    float minFactor) {
    long expectedOutput =
      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }

  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the value isn't ensured to be
    // positive. See HBASE-18757 for more information.
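    // For illustration (assuming BucketEntry packs the offset as an int plus one extra byte,
    // dropping the 8 low-order bits): 549888460800L >> 8 == 2148001800, which exceeds
    // Integer.MAX_VALUE, so reading the packed int back without an unsigned mask such as
    // "& 0xFFFFFFFFL" would yield a negative offset.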
    long testValue = 549888460800L;
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
      return ByteBuffAllocator.NONE;
    }, ByteBuffAllocator.HEAP);
    assertEquals(testValue, bucketEntry.offset());
  }

  @Test
  public void testEvictionCount() throws InterruptedException {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);

    assertEquals(0, cache.getStats().getEvictionCount());

    // The evict call should return 1, but the eviction count should still be 0.
    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should not increment
    assertTrue(cache.evictBlock(key));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should finally increment eviction count
    cache.freeSpace("testing");
    assertEquals(1, cache.getStats().getEvictionCount());
  }
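
  // As exercised above: explicit evictions (evictBlock, evictBlocksByHfileName) do not count
  // towards the eviction-count stat; only space-pressure evictions via freeSpace() do.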

  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }

  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);

    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
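
    // remove() should release only key1's retained buffer; the entry under key2 keeps its
    // reference, so re2's refCnt stays at 2.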
    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }

  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // initialize a mocked IOEngine whose writes always fail.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    when(ioEngine.usesSharedMemory()).thenReturn(false);
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);

    Assert.assertEquals(0, allocator.getUsedSize());
    try {
      re.writeToCache(ioEngine, allocator, null, null,
        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
      Assert.fail();
    } catch (Exception e) {
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }
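
  // The point of the test above: when the IOEngine write fails, writeToCache must roll back the
  // bucket allocation it just made, otherwise that space would leak; hence usedSize is asserted
  // to be 0 both before and after the failed write.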

  /**
   * This test is for HBASE-26295: a {@link BucketEntry} restored from a persistence file could
   * not be freed even when the corresponding {@link HFileBlock} was evicted from the
   * {@link BucketCache}.
   */
  @Test
  public void testFreeBucketEntryRestoredFromFile() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
      }

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
          hfileBlockPair.getBlock(), false);
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      // Evicting the restored blocks must free their BucketEntries as well.
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBlockAdditionWaitWhenCache() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, persistencePath);
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
          true);
      }
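
      // As the test name suggests, the waitWhenCache=true flag above (combined with a single
      // writer thread and a one-slot queue) is meant to make block additions effectively
      // synchronous; the bounded wait below guards against flakiness rather than being the
      // mechanism under test.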

      // Max wait for 10 seconds.
      long timeout = 10000;
      // Wait for the backingMap size to match the number of blocks.
      while (bucketCache.backingMap.size() != 10) {
        if (timeout <= 0) break;
        Threads.sleep(100);
        timeout -= 100;
      }
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }
}