/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Basic test of BucketCache. Puts and gets.
 * <p>
 * Tests ensure the correctness of block data under concurrent access from several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCache.class);

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
                                                          // for these tests?
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  String ioEngineName = "offheap";
  String persistencePath = null;

  private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
      super.wait_when_cache = true;
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }
  /**
   * Test utility to create the test dir and return its path.
   * @return path of the created dir
   * @throws IOException if the dir cannot be created
   */
  private Path createAndGetTestDir() throws IOException {
    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
    return testDir;
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
  }

  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize. Continue selecting blocks until
    // the cache is completely filled.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());

      // We know the block sizes above are multiples of 1024, but the default bucket sizes add an
      // additional 1024 on top of that, so the difference counts towards fragmentation in our
      // test. Real life may see worse fragmentation because blocks may not be perfectly sized to
      // the block size, given encoding/compression and large rows.
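      // Worked example: a 4 * 1024 block rounds up to the 4 * 1024 + 1024 bucket size, wasting
      // exactly 1024 bytes per allocation, hence the totalCount * 1024 fragmentation bytes below.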
      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
    }

    mAllocator.logDebugStatistics();

    for (Pair<Long, Integer> allocation : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

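  /**
   * Wait until the writer threads have drained {@code cacheKey} from the ramCache and installed
   * its entry in the backingMap, i.e. the block has actually been flushed to the bucket.
   */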
  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and the write queue,
  // then the writer threads flush it to the bucket and put a reference entry in the backingMap.
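  // For example, using the helpers in this class:
  //   cache.cacheBlock(key, block);          // enqueued: ramCache.containsKey(key) == true
  //   waitUntilFlushedToBucket(cache, key);  // flushed: backingMap.containsKey(key) == true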
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

  @Test
  public void testMemoryLeak() throws Exception {
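    // Scenario: hold the offset write lock so a concurrent evictBlock stalls, simulate the block
    // being evicted and re-cached underneath it, then verify that the stalled eviction does not
    // remove the newly cached block (see the HBASE-21957 discussion below).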
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 are:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 are:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 are more reasonable, because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it had seen, and a Block newly added after the {@link BucketEntry}
     * it had seen should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);

    String ioEngineName = "file:" + testDir + "/bucket.cache";
    String persistencePath = testDir + "/bucket.persistence";

    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath);
    long usedSize = bucketCache.getAllocator().getUsedSize();
    assertEquals(0, usedSize);

    HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
    // Add blocks
    for (HFileBlockPair block : blocks) {
      bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
    }
    for (HFileBlockPair block : blocks) {
      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
    }
    usedSize = bucketCache.getAllocator().getUsedSize();
    assertNotEquals(0, usedSize);
    // persist cache to file
    bucketCache.shutdown();
    assertTrue(new File(persistencePath).exists());

    // restore cache from file
    bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    // persist cache to file
    bucketCache.shutdown();
    assertTrue(new File(persistencePath).exists());

    // Reconfigure bucket sizes: the biggest bucket is smaller than constructedBlockSize (8k or
    // 16k), so the cache can't be restored from the file.
    int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
    bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());

    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
      BucketCache.DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);

    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
      cache.getAcceptableFactor(), 0);
    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getMinFactor(), 0);
    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getExtraFreeFactor(), 0);
    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
      cache.getSingleFactor(), 0);
    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
      cache.getMultiFactor(), 0);
    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
      cache.getMemoryFactor(), 0);
  }

  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
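    // Presumably: throws due to < 0, below the default minFactor (0.85), in expected range, > 1.0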
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to < 0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to < 0, in expected range, in expected range, config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // Column 1: all configs are between 0 and 1.0 and add up to 1.0; column 2: configs don't add
    // up to 1.0; column 3: configs can't be negative; column 4: configs don't add up to 1.0.
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String,
      float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
        singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

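  /**
   * Constructs a BucketCache once per column of {@code configMap}: iteration {@code i} applies
   * value {@code i} of every named float config and asserts that construction succeeds, or throws
   * IllegalArgumentException, exactly as {@code expectSuccess[i]} predicts.
   */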
  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
    boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
457        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
458          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
459      } catch (IllegalArgumentException e) {
460        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
461          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      }
    }
  }

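  /**
   * The expected partition size is {@code floor(totalSize * partitionFactor * minFactor)}; for
   * example, if the allocator's total size were exactly 32 MiB (33554432 bytes), a factor of 0.1f
   * with minFactor 0.5f would give 1677721 bytes.
   */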
  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
    float minFactor) {
    long expectedOutput =
      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }

  @Test
  public void testOffsetProducesPositiveOutput() {
474  public void testOffsetProducesPositiveOutput() {
475    // This number is picked because it produces negative output if the values isn't ensured to be
476    // positive. See HBASE-18757 for more information.
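    // 549888460800L is 0x8007E80800, so bit 39 is set and the high-order byte of the offset is
    // 0x80, which is negative as a signed Java byte; without masking, sign extension would make
    // the reconstructed offset negative.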
    long testValue = 549888460800L;
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true, (entry) -> {
      return ByteBuffAllocator.NONE;
    }, ByteBuffAllocator.HEAP);
    assertEquals(testValue, bucketEntry.offset());
  }

  @Test
  public void testEvictionCount() throws InterruptedException {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);

    assertEquals(0, cache.getStats().getEvictionCount());

    // The evict call should return 1 evicted block, but explicit evictions do not increment the
    // eviction count.
    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should not increment
    assertTrue(cache.evictBlock(key));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should finally increment eviction count
    cache.freeSpace("testing");
    assertEquals(1, cache.getStats().getEvictionCount());
  }

  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }

  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);

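    // putIfAbsent retains the cached block on success (buffer refCnt 1 -> 2); a rejected put
    // leaves the candidate's refCnt untouched, and remove()/clear() release one reference each.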
    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }

  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // Initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // Initialize a mocked IOEngine whose writes always fail.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    when(ioEngine.usesSharedMemory()).thenReturn(false);
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // Create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);

    Assert.assertEquals(0, allocator.getUsedSize());
    try {
      re.writeToCache(ioEngine, allocator, null, null,
        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
      Assert.fail();
    } catch (Exception e) {
      // expected: the mocked IOEngine throws on write
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }

  /**
   * This test is for HBASE-26295: a {@link BucketEntry} which is restored from a persistence file
   * could not be freed even if the corresponding {@link HFileBlock} was evicted from the
   * {@link BucketCache}.
   */
  @Test
  public void testFreeBucketEntryRestoredFromFile() throws Exception {
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
      }

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
          hfileBlockPair.getBlock());
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

}