001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026
027import java.io.File;
028import java.io.IOException;
029import java.nio.ByteBuffer;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.Map;
034import java.util.Random;
035import java.util.Set;
036import java.util.concurrent.locks.ReentrantReadWriteLock;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HBaseTestingUtility;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.io.ByteBuffAllocator;
044import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
045import org.apache.hadoop.hbase.io.hfile.BlockType;
046import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
047import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
048import org.apache.hadoop.hbase.io.hfile.Cacheable;
049import org.apache.hadoop.hbase.io.hfile.HFileBlock;
050import org.apache.hadoop.hbase.io.hfile.HFileContext;
051import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
052import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
053import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
054import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
055import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
056import org.apache.hadoop.hbase.nio.ByteBuff;
057import org.apache.hadoop.hbase.testclassification.IOTests;
058import org.apache.hadoop.hbase.testclassification.LargeTests;
059import org.junit.After;
060import org.junit.Assert;
061import org.junit.Before;
062import org.junit.ClassRule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.junit.runner.RunWith;
066import org.junit.runners.Parameterized;
067import org.mockito.Mockito;
068
069import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
070
071/**
072 * Basic test of BucketCache.Puts and gets.
073 * <p>
074 * Tests will ensure that blocks' data correctness under several threads concurrency
075 */
076@RunWith(Parameterized.class)
077@Category({ IOTests.class, LargeTests.class })
078public class TestBucketCache {
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082      HBaseClassTestRule.forClass(TestBucketCache.class);
083
084  private static final Random RAND = new Random();
085
086  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
087  public static Iterable<Object[]> data() {
088    return Arrays.asList(new Object[][] {
089        { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
090        {
091            16 * 1024,
092            new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
093                28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
094                128 * 1024 + 1024 } } });
095  }
096
097  @Parameterized.Parameter(0)
098  public int constructedBlockSize;
099
100  @Parameterized.Parameter(1)
101  public int[] constructedBlockSizes;
102
103  BucketCache cache;
104  final int CACHE_SIZE = 1000000;
105  final int NUM_BLOCKS = 100;
106  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
107  final int NUM_THREADS = 100;
108  final int NUM_QUERIES = 10000;
109
110  final long capacitySize = 32 * 1024 * 1024;
111  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
112  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
113  String ioEngineName = "offheap";
114  String persistencePath = null;
115
116  private static class MockedBucketCache extends BucketCache {
117
118    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
119        int writerThreads, int writerQLen, String persistencePath) throws IOException {
120      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
121          persistencePath);
122      super.wait_when_cache = true;
123    }
124
125    @Override
126    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
127      super.cacheBlock(cacheKey, buf, inMemory);
128    }
129
130    @Override
131    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
132      super.cacheBlock(cacheKey, buf);
133    }
134  }
135
136  @Before
137  public void setup() throws IOException {
138    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
139        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
140  }
141
142  @After
143  public void tearDown() {
144    cache.shutdown();
145  }
146
147  /**
148   * Return a random element from {@code a}.
149   */
150  private static <T> T randFrom(List<T> a) {
151    return a.get(RAND.nextInt(a.size()));
152  }
153
154  @Test
155  public void testBucketAllocator() throws BucketAllocatorException {
156    BucketAllocator mAllocator = cache.getAllocator();
157    /*
158     * Test the allocator first
159     */
160    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
161
162    boolean full = false;
163    ArrayList<Long> allocations = new ArrayList<>();
164    // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
165    // the cache is completely filled.
166    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
167    while (!full) {
168      Integer blockSize = null;
169      try {
170        blockSize = randFrom(tmp);
171        allocations.add(mAllocator.allocateBlock(blockSize));
172      } catch (CacheFullException cfe) {
173        tmp.remove(blockSize);
174        if (tmp.isEmpty()) full = true;
175      }
176    }
177
178    for (Integer blockSize : BLOCKSIZES) {
179      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
180      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
181      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
182    }
183
184    for (long offset : allocations) {
185      assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
186    }
187    assertEquals(0, mAllocator.getUsedSize());
188  }
189
190  @Test
191  public void testCacheSimple() throws Exception {
192    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
193  }
194
195  @Test
196  public void testCacheMultiThreadedSingleKey() throws Exception {
197    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
198  }
199
200  @Test
201  public void testHeapSizeChanges() throws Exception {
202    cache.stopWriterThreads();
203    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
204  }
205
206  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
207      throws InterruptedException {
208    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
209      Thread.sleep(100);
210    }
211    Thread.sleep(1000);
212  }
213
214  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
215    while (!cache.ramCache.isEmpty()) {
216      Thread.sleep(100);
217    }
218    Thread.sleep(1000);
219  }
220
221  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
222  // threads will flush it to the bucket and put reference entry in backingMap.
223  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
224      Cacheable block) throws InterruptedException {
225    cache.cacheBlock(cacheKey, block);
226    waitUntilFlushedToBucket(cache, cacheKey);
227  }
228
229  @Test
230  public void testMemoryLeak() throws Exception {
231    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
232    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
233      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
234    long lockId = cache.backingMap.get(cacheKey).offset();
235    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
236    lock.writeLock().lock();
237    Thread evictThread = new Thread("evict-block") {
238      @Override
239      public void run() {
240        cache.evictBlock(cacheKey);
241      }
242    };
243    evictThread.start();
244    cache.offsetLock.waitForWaiters(lockId, 1);
245    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
246    assertEquals(0, cache.getBlockCount());
247    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
248      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
249    assertEquals(1, cache.getBlockCount());
250    lock.writeLock().unlock();
251    evictThread.join();
252    assertEquals(0, cache.getBlockCount());
253    assertEquals(cache.getCurrentSize(), 0L);
254  }
255
256  @Test
257  public void testRetrieveFromFile() throws Exception {
258    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
259    Path testDir = TEST_UTIL.getDataTestDir();
260    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
261
262    String ioEngineName = "file:" + testDir + "/bucket.cache";
263    String persistencePath = testDir + "/bucket.persistence";
264
265    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
266        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
267    long usedSize = bucketCache.getAllocator().getUsedSize();
268    assertEquals(0, usedSize);
269
270    HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
271    // Add blocks
272    for (HFileBlockPair block : blocks) {
273      bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
274    }
275    for (HFileBlockPair block : blocks) {
276      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
277    }
278    usedSize = bucketCache.getAllocator().getUsedSize();
279    assertNotEquals(0, usedSize);
280    // persist cache to file
281    bucketCache.shutdown();
282    assertTrue(new File(persistencePath).exists());
283
284    // restore cache from file
285    bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
286        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
287    assertFalse(new File(persistencePath).exists());
288    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
289    // persist cache to file
290    bucketCache.shutdown();
291    assertTrue(new File(persistencePath).exists());
292
293    // reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k)
294    // so it can't restore cache from file
295    int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
296    bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
297        smallBucketSizes, writeThreads, writerQLen, persistencePath);
298    assertFalse(new File(persistencePath).exists());
299    assertEquals(0, bucketCache.getAllocator().getUsedSize());
300    assertEquals(0, bucketCache.backingMap.size());
301
302    TEST_UTIL.cleanupTestDir();
303  }
304
305  @Test
306  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
307    long availableSpace = 20 * 1024L * 1024 * 1024;
308    int[] bucketSizes = new int[]{1024, 1024 * 1024, 1024 * 1024 * 1024};
309    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
310    assertTrue(allocator.getBuckets().length > 0);
311  }
312
313  @Test
314  public void testGetPartitionSize() throws IOException {
315    //Test default values
316    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, BucketCache.DEFAULT_MIN_FACTOR);
317
318    Configuration conf = HBaseConfiguration.create();
319    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
320    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
321    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
322    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
323
324    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
325        constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
326
327    validateGetPartitionSize(cache, 0.1f, 0.5f);
328    validateGetPartitionSize(cache, 0.7f, 0.5f);
329    validateGetPartitionSize(cache, 0.2f, 0.5f);
330  }
331
332  @Test
333  public void testValidBucketCacheConfigs() throws IOException {
334    Configuration conf = HBaseConfiguration.create();
335    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
336    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
337    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
338    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
339    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
340    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
341
342    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
343        constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
344
345    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
346        cache.getAcceptableFactor(), 0);
347    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
348        cache.getMinFactor(), 0);
349    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
350        cache.getExtraFreeFactor(), 0);
351    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
352        cache.getSingleFactor(), 0);
353    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
354        cache.getMultiFactor(), 0);
355    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
356        cache.getMemoryFactor(), 0);
357  }
358
359  @Test
360  public void testInvalidAcceptFactorConfig() throws IOException {
361    float[] configValues = {-1f, 0.2f, 0.86f, 1.05f};
362    boolean[] expectedOutcomes = {false, false, true, false};
363    Map<String, float[]> configMappings = ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
364    Configuration conf = HBaseConfiguration.create();
365    checkConfigValues(conf, configMappings, expectedOutcomes);
366  }
367
368  @Test
369  public void testInvalidMinFactorConfig() throws IOException {
370    float[] configValues = {-1f, 0f, 0.96f, 1.05f};
371    //throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
372    boolean[] expectedOutcomes = {false, true, false, false};
373    Map<String, float[]> configMappings = ImmutableMap
374      .of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
375    Configuration conf = HBaseConfiguration.create();
376    checkConfigValues(conf, configMappings, expectedOutcomes);
377  }
378
379  @Test
380  public void testInvalidExtraFreeFactorConfig() throws IOException {
381    float[] configValues = {-1f, 0f, 0.2f, 1.05f};
382    //throws due to <0, in expected range, in expected range, config can be > 1.0
383    boolean[] expectedOutcomes = {false, true, true, true};
384    Map<String, float[]> configMappings = ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
385    Configuration conf = HBaseConfiguration.create();
386    checkConfigValues(conf, configMappings, expectedOutcomes);
387  }
388
389  @Test
390  public void testInvalidCacheSplitFactorConfig() throws IOException {
391    float[] singleFactorConfigValues = {0.2f, 0f, -0.2f, 1f};
392    float[] multiFactorConfigValues = {0.4f, 0f, 1f, .05f};
393    float[] memoryFactorConfigValues = {0.4f, 0f, 0.2f, .5f};
394    // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't be negative, configs don't add to 1.0
395    boolean[] expectedOutcomes = {true, false, false, false};
396    Map<String, float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
397        singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
398        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
399    Configuration conf = HBaseConfiguration.create();
400    checkConfigValues(conf, configMappings, expectedOutcomes);
401  }
402
403  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap, boolean[] expectSuccess) throws IOException {
404    Set<String> configNames = configMap.keySet();
405    for (int i = 0; i < expectSuccess.length; i++) {
406      try {
407        for (String configName : configNames) {
408          conf.setFloat(configName, configMap.get(configName)[i]);
409        }
410        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
411            constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
412        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
413      } catch (IllegalArgumentException e) {
414        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
415      }
416    }
417  }
418
419  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor, float minFactor) {
420    long expectedOutput = (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
421    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
422  }
423
424  @Test
425  public void testOffsetProducesPositiveOutput() {
426    // This number is picked because it produces negative output if the values isn't ensured to be
427    // positive. See HBASE-18757 for more information.
428    long testValue = 549888460800L;
429    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true);
430    assertEquals(testValue, bucketEntry.offset());
431  }
432
433  @Test
434  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
435    int size = 100;
436    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
437    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
438    HFileContext meta = new HFileContextBuilder().build();
439    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
440    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
441        ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
442    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
443        ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
444
445    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
446    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
447    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
448    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
449    blockWithNextBlockMetadata.serialize(block1Buffer, true);
450    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
451
452    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
453    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
454      block1Buffer);
455
456    waitUntilFlushedToBucket(cache, key);
457    assertNotNull(cache.backingMap.get(key));
458    assertEquals(1, cache.backingMap.get(key).refCnt());
459    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
460    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
461
462    // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
463    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
464      block1Buffer);
465    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
466    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
467    assertEquals(1, cache.backingMap.get(key).refCnt());
468
469    // Clear and add blockWithoutNextBlockMetadata
470    assertTrue(cache.evictBlock(key));
471    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
472    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
473
474    assertNull(cache.getBlock(key, false, false, false));
475    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
476      block2Buffer);
477
478    waitUntilFlushedToBucket(cache, key);
479    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
480    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
481
482    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
483    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
484      block1Buffer);
485
486    waitUntilFlushedToBucket(cache, key);
487    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
488    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
489  }
490
491  @Test
492  public void testRAMCache() {
493    int size = 100;
494    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
495    byte[] byteArr = new byte[length];
496    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
497    HFileContext meta = new HFileContextBuilder().build();
498
499    RAMCache cache = new RAMCache();
500    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
501    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
502    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
503        HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
504    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
505        HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
506    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
507    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
508
509    assertFalse(cache.containsKey(key1));
510    assertNull(cache.putIfAbsent(key1, re1));
511    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
512
513    assertNotNull(cache.putIfAbsent(key1, re2));
514    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
515    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
516
517    assertNull(cache.putIfAbsent(key2, re2));
518    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
519    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
520
521    cache.remove(key1);
522    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
523    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
524
525    cache.clear();
526    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
527    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
528  }
529
530  @Test
531  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
532    // initialize an block.
533    int size = 100, offset = 20;
534    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
535    ByteBuffer buf = ByteBuffer.allocate(length);
536    HFileContext meta = new HFileContextBuilder().build();
537    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
538        HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
539
540    // initialize an mocked ioengine.
541    IOEngine ioEngine = Mockito.mock(IOEngine.class);
542    Mockito.when(ioEngine.usesSharedMemory()).thenReturn(false);
543    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
544    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
545      Mockito.anyLong());
546    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
547      Mockito.anyLong());
548
549    // create an bucket allocator.
550    long availableSpace = 1024 * 1024 * 1024L;
551    BucketAllocator allocator = new BucketAllocator(availableSpace, null);
552
553    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
554    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);
555
556    Assert.assertEquals(0, allocator.getUsedSize());
557    try {
558      re.writeToCache(ioEngine, allocator, null);
559      Assert.fail();
560    } catch (Exception e) {
561    }
562    Assert.assertEquals(0, allocator.getUsedSize());
563  }
564}