001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.Mockito.when;
027
028import java.io.File;
029import java.io.IOException;
030import java.nio.ByteBuffer;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseConfiguration;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.io.ByteBuffAllocator;
045import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
046import org.apache.hadoop.hbase.io.hfile.BlockType;
047import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
048import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
049import org.apache.hadoop.hbase.io.hfile.Cacheable;
050import org.apache.hadoop.hbase.io.hfile.HFileBlock;
051import org.apache.hadoop.hbase.io.hfile.HFileContext;
052import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
053import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
054import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
055import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
056import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
057import org.apache.hadoop.hbase.nio.ByteBuff;
058import org.apache.hadoop.hbase.testclassification.IOTests;
059import org.apache.hadoop.hbase.testclassification.LargeTests;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.hbase.util.Threads;
062import org.junit.After;
063import org.junit.Assert;
064import org.junit.Before;
065import org.junit.ClassRule;
066import org.junit.Test;
067import org.junit.experimental.categories.Category;
068import org.junit.runner.RunWith;
069import org.junit.runners.Parameterized;
070import org.mockito.Mockito;
071
072import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
073
074/**
075 * Basic test of BucketCache.Puts and gets.
076 * <p>
077 * Tests will ensure that blocks' data correctness under several threads concurrency
078 */
079@RunWith(Parameterized.class)
080@Category({ IOTests.class, LargeTests.class })
081public class TestBucketCache {
082
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCache.class);

  /**
   * Two parameterizations: an 8k block size with default bucket sizes (null), and a 16k block
   * size with an explicit ascending list of bucket sizes, each a power-of-two multiple of 1024
   * plus an extra 1024.
   */
  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
                                                          // for these tests?
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }
096
  // Block size injected by the Parameterized runner (see data()).
  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  // Bucket sizes injected by the Parameterized runner; null means BucketCache defaults.
  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  // Cache under test; created fresh in setup() and shut down in tearDown().
  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  // Per-block size used by the simple/heap-size tests.
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  // Cache capacity: 32 MB.
  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  // Default IO engine for tests that do not build their own engine string.
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
116
  /**
   * Test-local subclass of {@link BucketCache}. The overrides below simply delegate to the
   * superclass; they exist so the cacheBlock entry points are explicit in this test class.
   */
  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      // No-op override: delegates straight to BucketCache.
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      // No-op override: delegates straight to BucketCache.
      super.cacheBlock(cacheKey, buf);
    }
  }
135
  @Before
  public void setup() throws IOException {
    // Fresh cache for every test, parameterized by block/bucket sizes; no persistence path.
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
  }
141
  @After
  public void tearDown() {
    // Stop writer threads and release IO engine resources after each test.
    cache.shutdown();
  }
146
147  /**
148   * Test Utility to create test dir and return name
149   * @return return name of created dir
150   * @throws IOException throws IOException
151   */
152  private Path createAndGetTestDir() throws IOException {
153    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
154    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
155    return testDir;
156  }
157
158  /**
159   * Return a random element from {@code a}.
160   */
161  private static <T> T randFrom(List<T> a) {
162    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
163  }
164
  /**
   * Exercises the standalone {@link BucketAllocator}: fills it completely with randomly sized
   * allocations, verifies free-count and fragmentation accounting, then frees everything and
   * checks the allocator returns to empty.
   */
  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
    // the cache is completely filled.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
      } catch (CacheFullException cfe) {
        // This size no longer fits; once no size fits, the allocator is completely full.
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());

      // we know the block sizes above are multiples of 1024, but default bucket sizes give an
      // additional 1024 on top of that so this counts towards fragmentation in our test
      // real life may have worse fragmentation because blocks may not be perfectly sized to block
      // size, given encoding/compression and large rows
      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
    }

    mAllocator.logDebugStatistics();

    // Freeing each allocation must return exactly its allocated size; afterwards nothing is used.
    for (Pair<Long, Integer> allocation : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }
209
  @Test
  public void testCacheSimple() throws Exception {
    // Basic put/get round-trips via the shared CacheTestUtils harness.
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }
214
  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    // Many threads hammering the same key must still observe consistent block data.
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }
219
  @Test
  public void testHeapSizeChanges() throws Exception {
    // Stop the async writer threads so heap-size accounting can be observed deterministically.
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }
225
  /**
   * Blocks until the writer threads have flushed {@code cacheKey} out of the RAM cache and into
   * the backing map.
   */
  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    // Poll until the key appears in backingMap and disappears from ramCache.
    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
      Thread.sleep(100);
    }
    // Extra grace period for any in-flight writer bookkeeping to settle.
    Thread.sleep(1000);
  }
