/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
/**
 * Basic test of BucketCache. Puts and gets.
 * <p>
 * Tests ensure the correctness of blocks' data when accessed by several concurrent threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCache.class);

  private static final Random RAND = new Random();

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {
      { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  String ioEngineName = "offheap";
  String persistencePath = null;

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
        int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
      super.wait_when_cache = true;
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(RAND.nextInt(a.size()));
  }
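  // Note on the allocator exercised below (an illustrative sketch, inferred from how these tests
  // use it): BucketAllocator.roundUpToBucketSizeInfo(size) maps a requested block size to the
  // smallest configured bucket size that can hold it. With the parameterized sizes above, e.g.:
  //
  //   mAllocator.roundUpToBucketSizeInfo(6 * 1024);   // lands in the (8k + 1024) bucket
  //   mAllocator.roundUpToBucketSizeInfo(10 * 1024);  // lands in the (16k + 1024) bucket
  //
  // The extra 1024 bytes on each bucket size presumably leave room for per-block overhead beyond
  // the payload itself.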
  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Long> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize. Continue selecting block sizes
    // until the cache is completely filled.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(mAllocator.allocateBlock(blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
    }

    for (long offset : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
      throws InterruptedException {
    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }
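  // A usage sketch for the wait helpers above (illustrative only; it mirrors how the tests below
  // call them). The loop polls until the writer threads have moved the block out of ramCache and
  // into backingMap; the trailing one-second sleep acts as a cushion for any bookkeeping that
  // follows the flush:
  //
  //   cache.cacheBlock(key, block);          // asynchronous: only enqueues the block
  //   waitUntilFlushedToBucket(cache, key);  // parks the test thread until flushed
  //   assertNotNull(cache.backingMap.get(key));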
  // BucketCache.cacheBlock is async: it first adds the block to ramCache and the writeQueue, then
  // writer threads flush it to the bucket and put a reference entry in backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
      Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    assertEquals(0, cache.getBlockCount());
    assertEquals(0L, cache.getCurrentSize());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);

    String ioEngineName = "file:" + testDir + "/bucket.cache";
    String persistencePath = testDir + "/bucket.persistence";

    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath);
    long usedSize = bucketCache.getAllocator().getUsedSize();
    assertEquals(0, usedSize);

    HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
    // Add blocks
    for (HFileBlockPair block : blocks) {
      bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
    }
    for (HFileBlockPair block : blocks) {
      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
    }
    usedSize = bucketCache.getAllocator().getUsedSize();
    assertNotEquals(0, usedSize);
    // persist cache to file
    bucketCache.shutdown();
    assertTrue(new File(persistencePath).exists());

    // restore cache from file
    bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    // persist cache to file
    bucketCache.shutdown();
    assertTrue(new File(persistencePath).exists());

    // reconfigure bucket sizes: the biggest bucket is smaller than constructedBlockSize (8k or
    // 16k), so the cache can't be restored from the file
    int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
    bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());

    TEST_UTIL.cleanupTestDir();
  }
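  // For context (not exercised by this test class directly): outside of unit tests, a file-backed
  // bucket cache like the one above is normally enabled through configuration rather than by
  // constructing BucketCache by hand. A minimal sketch, assuming the standard HBase property
  // names; the path and size are illustrative:
  //
  //   hbase.bucketcache.ioengine = file:/mnt/ssd/bucket.cache
  //   hbase.bucketcache.size     = 8192   // capacity in megabytes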
  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
      BucketCache.DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);

    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
      cache.getAcceptableFactor(), 0);
    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getMinFactor(), 0);
    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getExtraFreeFactor(), 0);
    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
      cache.getSingleFactor(), 0);
    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
      cache.getMultiFactor(), 0);
    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
      cache.getMemoryFactor(), 0);
  }

  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to < 0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }
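  // Summary of the factor constraints these invalid-config tests probe (as implied by the
  // expected outcomes above and below): every factor must be non-negative; minFactor and
  // acceptFactor must not exceed 1.0, and minFactor must not exceed acceptFactor;
  // extraFreeFactor is allowed to exceed 1.0; and the single/multi/memory split must sum to
  // exactly 1.0, e.g. the 0.1f + 0.7f + 0.2f split accepted in testValidBucketCacheConfigs.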
  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to < 0; in expected range; in expected range; this config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // Outcomes per column: valid (all factors in [0, 1] and summing to 1.0); invalid (factors
    // don't sum to 1.0); invalid (a factor is negative); invalid (factors don't sum to 1.0).
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String, float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
      singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
      BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
      boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      }
    }
  }

  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
      float minFactor) {
    long expectedOutput =
      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }
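  // Worked example for validateGetPartitionSize above, with illustrative numbers: if
  // getAllocator().getTotalSize() came out at exactly the 32 MB test capacity (in practice it is
  // quantized to whole buckets), a single-access factor of 0.1 and a min factor of 0.5 give
  //
  //   expectedOutput = floor(33554432 * 0.1 * 0.5) = 1677721
  //
  // i.e. getPartitionSize(factor) scales by minFactor as well, sizing each partition against the
  // portion of the cache that eviction keeps occupied rather than the full capacity.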
  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the value isn't ensured to be
    // positive. See HBASE-18757 for more information.
    long testValue = 549888460800L;
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true);
    assertEquals(testValue, bucketEntry.offset());
  }

  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }
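  // Reference-count conventions checked by testRAMCache below (as its assertions imply):
  // RAMCache.putIfAbsent retains the entry's buffer on success (refCnt 1 -> 2) and leaves it
  // untouched when the key is already present; remove() and clear() release the buffers they
  // drop (refCnt back to 1), so the caller's original reference stays valid throughout.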
  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);

    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }
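  // The final test guards against leaking allocator space: RAMQueueEntry.writeToCache first
  // allocates a bucket slot, then asks the IOEngine to write the block. When the write throws,
  // the slot must be freed again, which is why usedSize is asserted to be 0 both before and
  // after the failed write.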
  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // initialize a mocked IOEngine that fails every write.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    Mockito.when(ioEngine.usesSharedMemory()).thenReturn(false);
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);

    Assert.assertEquals(0, allocator.getUsedSize());
    try {
      re.writeToCache(ioEngine, allocator, null);
      Assert.fail();
    } catch (Exception e) {
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }
}