001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.Mockito.when;
027
028import java.io.File;
029import java.io.IOException;
030import java.nio.ByteBuffer;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseConfiguration;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.io.ByteBuffAllocator;
045import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
046import org.apache.hadoop.hbase.io.hfile.BlockType;
047import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
048import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
049import org.apache.hadoop.hbase.io.hfile.Cacheable;
050import org.apache.hadoop.hbase.io.hfile.HFileBlock;
051import org.apache.hadoop.hbase.io.hfile.HFileContext;
052import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
053import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
054import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
055import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
056import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
057import org.apache.hadoop.hbase.nio.ByteBuff;
058import org.apache.hadoop.hbase.testclassification.IOTests;
059import org.apache.hadoop.hbase.testclassification.LargeTests;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.hbase.util.Threads;
063import org.junit.After;
064import org.junit.Assert;
065import org.junit.Before;
066import org.junit.ClassRule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.junit.runner.RunWith;
070import org.junit.runners.Parameterized;
071import org.mockito.Mockito;
072
073import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
074
075/**
076 * Basic test of BucketCache.Puts and gets.
077 * <p>
078 * Tests will ensure that blocks' data correctness under several threads concurrency
079 */
080@RunWith(Parameterized.class)
081@Category({ IOTests.class, LargeTests.class })
082public class TestBucketCache {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086    HBaseClassTestRule.forClass(TestBucketCache.class);
087
088  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
089  public static Iterable<Object[]> data() {
090    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
091                                                          // for these tests?
092      { 16 * 1024,
093        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
094          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
095          128 * 1024 + 1024 } } });
096  }
097
098  @Parameterized.Parameter(0)
099  public int constructedBlockSize;
100
101  @Parameterized.Parameter(1)
102  public int[] constructedBlockSizes;
103
104  BucketCache cache;
105  final int CACHE_SIZE = 1000000;
106  final int NUM_BLOCKS = 100;
107  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
108  final int NUM_THREADS = 100;
109  final int NUM_QUERIES = 10000;
110
111  final long capacitySize = 32 * 1024 * 1024;
112  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
113  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
114  private String ioEngineName = "offheap";
115
116  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
117
118  private static class MockedBucketCache extends BucketCache {
119
120    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
121      int writerThreads, int writerQLen, String persistencePath) throws IOException {
122      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
123        persistencePath);
124    }
125
126    @Override
127    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
128      super.cacheBlock(cacheKey, buf, inMemory);
129    }
130
131    @Override
132    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
133      super.cacheBlock(cacheKey, buf);
134    }
135  }
136
137  @Before
138  public void setup() throws IOException {
139    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
140      constructedBlockSizes, writeThreads, writerQLen, null);
141  }
142
143  @After
144  public void tearDown() {
145    cache.shutdown();
146  }
147
148  /**
149   * Test Utility to create test dir and return name
150   * @return return name of created dir
151   * @throws IOException throws IOException
152   */
153  private Path createAndGetTestDir() throws IOException {
154    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
155    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
156    return testDir;
157  }
158
159  /**
160   * Return a random element from {@code a}.
161   */
162  private static <T> T randFrom(List<T> a) {
163    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
164  }
165
166  @Test
167  public void testBucketAllocator() throws BucketAllocatorException {
168    BucketAllocator mAllocator = cache.getAllocator();
169    /*
170     * Test the allocator first
171     */
172    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
173
174    boolean full = false;
175    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
176    // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
177    // the cache is completely filled.
178    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
179    while (!full) {
180      Integer blockSize = null;
181      try {
182        blockSize = randFrom(tmp);
183        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
184      } catch (CacheFullException cfe) {
185        tmp.remove(blockSize);
186        if (tmp.isEmpty()) full = true;
187      }
188    }
189
190    for (Integer blockSize : BLOCKSIZES) {
191      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
192      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
193      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
194
195      // we know the block sizes above are multiples of 1024, but default bucket sizes give an
196      // additional 1024 on top of that so this counts towards fragmentation in our test
197      // real life may have worse fragmentation because blocks may not be perfectly sized to block
198      // size, given encoding/compression and large rows
199      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
200    }
201
202    mAllocator.logDebugStatistics();
203
204    for (Pair<Long, Integer> allocation : allocations) {
205      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
206        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
207    }
208    assertEquals(0, mAllocator.getUsedSize());
209  }
210
211  @Test
212  public void testCacheSimple() throws Exception {
213    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
214  }
215
216  @Test
217  public void testCacheMultiThreadedSingleKey() throws Exception {
218    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
219  }
220
221  @Test
222  public void testHeapSizeChanges() throws Exception {
223    cache.stopWriterThreads();
224    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
225  }
226
227  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
228    throws InterruptedException {
229    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
230      Thread.sleep(100);
231    }
232    Thread.sleep(1000);
233  }
234
235  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
236    while (!cache.ramCache.isEmpty()) {
237      Thread.sleep(100);
238    }
239    Thread.sleep(1000);
240  }
241
242  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
243  // threads will flush it to the bucket and put reference entry in backingMap.
244  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
245    Cacheable block, boolean waitWhenCache) throws InterruptedException {
246    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
247    waitUntilFlushedToBucket(cache, cacheKey);
248  }
249
250  @Test
251  public void testMemoryLeak() throws Exception {
252    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
253    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
254      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
255    long lockId = cache.backingMap.get(cacheKey).offset();
256    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
257    lock.writeLock().lock();
258    Thread evictThread = new Thread("evict-block") {
259      @Override
260      public void run() {
261        cache.evictBlock(cacheKey);
262      }
263    };
264    evictThread.start();
265    cache.offsetLock.waitForWaiters(lockId, 1);
266    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
267    assertEquals(0, cache.getBlockCount());
268    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
269      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
270    assertEquals(1, cache.getBlockCount());
271    lock.writeLock().unlock();
272    evictThread.join();
273    /**
274     * <pre>
275     * The asserts here before HBASE-21957 are:
276     * assertEquals(1L, cache.getBlockCount());
277     * assertTrue(cache.getCurrentSize() > 0L);
278     * assertTrue("We should have a block!", cache.iterator().hasNext());
279     *
280     * The asserts here after HBASE-21957 are:
281     * assertEquals(0, cache.getBlockCount());
282     * assertEquals(cache.getCurrentSize(), 0L);
283     *
284     * I think the asserts before HBASE-21957 is more reasonable,because
285     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
286     * it had seen, and newly added Block after the {@link BucketEntry}
287     * it had seen should not be evicted.
288     * </pre>
289     */
290    assertEquals(1L, cache.getBlockCount());
291    assertTrue(cache.getCurrentSize() > 0L);
292    assertTrue("We should have a block!", cache.iterator().hasNext());
293  }
294
295  @Test
296  public void testRetrieveFromFile() throws Exception {
297    Path testDir = createAndGetTestDir();
298    String ioEngineName = "file:" + testDir + "/bucket.cache";
299    testRetrievalUtils(testDir, ioEngineName);
300    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
301    String persistencePath = testDir + "/bucket.persistence";
302    BucketCache bucketCache = null;
303    try {
304      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
305        smallBucketSizes, writeThreads, writerQLen, persistencePath);
306      assertFalse(new File(persistencePath).exists());
307      assertEquals(0, bucketCache.getAllocator().getUsedSize());
308      assertEquals(0, bucketCache.backingMap.size());
309    } finally {
310      bucketCache.shutdown();
311      HBASE_TESTING_UTILITY.cleanupTestDir();
312    }
313  }
314
315  @Test
316  public void testRetrieveFromMMap() throws Exception {
317    final Path testDir = createAndGetTestDir();
318    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
319    testRetrievalUtils(testDir, ioEngineName);
320  }
321
322  @Test
323  public void testRetrieveFromPMem() throws Exception {
324    final Path testDir = createAndGetTestDir();
325    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
326    testRetrievalUtils(testDir, ioEngineName);
327    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
328    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
329    BucketCache bucketCache = null;
330    try {
331      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
332        smallBucketSizes, writeThreads, writerQLen, persistencePath);
333      assertFalse(new File(persistencePath).exists());
334      assertEquals(0, bucketCache.getAllocator().getUsedSize());
335      assertEquals(0, bucketCache.backingMap.size());
336    } finally {
337      bucketCache.shutdown();
338      HBASE_TESTING_UTILITY.cleanupTestDir();
339    }
340  }
341
342  private void testRetrievalUtils(Path testDir, String ioEngineName)
343    throws IOException, InterruptedException {
344    final String persistencePath =
345      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
346    BucketCache bucketCache = null;
347    try {
348      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
349        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
350      long usedSize = bucketCache.getAllocator().getUsedSize();
351      assertEquals(0, usedSize);
352      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
353      for (HFileBlockPair block : blocks) {
354        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
355      }
356      for (HFileBlockPair block : blocks) {
357        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
358          false);
359      }
360      usedSize = bucketCache.getAllocator().getUsedSize();
361      assertNotEquals(0, usedSize);
362      bucketCache.shutdown();
363      assertTrue(new File(persistencePath).exists());
364      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
365        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
366      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
367    } finally {
368      if (bucketCache != null) {
369        bucketCache.shutdown();
370      }
371    }
372    assertTrue(new File(persistencePath).exists());
373  }
374
375  @Test
376  public void testRetrieveUnsupportedIOE() throws Exception {
377    try {
378      final Path testDir = createAndGetTestDir();
379      final String ioEngineName = testDir + "/bucket.cache";
380      testRetrievalUtils(testDir, ioEngineName);
381      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
382    } catch (IllegalArgumentException e) {
383      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
384        + "files:, mmap: or offheap", e.getMessage());
385    }
386  }
387
388  @Test
389  public void testRetrieveFromMultipleFiles() throws Exception {
390    final Path testDirInitial = createAndGetTestDir();
391    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
392    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
393    String ioEngineName =
394      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
395        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
396    testRetrievalUtils(testDirInitial, ioEngineName);
397    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
398    String persistencePath = testDirInitial + "/bucket.persistence";
399    BucketCache bucketCache = null;
400    try {
401      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
402        smallBucketSizes, writeThreads, writerQLen, persistencePath);
403      assertFalse(new File(persistencePath).exists());
404      assertEquals(0, bucketCache.getAllocator().getUsedSize());
405      assertEquals(0, bucketCache.backingMap.size());
406    } finally {
407      bucketCache.shutdown();
408      HBASE_TESTING_UTILITY.cleanupTestDir();
409    }
410  }
411
412  @Test
413  public void testRetrieveFromFileWithoutPersistence() throws Exception {
414    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
415      constructedBlockSizes, writeThreads, writerQLen, null);
416    try {
417      final Path testDir = createAndGetTestDir();
418      String ioEngineName = "file:" + testDir + "/bucket.cache";
419      long usedSize = bucketCache.getAllocator().getUsedSize();
420      assertEquals(0, usedSize);
421      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
422      for (HFileBlockPair block : blocks) {
423        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
424      }
425      for (HFileBlockPair block : blocks) {
426        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
427          false);
428      }
429      usedSize = bucketCache.getAllocator().getUsedSize();
430      assertNotEquals(0, usedSize);
431      bucketCache.shutdown();
432      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
433        constructedBlockSizes, writeThreads, writerQLen, null);
434      assertEquals(0, bucketCache.getAllocator().getUsedSize());
435    } finally {
436      bucketCache.shutdown();
437      HBASE_TESTING_UTILITY.cleanupTestDir();
438    }
439  }
440
441  @Test
442  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
443    long availableSpace = 20 * 1024L * 1024 * 1024;
444    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
445    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
446    assertTrue(allocator.getBuckets().length > 0);
447  }
448
449  @Test
450  public void testGetPartitionSize() throws IOException {
451    // Test default values
452    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
453      BucketCache.DEFAULT_MIN_FACTOR);
454
455    Configuration conf = HBaseConfiguration.create();
456    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
457    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
458    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
459    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
460
461    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
462      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
463
464    validateGetPartitionSize(cache, 0.1f, 0.5f);
465    validateGetPartitionSize(cache, 0.7f, 0.5f);
466    validateGetPartitionSize(cache, 0.2f, 0.5f);
467  }
468
469  @Test
470  public void testCacheSizeCapacity() throws IOException {
471    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
472    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
473      BucketCache.DEFAULT_MIN_FACTOR);
474    Configuration conf = HBaseConfiguration.create();
475    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
476    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
477    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
478    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
479    try {
480      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
481        writerQLen, null, 100, conf);
482      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
483    } catch (IllegalArgumentException e) {
484      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
485    }
486  }
487
488  @Test
489  public void testValidBucketCacheConfigs() throws IOException {
490    Configuration conf = HBaseConfiguration.create();
491    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
492    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
493    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
494    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
495    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
496    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
497
498    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
499      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
500
501    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
502      cache.getAcceptableFactor(), 0);
503    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
504      cache.getMinFactor(), 0);
505    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
506      cache.getExtraFreeFactor(), 0);
507    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
508      cache.getSingleFactor(), 0);
509    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
510      cache.getMultiFactor(), 0);
511    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
512      cache.getMemoryFactor(), 0);
513  }
514
515  @Test
516  public void testInvalidAcceptFactorConfig() throws IOException {
517    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
518    boolean[] expectedOutcomes = { false, false, true, false };
519    Map<String, float[]> configMappings =
520      ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
521    Configuration conf = HBaseConfiguration.create();
522    checkConfigValues(conf, configMappings, expectedOutcomes);
523  }
524
525  @Test
526  public void testInvalidMinFactorConfig() throws IOException {
527    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
528    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
529    boolean[] expectedOutcomes = { false, true, false, false };
530    Map<String, float[]> configMappings =
531      ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
532    Configuration conf = HBaseConfiguration.create();
533    checkConfigValues(conf, configMappings, expectedOutcomes);
534  }
535
536  @Test
537  public void testInvalidExtraFreeFactorConfig() throws IOException {
538    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
539    // throws due to <0, in expected range, in expected range, config can be > 1.0
540    boolean[] expectedOutcomes = { false, true, true, true };
541    Map<String, float[]> configMappings =
542      ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
543    Configuration conf = HBaseConfiguration.create();
544    checkConfigValues(conf, configMappings, expectedOutcomes);
545  }
546
547  @Test
548  public void testInvalidCacheSplitFactorConfig() throws IOException {
549    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
550    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
551    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
552    // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't
553    // be negative, configs don't add to 1.0
554    boolean[] expectedOutcomes = { true, false, false, false };
555    Map<String,
556      float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
557        singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
558        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
559    Configuration conf = HBaseConfiguration.create();
560    checkConfigValues(conf, configMappings, expectedOutcomes);
561  }
562
563  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
564    boolean[] expectSuccess) throws IOException {
565    Set<String> configNames = configMap.keySet();
566    for (int i = 0; i < expectSuccess.length; i++) {
567      try {
568        for (String configName : configNames) {
569          conf.setFloat(configName, configMap.get(configName)[i]);
570        }
571        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
572          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
573        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
574          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
575      } catch (IllegalArgumentException e) {
576        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
577          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
578      }
579    }
580  }
581
582  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
583    float minFactor) {
584    long expectedOutput =
585      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
586    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
587  }
588
589  @Test
590  public void testOffsetProducesPositiveOutput() {
591    // This number is picked because it produces negative output if the values isn't ensured to be
592    // positive. See HBASE-18757 for more information.
593    long testValue = 549888460800L;
594    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
595      return ByteBuffAllocator.NONE;
596    }, ByteBuffAllocator.HEAP);
597    assertEquals(testValue, bucketEntry.offset());
598  }
599
600  @Test
601  public void testEvictionCount() throws InterruptedException {
602    int size = 100;
603    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
604    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
605    HFileContext meta = new HFileContextBuilder().build();
606    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
607    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
608      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
609    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
610      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
611
612    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
613    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
614    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
615    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
616    blockWithNextBlockMetadata.serialize(block1Buffer, true);
617    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
618
619    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
620    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
621      block1Buffer);
622
623    waitUntilFlushedToBucket(cache, key);
624
625    assertEquals(0, cache.getStats().getEvictionCount());
626
627    // evict call should return 1, but then eviction count be 0
628    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
629    assertEquals(0, cache.getStats().getEvictionCount());
630
631    // add back
632    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
633      block1Buffer);
634    waitUntilFlushedToBucket(cache, key);
635
636    // should not increment
637    assertTrue(cache.evictBlock(key));
638    assertEquals(0, cache.getStats().getEvictionCount());
639
640    // add back
641    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
642      block1Buffer);
643    waitUntilFlushedToBucket(cache, key);
644
645    // should finally increment eviction count
646    cache.freeSpace("testing");
647    assertEquals(1, cache.getStats().getEvictionCount());
648  }
649
650  @Test
651  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
652    int size = 100;
653    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
654    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
655    HFileContext meta = new HFileContextBuilder().build();
656    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
657    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
658      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
659    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
660      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
661
662    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
663    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
664    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
665    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
666    blockWithNextBlockMetadata.serialize(block1Buffer, true);
667    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
668
669    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
670    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
671      block1Buffer);
672
673    waitUntilFlushedToBucket(cache, key);
674    assertNotNull(cache.backingMap.get(key));
675    assertEquals(1, cache.backingMap.get(key).refCnt());
676    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
677    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
678
679    // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
680    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
681      block1Buffer);
682    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
683    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
684    assertEquals(1, cache.backingMap.get(key).refCnt());
685
686    // Clear and add blockWithoutNextBlockMetadata
687    assertTrue(cache.evictBlock(key));
688    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
689    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
690
691    assertNull(cache.getBlock(key, false, false, false));
692    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
693      block2Buffer);
694
695    waitUntilFlushedToBucket(cache, key);
696    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
697    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
698
699    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
700    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
701      block1Buffer);
702
703    waitUntilFlushedToBucket(cache, key);
704    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
705    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
706  }
707
708  @Test
709  public void testRAMCache() {
710    int size = 100;
711    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
712    byte[] byteArr = new byte[length];
713    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
714    HFileContext meta = new HFileContextBuilder().build();
715
716    RAMCache cache = new RAMCache();
717    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
718    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
719    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
720      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
721    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
722      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
723    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
724    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);
725
726    assertFalse(cache.containsKey(key1));
727    assertNull(cache.putIfAbsent(key1, re1));
728    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
729
730    assertNotNull(cache.putIfAbsent(key1, re2));
731    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
732    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
733
734    assertNull(cache.putIfAbsent(key2, re2));
735    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
736    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
737
738    cache.remove(key1);
739    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
740    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
741
742    cache.clear();
743    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
744    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
745  }
746
747  @Test
748  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
749    // initialize an block.
750    int size = 100, offset = 20;
751    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
752    ByteBuffer buf = ByteBuffer.allocate(length);
753    HFileContext meta = new HFileContextBuilder().build();
754    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
755      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
756
757    // initialize an mocked ioengine.
758    IOEngine ioEngine = Mockito.mock(IOEngine.class);
759    when(ioEngine.usesSharedMemory()).thenReturn(false);
760    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
761    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
762      Mockito.anyLong());
763    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
764      Mockito.anyLong());
765
766    // create an bucket allocator.
767    long availableSpace = 1024 * 1024 * 1024L;
768    BucketAllocator allocator = new BucketAllocator(availableSpace, null);
769
770    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
771    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);
772
773    Assert.assertEquals(0, allocator.getUsedSize());
774    try {
775      re.writeToCache(ioEngine, allocator, null, null,
776        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
777      Assert.fail();
778    } catch (Exception e) {
779    }
780    Assert.assertEquals(0, allocator.getUsedSize());
781  }
782
783  /**
784   * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
785   * could not be freed even if corresponding {@link HFileBlock} is evicted from
786   * {@link BucketCache}.
787   */
788  @Test
789  public void testFreeBucketEntryRestoredFromFile() throws Exception {
790    BucketCache bucketCache = null;
791    try {
792      final Path dataTestDir = createAndGetTestDir();
793
794      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
795      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
796
797      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
798        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
799      long usedByteSize = bucketCache.getAllocator().getUsedSize();
800      assertEquals(0, usedByteSize);
801
802      HFileBlockPair[] hfileBlockPairs =
803        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
804      // Add blocks
805      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
806        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
807      }
808
809      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
810        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
811          hfileBlockPair.getBlock(), false);
812      }
813      usedByteSize = bucketCache.getAllocator().getUsedSize();
814      assertNotEquals(0, usedByteSize);
815      // persist cache to file
816      bucketCache.shutdown();
817      assertTrue(new File(persistencePath).exists());
818
819      // restore cache from file
820      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
821        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
822      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
823
824      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
825        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
826        bucketCache.evictBlock(blockCacheKey);
827      }
828      assertEquals(0, bucketCache.getAllocator().getUsedSize());
829      assertEquals(0, bucketCache.backingMap.size());
830    } finally {
831      bucketCache.shutdown();
832      HBASE_TESTING_UTILITY.cleanupTestDir();
833    }
834  }
835
836  @Test
837  public void testBlockAdditionWaitWhenCache() throws Exception {
838    BucketCache bucketCache = null;
839    try {
840      final Path dataTestDir = createAndGetTestDir();
841
842      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
843      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
844
845      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
846        constructedBlockSizes, 1, 1, persistencePath);
847      long usedByteSize = bucketCache.getAllocator().getUsedSize();
848      assertEquals(0, usedByteSize);
849
850      HFileBlockPair[] hfileBlockPairs =
851        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
852      // Add blocks
853      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
854        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
855          true);
856      }
857
858      // Max wait for 10 seconds.
859      long timeout = 10000;
860      // Wait for blocks size to match the number of blocks.
861      while (bucketCache.backingMap.size() != 10) {
862        if (timeout <= 0) break;
863        Threads.sleep(100);
864        timeout -= 100;
865      }
866      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
867        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
868      }
869      usedByteSize = bucketCache.getAllocator().getUsedSize();
870      assertNotEquals(0, usedByteSize);
871      // persist cache to file
872      bucketCache.shutdown();
873      assertTrue(new File(persistencePath).exists());
874
875      // restore cache from file
876      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
877        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
878      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
879
880      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
881        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
882        bucketCache.evictBlock(blockCacheKey);
883      }
884      assertEquals(0, bucketCache.getAllocator().getUsedSize());
885      assertEquals(0, bucketCache.backingMap.size());
886    } finally {
887      if (bucketCache != null) {
888        bucketCache.shutdown();
889      }
890      HBASE_TESTING_UTILITY.cleanupTestDir();
891    }
892  }
893}