233
234  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
235    while (!cache.ramCache.isEmpty()) {
236      Thread.sleep(100);
237    }
238    Thread.sleep(1000);
239  }
240
  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
  // threads will flush it to the bucket and put reference entry in backingMap.
  /**
   * Caches {@code block} under {@code cacheKey} and waits for the asynchronous flush to finish.
   */
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block, boolean waitWhenCache) throws InterruptedException {
    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
    waitUntilFlushedToBucket(cache, cacheKey);
  }
248
  /**
   * Regression test for the HBASE-21957 eviction race: an evictBlock blocked on the offset lock
   * must not evict a block that was re-cached at the same key after the one it originally saw.
   */
  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    // Hold the offset write lock so the evict thread below blocks on it.
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    // Evict the entry out from under the blocked evictor, then re-cache the same key.
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    assertEquals(1, cache.getBlockCount());
    // Release the lock: the pending evictBlock runs but must NOT evict the newly cached entry.
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 are:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 are:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 is more reasonable,because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it had seen, and newly added Block after the {@link BucketEntry}
     * it had seen should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }
293
  @Test
  public void testRetrieveFromFile() throws Exception {
    Path testDir = createAndGetTestDir();
    String ioEngineName = "file:" + testDir + "/bucket.cache";
    // Persistence round-trip against the file IO engine.
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    // Restarting with different (smaller) bucket sizes must discard the persisted state.
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());
    HBASE_TESTING_UTILITY.cleanupTestDir();
  }
308
309  @Test
310  public void testRetrieveFromMMap() throws Exception {
311    final Path testDir = createAndGetTestDir();
312    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
313    testRetrievalUtils(testDir, ioEngineName);
314  }
315
  @Test
  public void testRetrieveFromPMem() throws Exception {
    final Path testDir = createAndGetTestDir();
    // Persistence round-trip against the persistent-memory IO engine.
    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    // Restarting with different (smaller) bucket sizes must discard the persisted state.
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());
    HBASE_TESTING_UTILITY.cleanupTestDir();
  }
330
  /**
   * Shared persistence round-trip: fills a BucketCache backed by {@code ioEngineName}, shuts it
   * down (which writes the persistence file), restarts it from that file and verifies the used
   * size was fully recovered.
   */
  private void testRetrievalUtils(Path testDir, String ioEngineName)
    throws IOException, InterruptedException {
    final String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, persistencePath);
    try {
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      // NOTE(review): the same blocks are cached twice; this second pass also waits for the
      // async flush to complete before the used size is measured.
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // Shutdown writes the persistence file.
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
      // Restart from persistence; the file is consumed (removed) on successful retrieval, and
      // the recovered used size must match the pre-shutdown value.
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
    }
    assertTrue(new File(persistencePath).exists());
  }
360
361  @Test
362  public void testRetrieveUnsupportedIOE() throws Exception {
363    try {
364      final Path testDir = createAndGetTestDir();
365      final String ioEngineName = testDir + "/bucket.cache";
366      testRetrievalUtils(testDir, ioEngineName);
367      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
368    } catch (IllegalArgumentException e) {
369      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
370        + "files:, mmap: or offheap", e.getMessage());
371    }
372  }
373
  @Test
  public void testRetrieveFromMultipleFiles() throws Exception {
    final Path testDirInitial = createAndGetTestDir();
    // Second directory comes from a fresh testing util so the two cache files live apart.
    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
    // "files:" engine spreads the cache across multiple delimiter-separated files.
    String ioEngineName =
      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
    testRetrievalUtils(testDirInitial, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDirInitial + "/bucket.persistence";
    // Restarting with different (smaller) bucket sizes must discard the persisted state.
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());
    HBASE_TESTING_UTILITY.cleanupTestDir();
  }
392
  @Test
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    // First instance uses the class-level ioEngineName ("offheap") and no persistence path.
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
    try {
      final Path testDir = createAndGetTestDir();
      // NOTE(review): this local shadows the class field and is only used for the second
      // instance below — confirm the shadowing is intentional.
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      // Without a persistence path, a restarted cache must come up empty.
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }
421
422  @Test
423  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
424    long availableSpace = 20 * 1024L * 1024 * 1024;
425    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
426    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
427    assertTrue(allocator.getBuckets().length > 0);
428  }
429
  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
      BucketCache.DEFAULT_MIN_FACTOR);

    // Custom factors: each partition (single/multi/memory) is scaled by the min factor (0.5).
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }
449
  @Test
  public void testCacheSizeCapacity() throws IOException {
    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
      BucketCache.DEFAULT_MIN_FACTOR);
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
    try {
      // Long.MAX_VALUE capacity with a 1-byte block size exceeds the supported maximum.
      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
        writerQLen, null, 100, conf);
      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
    }
  }
