001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.Mockito.when;
027
028import java.io.File;
029import java.io.IOException;
030import java.nio.ByteBuffer;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseConfiguration;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.io.ByteBuffAllocator;
045import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
046import org.apache.hadoop.hbase.io.hfile.BlockType;
047import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
048import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
049import org.apache.hadoop.hbase.io.hfile.Cacheable;
050import org.apache.hadoop.hbase.io.hfile.HFileBlock;
051import org.apache.hadoop.hbase.io.hfile.HFileContext;
052import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
053import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
054import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
055import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
056import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
057import org.apache.hadoop.hbase.nio.ByteBuff;
058import org.apache.hadoop.hbase.testclassification.IOTests;
059import org.apache.hadoop.hbase.testclassification.LargeTests;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.hbase.util.Threads;
063import org.junit.After;
064import org.junit.Assert;
065import org.junit.Before;
066import org.junit.ClassRule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.junit.runner.RunWith;
070import org.junit.runners.Parameterized;
071import org.mockito.Mockito;
072
073import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
074
/**
 * Basic test of BucketCache puts and gets.
 * <p>
 * Tests verify that block data stays correct when the cache is accessed concurrently by several
 * threads.
 */
080@RunWith(Parameterized.class)
081@Category({ IOTests.class, LargeTests.class })
082public class TestBucketCache {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086    HBaseClassTestRule.forClass(TestBucketCache.class);
087
088  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
089  public static Iterable<Object[]> data() {
090    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
091                                                          // for these tests?
092      { 16 * 1024,
093        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
094          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
095          128 * 1024 + 1024 } } });
096  }
097
098  @Parameterized.Parameter(0)
099  public int constructedBlockSize;
100
101  @Parameterized.Parameter(1)
102  public int[] constructedBlockSizes;
103
104  BucketCache cache;
105  final int CACHE_SIZE = 1000000;
106  final int NUM_BLOCKS = 100;
107  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
108  final int NUM_THREADS = 100;
109  final int NUM_QUERIES = 10000;
110
111  final long capacitySize = 32 * 1024 * 1024;
112  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
113  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
114  private String ioEngineName = "offheap";
115
116  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
117
118  private static class MockedBucketCache extends BucketCache {
119
120    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
121      int writerThreads, int writerQLen, String persistencePath) throws IOException {
122      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
123        persistencePath);
124    }
125
126    @Override
127    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
128      super.cacheBlock(cacheKey, buf, inMemory);
129    }
130
131    @Override
132    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
133      super.cacheBlock(cacheKey, buf);
134    }
135  }
136
137  @Before
138  public void setup() throws IOException {
139    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
140      constructedBlockSizes, writeThreads, writerQLen, null);
141  }
142
143  @After
144  public void tearDown() {
145    cache.shutdown();
146  }
147
148  /**
149   * Test Utility to create test dir and return name
150   * @return return name of created dir
151   * @throws IOException throws IOException
152   */
153  private Path createAndGetTestDir() throws IOException {
154    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
155    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
156    return testDir;
157  }
158
159  /**
160   * Return a random element from {@code a}.
161   */
162  private static <T> T randFrom(List<T> a) {
163    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
164  }
165
166  @Test
167  public void testBucketAllocator() throws BucketAllocatorException {
168    BucketAllocator mAllocator = cache.getAllocator();
169    /*
170     * Test the allocator first
171     */
172    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
173
174    boolean full = false;
175    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
176    // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
177    // the cache is completely filled.
178    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
179    while (!full) {
180      Integer blockSize = null;
181      try {
182        blockSize = randFrom(tmp);
183        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
184      } catch (CacheFullException cfe) {
185        tmp.remove(blockSize);
186        if (tmp.isEmpty()) full = true;
187      }
188    }
189
190    for (Integer blockSize : BLOCKSIZES) {
191      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
192      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
193      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
194
195      // we know the block sizes above are multiples of 1024, but default bucket sizes give an
196      // additional 1024 on top of that so this counts towards fragmentation in our test
197      // real life may have worse fragmentation because blocks may not be perfectly sized to block
198      // size, given encoding/compression and large rows
199      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
200    }
201
202    mAllocator.logDebugStatistics();
203
204    for (Pair<Long, Integer> allocation : allocations) {
205      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
206        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
207    }
208    assertEquals(0, mAllocator.getUsedSize());
209  }
210
211  @Test
212  public void testCacheSimple() throws Exception {
213    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
214  }
215
216  @Test
217  public void testCacheMultiThreadedSingleKey() throws Exception {
218    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
219  }
220
221  @Test
222  public void testHeapSizeChanges() throws Exception {
223    cache.stopWriterThreads();
224    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
225  }
226
227  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
228    throws InterruptedException {
229    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
230      Thread.sleep(100);
231    }
232    Thread.sleep(1000);
233  }
234
235  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
236    while (!cache.ramCache.isEmpty()) {
237      Thread.sleep(100);
238    }
239    Thread.sleep(1000);
240  }
241
242  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
243  // threads will flush it to the bucket and put reference entry in backingMap.
244  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
245    Cacheable block, boolean waitWhenCache) throws InterruptedException {
246    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
247    waitUntilFlushedToBucket(cache, cacheKey);
248  }
249
250  @Test
251  public void testMemoryLeak() throws Exception {
252    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
253    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
254      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
255    long lockId = cache.backingMap.get(cacheKey).offset();
256    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
257    lock.writeLock().lock();
258    Thread evictThread = new Thread("evict-block") {
259      @Override
260      public void run() {
261        cache.evictBlock(cacheKey);
262      }
263    };
264    evictThread.start();
265    cache.offsetLock.waitForWaiters(lockId, 1);
266    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
267    assertEquals(0, cache.getBlockCount());
268    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
269      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
270    assertEquals(1, cache.getBlockCount());
271    lock.writeLock().unlock();
272    evictThread.join();
273    /**
274     * <pre>
275     * The asserts here before HBASE-21957 are:
276     * assertEquals(1L, cache.getBlockCount());
277     * assertTrue(cache.getCurrentSize() > 0L);
278     * assertTrue("We should have a block!", cache.iterator().hasNext());
279     *
280     * The asserts here after HBASE-21957 are:
281     * assertEquals(0, cache.getBlockCount());
282     * assertEquals(cache.getCurrentSize(), 0L);
283     *
284     * I think the asserts before HBASE-21957 is more reasonable,because
285     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
286     * it had seen, and newly added Block after the {@link BucketEntry}
287     * it had seen should not be evicted.
288     * </pre>
289     */
290    assertEquals(1L, cache.getBlockCount());
291    assertTrue(cache.getCurrentSize() > 0L);
292    assertTrue("We should have a block!", cache.iterator().hasNext());
293  }
294
295  @Test
296  public void testRetrieveFromFile() throws Exception {
297    Path testDir = createAndGetTestDir();
298    String ioEngineName = "file:" + testDir + "/bucket.cache";
299    testRetrievalUtils(testDir, ioEngineName);
300    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
301    String persistencePath = testDir + "/bucket.persistence";
302    BucketCache bucketCache = null;
303    try {
304      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
305        smallBucketSizes, writeThreads, writerQLen, persistencePath);
306      assertFalse(new File(persistencePath).exists());
307      assertEquals(0, bucketCache.getAllocator().getUsedSize());
308      assertEquals(0, bucketCache.backingMap.size());
309    } finally {
310      bucketCache.shutdown();
311      HBASE_TESTING_UTILITY.cleanupTestDir();
312    }
313  }
314
315  @Test
316  public void testRetrieveFromMMap() throws Exception {
317    final Path testDir = createAndGetTestDir();
318    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
319    testRetrievalUtils(testDir, ioEngineName);
320  }
321
322  @Test
323  public void testRetrieveFromPMem() throws Exception {
324    final Path testDir = createAndGetTestDir();
325    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
326    testRetrievalUtils(testDir, ioEngineName);
327    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
328    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
329    BucketCache bucketCache = null;
330    try {
331      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
332        smallBucketSizes, writeThreads, writerQLen, persistencePath);
333      assertFalse(new File(persistencePath).exists());
334      assertEquals(0, bucketCache.getAllocator().getUsedSize());
335      assertEquals(0, bucketCache.backingMap.size());
336    } finally {
337      bucketCache.shutdown();
338      HBASE_TESTING_UTILITY.cleanupTestDir();
339    }
340  }
341
342  private void testRetrievalUtils(Path testDir, String ioEngineName)
343    throws IOException, InterruptedException {
344    final String persistencePath =
345      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
346    BucketCache bucketCache = null;
347    try {
348      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
349        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
350      long usedSize = bucketCache.getAllocator().getUsedSize();
351      assertEquals(0, usedSize);
352      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
353      for (HFileBlockPair block : blocks) {
354        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
355      }
356      for (HFileBlockPair block : blocks) {
357        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
358          false);
359      }
360      usedSize = bucketCache.getAllocator().getUsedSize();
361      assertNotEquals(0, usedSize);
362      bucketCache.shutdown();
363      assertTrue(new File(persistencePath).exists());
364      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
365        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
366      assertFalse(new File(persistencePath).exists());
367      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
368    } finally {
369      if (bucketCache != null) {
370        bucketCache.shutdown();
371      }
372    }
373    assertTrue(new File(persistencePath).exists());
374  }
375
376  @Test
377  public void testRetrieveUnsupportedIOE() throws Exception {
378    try {
379      final Path testDir = createAndGetTestDir();
380      final String ioEngineName = testDir + "/bucket.cache";
381      testRetrievalUtils(testDir, ioEngineName);
382      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
383    } catch (IllegalArgumentException e) {
384      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
385        + "files:, mmap: or offheap", e.getMessage());
386    }
387  }
388
389  @Test
390  public void testRetrieveFromMultipleFiles() throws Exception {
391    final Path testDirInitial = createAndGetTestDir();
392    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
393    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
394    String ioEngineName =
395      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
396        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
397    testRetrievalUtils(testDirInitial, ioEngineName);
398    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
399    String persistencePath = testDirInitial + "/bucket.persistence";
400    BucketCache bucketCache = null;
401    try {
402      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
403        smallBucketSizes, writeThreads, writerQLen, persistencePath);
404      assertFalse(new File(persistencePath).exists());
405      assertEquals(0, bucketCache.getAllocator().getUsedSize());
406      assertEquals(0, bucketCache.backingMap.size());
407    } finally {
408      bucketCache.shutdown();
409      HBASE_TESTING_UTILITY.cleanupTestDir();
410    }
411  }
412
413  @Test
414  public void testRetrieveFromFileWithoutPersistence() throws Exception {
415    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
416      constructedBlockSizes, writeThreads, writerQLen, null);
417    try {
418      final Path testDir = createAndGetTestDir();
419      String ioEngineName = "file:" + testDir + "/bucket.cache";
420      long usedSize = bucketCache.getAllocator().getUsedSize();
421      assertEquals(0, usedSize);
422      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
423      for (HFileBlockPair block : blocks) {
424        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
425      }
426      for (HFileBlockPair block : blocks) {
427        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
428          false);
429      }
430      usedSize = bucketCache.getAllocator().getUsedSize();
431      assertNotEquals(0, usedSize);
432      bucketCache.shutdown();
433      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
434        constructedBlockSizes, writeThreads, writerQLen, null);
435      assertEquals(0, bucketCache.getAllocator().getUsedSize());
436    } finally {
437      bucketCache.shutdown();
438      HBASE_TESTING_UTILITY.cleanupTestDir();
439    }
440  }
441
442  @Test
443  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
444    long availableSpace = 20 * 1024L * 1024 * 1024;
445    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
446    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
447    assertTrue(allocator.getBuckets().length > 0);
448  }
449
450  @Test
451  public void testGetPartitionSize() throws IOException {
452    // Test default values
453    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
454      BucketCache.DEFAULT_MIN_FACTOR);
455
456    Configuration conf = HBaseConfiguration.create();
457    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
458    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
459    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
460    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
461
462    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
463      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
464
465    validateGetPartitionSize(cache, 0.1f, 0.5f);
466    validateGetPartitionSize(cache, 0.7f, 0.5f);
467    validateGetPartitionSize(cache, 0.2f, 0.5f);
468  }
469
470  @Test
471  public void testCacheSizeCapacity() throws IOException {
472    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
473    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
474      BucketCache.DEFAULT_MIN_FACTOR);
475    Configuration conf = HBaseConfiguration.create();
476    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
477    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
478    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
479    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
480    try {
481      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
482        writerQLen, null, 100, conf);
483      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
484    } catch (IllegalArgumentException e) {
485      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
486    }
487  }
488
489  @Test
490  public void testValidBucketCacheConfigs() throws IOException {
491    Configuration conf = HBaseConfiguration.create();
492    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
493    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
494    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
495    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
496    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
497    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
498
499    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
500      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
501
502    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
503      cache.getAcceptableFactor(), 0);
504    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
505      cache.getMinFactor(), 0);
506    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
507      cache.getExtraFreeFactor(), 0);
508    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
509      cache.getSingleFactor(), 0);
510    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
511      cache.getMultiFactor(), 0);
512    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
513      cache.getMemoryFactor(), 0);
514  }
515
516  @Test
517  public void testInvalidAcceptFactorConfig() throws IOException {
518    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
519    boolean[] expectedOutcomes = { false, false, true, false };
520    Map<String, float[]> configMappings =
521      ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
522    Configuration conf = HBaseConfiguration.create();
523    checkConfigValues(conf, configMappings, expectedOutcomes);
524  }
525
526  @Test
527  public void testInvalidMinFactorConfig() throws IOException {
528    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
529    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
530    boolean[] expectedOutcomes = { false, true, false, false };
531    Map<String, float[]> configMappings =
532      ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
533    Configuration conf = HBaseConfiguration.create();
534    checkConfigValues(conf, configMappings, expectedOutcomes);
535  }
536
537  @Test
538  public void testInvalidExtraFreeFactorConfig() throws IOException {
539    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
540    // throws due to <0, in expected range, in expected range, config can be > 1.0
541    boolean[] expectedOutcomes = { false, true, true, true };
542    Map<String, float[]> configMappings =
543      ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
544    Configuration conf = HBaseConfiguration.create();
545    checkConfigValues(conf, configMappings, expectedOutcomes);
546  }
547
548  @Test
549  public void testInvalidCacheSplitFactorConfig() throws IOException {
550    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
551    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
552    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
553    // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't
554    // be negative, configs don't add to 1.0
555    boolean[] expectedOutcomes = { true, false, false, false };
556    Map<String,
557      float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
558        singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
559        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
560    Configuration conf = HBaseConfiguration.create();
561    checkConfigValues(conf, configMappings, expectedOutcomes);
562  }
563
564  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
565    boolean[] expectSuccess) throws IOException {
566    Set<String> configNames = configMap.keySet();
567    for (int i = 0; i < expectSuccess.length; i++) {
568      try {
569        for (String configName : configNames) {
570          conf.setFloat(configName, configMap.get(configName)[i]);
571        }
572        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
573          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
574        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
575          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
576      } catch (IllegalArgumentException e) {
577        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
578          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
579      }
580    }
581  }
582
583  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
584    float minFactor) {
585    long expectedOutput =
586      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
587    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
588  }
589
590  @Test
591  public void testOffsetProducesPositiveOutput() {
592    // This number is picked because it produces negative output if the values isn't ensured to be
593    // positive. See HBASE-18757 for more information.
594    long testValue = 549888460800L;
595    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
596      return ByteBuffAllocator.NONE;
597    }, ByteBuffAllocator.HEAP);
598    assertEquals(testValue, bucketEntry.offset());
599  }
600
601  @Test
602  public void testEvictionCount() throws InterruptedException {
603    int size = 100;
604    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
605    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
606    HFileContext meta = new HFileContextBuilder().build();
607    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
608    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
609      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
610    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
611      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
612
613    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
614    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
615    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
616    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
617    blockWithNextBlockMetadata.serialize(block1Buffer, true);
618    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
619
620    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
621    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
622      block1Buffer);
623
624    waitUntilFlushedToBucket(cache, key);
625
626    assertEquals(0, cache.getStats().getEvictionCount());
627
628    // evict call should return 1, but then eviction count be 0
629    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
630    assertEquals(0, cache.getStats().getEvictionCount());
631
632    // add back
633    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
634      block1Buffer);
635    waitUntilFlushedToBucket(cache, key);
636
637    // should not increment
638    assertTrue(cache.evictBlock(key));
639    assertEquals(0, cache.getStats().getEvictionCount());
640
641    // add back
642    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
643      block1Buffer);
644    waitUntilFlushedToBucket(cache, key);
645
646    // should finally increment eviction count
647    cache.freeSpace("testing");
648    assertEquals(1, cache.getStats().getEvictionCount());
649  }
650
651  @Test
652  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
653    int size = 100;
654    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
655    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
656    HFileContext meta = new HFileContextBuilder().build();
657    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
658    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
659      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
660    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
661      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
662
663    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
664    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
665    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
666    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
667    blockWithNextBlockMetadata.serialize(block1Buffer, true);
668    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
669
670    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
671    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
672      block1Buffer);
673
674    waitUntilFlushedToBucket(cache, key);
675    assertNotNull(cache.backingMap.get(key));
676    assertEquals(1, cache.backingMap.get(key).refCnt());
677    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
678    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
679
680    // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
681    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
682      block1Buffer);
683    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
684    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
685    assertEquals(1, cache.backingMap.get(key).refCnt());
686
687    // Clear and add blockWithoutNextBlockMetadata
688    assertTrue(cache.evictBlock(key));
689    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
690    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
691
692    assertNull(cache.getBlock(key, false, false, false));
693    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
694      block2Buffer);
695
696    waitUntilFlushedToBucket(cache, key);
697    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
698    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
699
700    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
701    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
702      block1Buffer);
703
704    waitUntilFlushedToBucket(cache, key);
705    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
706    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
707  }
708
709  @Test
710  public void testRAMCache() {
711    int size = 100;
712    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
713    byte[] byteArr = new byte[length];
714    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
715    HFileContext meta = new HFileContextBuilder().build();
716
717    RAMCache cache = new RAMCache();
718    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
719    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
720    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
721      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
722    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
723      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
724    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
725    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);
726
727    assertFalse(cache.containsKey(key1));
728    assertNull(cache.putIfAbsent(key1, re1));
729    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
730
731    assertNotNull(cache.putIfAbsent(key1, re2));
732    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
733    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
734
735    assertNull(cache.putIfAbsent(key2, re2));
736    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
737    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
738
739    cache.remove(key1);
740    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
741    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
742
743    cache.clear();
744    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
745    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
746  }
747
748  @Test
749  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
750    // initialize an block.
751    int size = 100, offset = 20;
752    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
753    ByteBuffer buf = ByteBuffer.allocate(length);
754    HFileContext meta = new HFileContextBuilder().build();
755    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
756      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
757
758    // initialize an mocked ioengine.
759    IOEngine ioEngine = Mockito.mock(IOEngine.class);
760    when(ioEngine.usesSharedMemory()).thenReturn(false);
761    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
762    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
763      Mockito.anyLong());
764    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
765      Mockito.anyLong());
766
767    // create an bucket allocator.
768    long availableSpace = 1024 * 1024 * 1024L;
769    BucketAllocator allocator = new BucketAllocator(availableSpace, null);
770
771    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
772    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);
773
774    Assert.assertEquals(0, allocator.getUsedSize());
775    try {
776      re.writeToCache(ioEngine, allocator, null, null,
777        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
778      Assert.fail();
779    } catch (Exception e) {
780    }
781    Assert.assertEquals(0, allocator.getUsedSize());
782  }
783
784  /**
785   * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
786   * could not be freed even if corresponding {@link HFileBlock} is evicted from
787   * {@link BucketCache}.
788   */
789  @Test
790  public void testFreeBucketEntryRestoredFromFile() throws Exception {
791    BucketCache bucketCache = null;
792    try {
793      final Path dataTestDir = createAndGetTestDir();
794
795      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
796      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
797
798      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
799        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
800      long usedByteSize = bucketCache.getAllocator().getUsedSize();
801      assertEquals(0, usedByteSize);
802
803      HFileBlockPair[] hfileBlockPairs =
804        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
805      // Add blocks
806      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
807        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
808      }
809
810      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
811        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
812          hfileBlockPair.getBlock(), false);
813      }
814      usedByteSize = bucketCache.getAllocator().getUsedSize();
815      assertNotEquals(0, usedByteSize);
816      // persist cache to file
817      bucketCache.shutdown();
818      assertTrue(new File(persistencePath).exists());
819
820      // restore cache from file
821      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
822        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
823      assertFalse(new File(persistencePath).exists());
824      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
825
826      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
827        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
828        bucketCache.evictBlock(blockCacheKey);
829      }
830      assertEquals(0, bucketCache.getAllocator().getUsedSize());
831      assertEquals(0, bucketCache.backingMap.size());
832    } finally {
833      bucketCache.shutdown();
834      HBASE_TESTING_UTILITY.cleanupTestDir();
835    }
836  }
837
838  @Test
839  public void testBlockAdditionWaitWhenCache() throws Exception {
840    BucketCache bucketCache = null;
841    try {
842      final Path dataTestDir = createAndGetTestDir();
843
844      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
845      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
846
847      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
848        constructedBlockSizes, 1, 1, persistencePath);
849      long usedByteSize = bucketCache.getAllocator().getUsedSize();
850      assertEquals(0, usedByteSize);
851
852      HFileBlockPair[] hfileBlockPairs =
853        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
854      // Add blocks
855      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
856        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
857          true);
858      }
859
860      // Max wait for 10 seconds.
861      long timeout = 10000;
862      // Wait for blocks size to match the number of blocks.
863      while (bucketCache.backingMap.size() != 10) {
864        if (timeout <= 0) break;
865        Threads.sleep(100);
866        timeout -= 100;
867      }
868      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
869        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
870      }
871      usedByteSize = bucketCache.getAllocator().getUsedSize();
872      assertNotEquals(0, usedByteSize);
873      // persist cache to file
874      bucketCache.shutdown();
875      assertTrue(new File(persistencePath).exists());
876
877      // restore cache from file
878      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
879        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
880      assertFalse(new File(persistencePath).exists());
881      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
882
883      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
884        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
885        bucketCache.evictBlock(blockCacheKey);
886      }
887      assertEquals(0, bucketCache.getAllocator().getUsedSize());
888      assertEquals(0, bucketCache.backingMap.size());
889    } finally {
890      if (bucketCache != null) {
891        bucketCache.shutdown();
892      }
893      HBASE_TESTING_UTILITY.cleanupTestDir();
894    }
895  }
896}