/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Basic test of BucketCache: puts and gets.
 * <p>
 * Tests ensure that block data remains correct under concurrent access from several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBucketCache.class);

  private static final Random RAND = new Random();

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {
        { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
        {
            16 * 1024,
            new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
                28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
                128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
        int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
          persistencePath);
      super.wait_when_cache = true;
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, null);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Test utility to create the test directory and return its path.
   * @return path of the created directory
   * @throws IOException if the directory cannot be created
   */
  private Path createAndGetTestDir() throws IOException {
    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
    return testDir;
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(RAND.nextInt(a.size()));
  }

  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Long> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize. Continue selecting blocks until
    // the cache is completely filled.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(mAllocator.allocateBlock(blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
    }

    for (long offset : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

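  /**
   * Polls until the writer threads have flushed {@code cacheKey} out of the ramCache and into the
   * backingMap; the trailing sleep gives any remaining bookkeeping a moment to settle.
   */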
  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
      throws InterruptedException {
    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  // BucketCache.cacheBlock is async: it first adds the block to ramCache and the writeQueue, then
  // writer threads flush it to the bucket and put a reference entry in backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
      Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

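  // Regression scenario around HBASE-21957: an eviction blocked on the offset lock must not
  // remove a block that was re-cached under the same key after the entry the eviction originally
  // saw was already gone. See the long comment at the end of this test.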
  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
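    // The write lock held above makes the evict thread below block inside evictBlock until we
    // release it.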
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 were:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 were:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 are more reasonable, because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it has seen, and a Block newly added after the {@link BucketEntry}
     * it has seen should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    Path testDir = createAndGetTestDir();
    String ioEngineName = "file:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
    String persistencePath = testDir + "/bucket.persistence";
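    // Restarting with bucket sizes that no longer match the persisted layout: the assertions
    // below expect retrieval to be abandoned, leaving an empty cache and no persistence file.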
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());
    HBASE_TESTING_UTILITY.cleanupTestDir();
  }

  @Test
  public void testRetrieveFromMMap() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
  }

  @Test
  public void testRetrieveFromPMem() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
    String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());
    HBASE_TESTING_UTILITY.cleanupTestDir();
  }

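  /**
   * Caches a generated block, shuts the cache down so that its state is persisted to
   * {@code persistencePath}, then reconstructs the cache and verifies the allocator state was
   * restored. Note that the persistence file is deleted once it has been read back.
   */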
  private void testRetrievalUtils(Path testDir, String ioEngineName)
          throws IOException, InterruptedException {
    final String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, persistencePath);
    try {
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
              constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
    }
    assertTrue(new File(persistencePath).exists());
  }

  @Test
  public void testRetrieveUnsupportedIOE() throws Exception {
    try {
      final Path testDir = createAndGetTestDir();
      final String ioEngineName = testDir + "/bucket.cache";
      testRetrievalUtils(testDir, ioEngineName);
      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " +
              "files:, mmap: or offheap", e.getMessage());
    }
  }

  @Test
  public void testRetrieveFromMultipleFiles() throws Exception {
    final Path testDirInitial = createAndGetTestDir();
    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
    String ioEngineName = new StringBuilder("files:").append(testDirInitial)
            .append("/bucket1.cache").append(FileIOEngine.FILE_DELIMITER).append(newTestDir)
            .append("/bucket2.cache").toString();
    testRetrievalUtils(testDirInitial, ioEngineName);
    int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
    String persistencePath = testDirInitial + "/bucket.persistence";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());
    HBASE_TESTING_UTILITY.cleanupTestDir();
  }

  @Test
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, null);
    try {
      final Path testDir = createAndGetTestDir();
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
              constructedBlockSizes, writeThreads, writerQLen, null);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

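  // Sanity check that BucketAllocator can lay out buckets when a bucket size is as large as 1 GB
  // over 20 GB of space, a combination that would overflow plain int arithmetic if mishandled.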
  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[]{1024, 1024 * 1024, 1024 * 1024 * 1024};
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
        BucketCache.DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testCacheSizeCapacity() throws IOException {
    // Cache capacity must satisfy (capacity / blockSize) < Integer.MAX_VALUE; an over-large
    // capacity is rejected.
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
            BucketCache.DEFAULT_MIN_FACTOR);
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
    try {
      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
              writerQLen, null, 100, conf);
      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
    }
  }

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
        cache.getAcceptableFactor(), 0);
    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
        cache.getMinFactor(), 0);
    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
        cache.getExtraFreeFactor(), 0);
    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
        cache.getSingleFactor(), 0);
    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
        cache.getMultiFactor(), 0);
    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
        cache.getMemoryFactor(), 0);
  }

  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = {-1f, 0.2f, 0.86f, 1.05f};
    boolean[] expectedOutcomes = {false, false, true, false};
    Map<String, float[]> configMappings =
        ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = {-1f, 0f, 0.96f, 1.05f};
    // throws due to < 0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = {false, true, false, false};
    Map<String, float[]> configMappings =
        ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = {-1f, 0f, 0.2f, 1.05f};
    // throws due to < 0, in expected range, in expected range, config can be > 1.0
    boolean[] expectedOutcomes = {false, true, true, true};
    Map<String, float[]> configMappings =
        ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = {0.2f, 0f, -0.2f, 1f};
    float[] multiFactorConfigValues = {0.4f, 0f, 1f, .05f};
    float[] memoryFactorConfigValues = {0.4f, 0f, 0.2f, .5f};
    // Per index: the three factors sum to 1.0 and each lies between 0 and 1.0; the factors don't
    // sum to 1.0; a factor is negative; the factors don't sum to 1.0.
    boolean[] expectedOutcomes = {true, false, false, false};
    Map<String, float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
        singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

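  /**
   * For each index i, applies the i-th value of every factor config and constructs a BucketCache;
   * construction must succeed exactly when {@code expectSuccess[i]} is true, with
   * IllegalArgumentException as the expected failure mode.
   */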
  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
      boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
        assertTrue("BucketCache construction succeeded but was expected to fail", expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("BucketCache construction failed but was expected to succeed",
            expectSuccess[i]);
      }
    }
  }

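  // getPartitionSize(factor) is expected to equal totalSize * factor * minFactor, rounded down.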
  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
      float minFactor) {
    long expectedOutput =
        (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }

  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the value isn't ensured to be
    // positive. See HBASE-18757 for more information.
    long testValue = 549888460800L;
    BucketEntry bucketEntry =
        new BucketEntry(testValue, 10, 10L, true, (entry) -> {
          return ByteBuffAllocator.NONE;
        }, ByteBuffAllocator.HEAP);
    assertEquals(testValue, bucketEntry.offset());
  }

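  // When the same key is cached twice, the block that carries next-block metadata should win over
  // the one without it; the refCnt assertions check that no buffer references leak along the way.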
  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
        ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
        ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect it to replace the existing entry.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }

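  // Exercises RAMCache reference counting: a successful putIfAbsent retains the entry's buffer
  // (refCnt rises to 2), while remove() and clear() release it back to the caller's single
  // reference.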
  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
        HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
        HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);

    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }

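  // If the IOEngine write fails, the space the RAMQueueEntry took from the BucketAllocator must
  // be released again; otherwise the allocator would leak bucket space on every failed write.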
  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
        HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // initialize a mocked IOEngine whose write methods always fail.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    Mockito.when(ioEngine.usesSharedMemory()).thenReturn(false);
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);

    Assert.assertEquals(0, allocator.getUsedSize());
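    // writeToCache should propagate the failure and free the space it allocated, leaving the
    // allocator's used size unchanged.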
    try {
      re.writeToCache(ioEngine, allocator, null, null);
      Assert.fail();
    } catch (Exception e) {
      // expected: the mocked IOEngine throws on write.
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }

  /**
   * This test is for HBASE-26295: a {@link BucketEntry} restored from a persistence file could
   * not be freed, even after the corresponding {@link HFileBlock} was evicted from the
   * {@link BucketCache}.
   */
  @Test
  public void testFreeBucketEntryRestoredFromFile() throws Exception {
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
          CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
      }

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
          hfileBlockPair.getBlock());
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

}