468
  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    // Every factor set in the configuration must be readable back from the constructed cache.
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
      cache.getAcceptableFactor(), 0);
    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getMinFactor(), 0);
    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getExtraFreeFactor(), 0);
    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
      cache.getSingleFactor(), 0);
    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
      cache.getMultiFactor(), 0);
    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
      cache.getMemoryFactor(), 0);
  }
495
496  @Test
497  public void testInvalidAcceptFactorConfig() throws IOException {
498    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
499    boolean[] expectedOutcomes = { false, false, true, false };
500    Map<String, float[]> configMappings =
501      ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
502    Configuration conf = HBaseConfiguration.create();
503    checkConfigValues(conf, configMappings, expectedOutcomes);
504  }
505
506  @Test
507  public void testInvalidMinFactorConfig() throws IOException {
508    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
509    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
510    boolean[] expectedOutcomes = { false, true, false, false };
511    Map<String, float[]> configMappings =
512      ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
513    Configuration conf = HBaseConfiguration.create();
514    checkConfigValues(conf, configMappings, expectedOutcomes);
515  }
516
517  @Test
518  public void testInvalidExtraFreeFactorConfig() throws IOException {
519    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
520    // throws due to <0, in expected range, in expected range, config can be > 1.0
521    boolean[] expectedOutcomes = { false, true, true, true };
522    Map<String, float[]> configMappings =
523      ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
524    Configuration conf = HBaseConfiguration.create();
525    checkConfigValues(conf, configMappings, expectedOutcomes);
526  }
527
528  @Test
529  public void testInvalidCacheSplitFactorConfig() throws IOException {
530    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
531    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
532    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
533    // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't
534    // be negative, configs don't add to 1.0
535    boolean[] expectedOutcomes = { true, false, false, false };
536    Map<String,
537      float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
538        singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
539        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
540    Configuration conf = HBaseConfiguration.create();
541    checkConfigValues(conf, configMappings, expectedOutcomes);
542  }
543
544  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
545    boolean[] expectSuccess) throws IOException {
546    Set<String> configNames = configMap.keySet();
547    for (int i = 0; i < expectSuccess.length; i++) {
548      try {
549        for (String configName : configNames) {
550          conf.setFloat(configName, configMap.get(configName)[i]);
551        }
552        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
553          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
554        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
555          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
556      } catch (IllegalArgumentException e) {
557        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
558          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
559      }
560    }
561  }
562
  /**
   * Asserts that {@code getPartitionSize(partitionFactor)} equals
   * floor(totalSize * partitionFactor * minFactor).
   */
  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
    float minFactor) {
    long expectedOutput =
      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }
569
  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the values isn't ensured to be
    // positive. See HBASE-18757 for more information.
    long testValue = 549888460800L;
    // The deserializer supplier and allocator are irrelevant here; only the offset round-trip
    // through BucketEntry's packed representation is under test.
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true, (entry) -> {
      return ByteBuffAllocator.NONE;
    }, ByteBuffAllocator.HEAP);
    assertEquals(testValue, bucketEntry.offset());
  }
580
  /**
   * Verifies that explicit evictions (by hfile name or key) do not bump the eviction-count stat;
   * only evictions triggered by {@code freeSpace} should.
   */
  @Test
  public void testEvictionCount() throws InterruptedException {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    // Only the first block (nextBlockOnDiskSize=52) is actually cached in this test.
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);

    assertEquals(0, cache.getStats().getEvictionCount());

    // evict call should return 1, but then eviction count be 0
    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should not increment
    assertTrue(cache.evictBlock(key));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should finally increment eviction count
    cache.freeSpace("testing");
    assertEquals(1, cache.getStats().getEvictionCount());
  }
630
  /**
   * Caching semantics when two blocks differ only in next-block metadata: a block carrying the
   * metadata wins over (or is kept instead of) one without it, and reference counts stay balanced
   * across every cache/evict transition.
   */
  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }
688
689  @Test
690  public void testRAMCache() {
691    int size = 100;
692    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
693    byte[] byteArr = new byte[length];
694    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
695    HFileContext meta = new HFileContextBuilder().build();
696
697    RAMCache cache = new RAMCache();
698    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
699    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
700    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
701      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
702    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
703      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
704    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
705    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
706
707    assertFalse(cache.containsKey(key1));
708    assertNull(cache.putIfAbsent(key1, re1));
709    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
710
711    assertNotNull(cache.putIfAbsent(key1, re2));
712    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
713    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
714
715    assertNull(cache.putIfAbsent(key2, re2));
716    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
717    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
718
719    cache.remove(key1);
720    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
721    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
722
723    cache.clear();
724    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
725    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
726  }
727
728  @Test
729  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
730    // initialize an block.
731    int size = 100, offset = 20;
732    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
733    ByteBuffer buf = ByteBuffer.allocate(length);
734    HFileContext meta = new HFileContextBuilder().build();
735    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
736      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
737
738    // initialize an mocked ioengine.
739    IOEngine ioEngine = Mockito.mock(IOEngine.class);
740    when(ioEngine.usesSharedMemory()).thenReturn(false);
741    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
742    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
743      Mockito.anyLong());
744    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
745      Mockito.anyLong());
746
747    // create an bucket allocator.
748    long availableSpace = 1024 * 1024 * 1024L;
749    BucketAllocator allocator = new BucketAllocator(availableSpace, null);
750
751    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
752    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
753
754    Assert.assertEquals(0, allocator.getUsedSize());
755    try {
756      re.writeToCache(ioEngine, allocator, null, null,
757        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
758      Assert.fail();
759    } catch (Exception e) {
760    }
761    Assert.assertEquals(0, allocator.getUsedSize());
762  }
763
764  /**
765   * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
766   * could not be freed even if corresponding {@link HFileBlock} is evicted from
767   * {@link BucketCache}.
768   */
769  @Test
770  public void testFreeBucketEntryRestoredFromFile() throws Exception {
771    try {
772      final Path dataTestDir = createAndGetTestDir();
773
774      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
775      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
776
777      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
778        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
779      long usedByteSize = bucketCache.getAllocator().getUsedSize();
780      assertEquals(0, usedByteSize);
781
782      HFileBlockPair[] hfileBlockPairs =
783        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
784      // Add blocks
785      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
786        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
787      }
788
789      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
790        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
791          hfileBlockPair.getBlock(), false);
792      }
793      usedByteSize = bucketCache.getAllocator().getUsedSize();
794      assertNotEquals(0, usedByteSize);
795      // persist cache to file
796      bucketCache.shutdown();
797      assertTrue(new File(persistencePath).exists());
798
799      // restore cache from file
800      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
801        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
802      assertFalse(new File(persistencePath).exists());
803      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
804
805      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
806        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
807        bucketCache.evictBlock(blockCacheKey);
808      }
809      assertEquals(0, bucketCache.getAllocator().getUsedSize());
810      assertEquals(0, bucketCache.backingMap.size());
811    } finally {
812      HBASE_TESTING_UTILITY.cleanupTestDir();
813    }
814  }
815
816  @Test
817  public void testBlockAdditionWaitWhenCache() throws Exception {
818    try {
819      final Path dataTestDir = createAndGetTestDir();
820
821      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
822      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
823
824      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
825        constructedBlockSizes, 1, 1, persistencePath);
826      long usedByteSize = bucketCache.getAllocator().getUsedSize();
827      assertEquals(0, usedByteSize);
828
829      HFileBlockPair[] hfileBlockPairs =
830        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
831      // Add blocks
832      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
833        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
834          true);
835      }
836
837      // Max wait for 10 seconds.
838      long timeout = 10000;
839      // Wait for blocks size to match the number of blocks.
840      while (bucketCache.backingMap.size() != 10) {
841        if (timeout <= 0) break;
842        Threads.sleep(100);
843        timeout -= 100;
844      }
845      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
846        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
847      }
848      usedByteSize = bucketCache.getAllocator().getUsedSize();
849      assertNotEquals(0, usedByteSize);
850      // persist cache to file
851      bucketCache.shutdown();
852      assertTrue(new File(persistencePath).exists());
853
854      // restore cache from file
855      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
856        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
857      assertFalse(new File(persistencePath).exists());
858      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
859
860      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
861        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
862        bucketCache.evictBlock(blockCacheKey);
863      }
864      assertEquals(0, bucketCache.getAllocator().getUsedSize());
865      assertEquals(0, bucketCache.backingMap.size());
866    } finally {
867      HBASE_TESTING_UTILITY.cleanupTestDir();
868    }
869  }
870}