001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
024import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
025import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
026import static org.junit.Assert.assertEquals;
027import static org.junit.Assert.assertFalse;
028import static org.junit.Assert.assertNotEquals;
029import static org.junit.Assert.assertNotNull;
030import static org.junit.Assert.assertNull;
031import static org.junit.Assert.assertTrue;
032import static org.mockito.Mockito.mock;
033import static org.mockito.Mockito.when;
034
035import java.io.File;
036import java.io.IOException;
037import java.nio.ByteBuffer;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.Collection;
041import java.util.HashMap;
042import java.util.List;
043import java.util.Map;
044import java.util.Set;
045import java.util.concurrent.ThreadLocalRandom;
046import java.util.concurrent.locks.ReentrantReadWriteLock;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.hbase.HBaseClassTestRule;
050import org.apache.hadoop.hbase.HBaseConfiguration;
051import org.apache.hadoop.hbase.HBaseTestingUtil;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.Waiter;
054import org.apache.hadoop.hbase.io.ByteBuffAllocator;
055import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
056import org.apache.hadoop.hbase.io.hfile.BlockType;
057import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
058import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
059import org.apache.hadoop.hbase.io.hfile.Cacheable;
060import org.apache.hadoop.hbase.io.hfile.HFileBlock;
061import org.apache.hadoop.hbase.io.hfile.HFileContext;
062import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
063import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
064import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
065import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
066import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
067import org.apache.hadoop.hbase.nio.ByteBuff;
068import org.apache.hadoop.hbase.regionserver.HRegion;
069import org.apache.hadoop.hbase.regionserver.HStore;
070import org.apache.hadoop.hbase.regionserver.HStoreFile;
071import org.apache.hadoop.hbase.testclassification.IOTests;
072import org.apache.hadoop.hbase.testclassification.LargeTests;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
074import org.apache.hadoop.hbase.util.Pair;
075import org.apache.hadoop.hbase.util.Threads;
076import org.junit.After;
077import org.junit.Assert;
078import org.junit.Before;
079import org.junit.ClassRule;
080import org.junit.Test;
081import org.junit.experimental.categories.Category;
082import org.junit.runner.RunWith;
083import org.junit.runners.Parameterized;
084import org.mockito.Mockito;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
089
090/**
 * Basic test of BucketCache puts and gets.
 * <p>
 * Tests verify that block data stays correct under concurrent access from several threads.
094 */
095@RunWith(Parameterized.class)
096@Category({ IOTests.class, LargeTests.class })
097public class TestBucketCache {
098
099  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);
100
101  @ClassRule
102  public static final HBaseClassTestRule CLASS_RULE =
103    HBaseClassTestRule.forClass(TestBucketCache.class);
104
105  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
106  public static Iterable<Object[]> data() {
107    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
108                                                          // for these tests?
109      { 16 * 1024,
110        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
111          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
112          128 * 1024 + 1024 } } });
113  }
114
115  @Parameterized.Parameter(0)
116  public int constructedBlockSize;
117
118  @Parameterized.Parameter(1)
119  public int[] constructedBlockSizes;
120
121  BucketCache cache;
122  final int CACHE_SIZE = 1000000;
123  final int NUM_BLOCKS = 100;
124  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
125  final int NUM_THREADS = 100;
126  final int NUM_QUERIES = 10000;
127
128  final long capacitySize = 32 * 1024 * 1024;
129  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
130  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
131  private String ioEngineName = "offheap";
132
133  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
134
135  private static class MockedBucketCache extends BucketCache {
136
137    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
138      int writerThreads, int writerQLen, String persistencePath) throws IOException {
139      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
140        persistencePath);
141    }
142
143    @Override
144    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
145      super.cacheBlock(cacheKey, buf, inMemory);
146    }
147
148    @Override
149    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
150      super.cacheBlock(cacheKey, buf);
151    }
152  }
153
154  @Before
155  public void setup() throws IOException {
156    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
157      constructedBlockSizes, writeThreads, writerQLen, null);
158  }
159
160  @After
161  public void tearDown() {
162    cache.shutdown();
163  }
164
165  /**
166   * Test Utility to create test dir and return name
167   * @return return name of created dir
168   * @throws IOException throws IOException
169   */
170  private Path createAndGetTestDir() throws IOException {
171    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
172    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
173    return testDir;
174  }
175
176  /**
177   * Return a random element from {@code a}.
178   */
179  private static <T> T randFrom(List<T> a) {
180    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
181  }
182
183  @Test
184  public void testBucketAllocator() throws BucketAllocatorException {
185    BucketAllocator mAllocator = cache.getAllocator();
186    /*
187     * Test the allocator first
188     */
189    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
190
191    boolean full = false;
192    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
193    // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
194    // the cache is completely filled.
195    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
196    while (!full) {
197      Integer blockSize = null;
198      try {
199        blockSize = randFrom(tmp);
200        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
201      } catch (CacheFullException cfe) {
202        tmp.remove(blockSize);
203        if (tmp.isEmpty()) full = true;
204      }
205    }
206
207    for (Integer blockSize : BLOCKSIZES) {
208      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
209      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
210      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
211
212      // we know the block sizes above are multiples of 1024, but default bucket sizes give an
213      // additional 1024 on top of that so this counts towards fragmentation in our test
214      // real life may have worse fragmentation because blocks may not be perfectly sized to block
215      // size, given encoding/compression and large rows
216      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
217    }
218
219    mAllocator.logDebugStatistics();
220
221    for (Pair<Long, Integer> allocation : allocations) {
222      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
223        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
224    }
225    assertEquals(0, mAllocator.getUsedSize());
226  }
227
228  @Test
229  public void testCacheSimple() throws Exception {
230    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
231  }
232
233  @Test
234  public void testCacheMultiThreadedSingleKey() throws Exception {
235    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
236  }
237
238  @Test
239  public void testHeapSizeChanges() throws Exception {
240    cache.stopWriterThreads();
241    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
242  }
243
244  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
245    throws InterruptedException {
246    Waiter.waitFor(HBaseConfiguration.create(), 10000,
247      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
248  }
249
250  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
251    while (!cache.ramCache.isEmpty()) {
252      Thread.sleep(100);
253    }
254    Thread.sleep(1000);
255  }
256
257  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
258  // threads will flush it to the bucket and put reference entry in backingMap.
259  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
260    Cacheable block, boolean waitWhenCache) throws InterruptedException {
261    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
262    waitUntilFlushedToBucket(cache, cacheKey);
263  }
264
265  @Test
266  public void testMemoryLeak() throws Exception {
267    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
268    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
269      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
270    long lockId = cache.backingMap.get(cacheKey).offset();
271    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
272    lock.writeLock().lock();
273    Thread evictThread = new Thread("evict-block") {
274      @Override
275      public void run() {
276        cache.evictBlock(cacheKey);
277      }
278    };
279    evictThread.start();
280    cache.offsetLock.waitForWaiters(lockId, 1);
281    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
282    assertEquals(0, cache.getBlockCount());
283    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
284      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
285    assertEquals(1, cache.getBlockCount());
286    lock.writeLock().unlock();
287    evictThread.join();
288    /**
289     * <pre>
290     * The asserts here before HBASE-21957 are:
291     * assertEquals(1L, cache.getBlockCount());
292     * assertTrue(cache.getCurrentSize() > 0L);
293     * assertTrue("We should have a block!", cache.iterator().hasNext());
294     *
295     * The asserts here after HBASE-21957 are:
296     * assertEquals(0, cache.getBlockCount());
297     * assertEquals(cache.getCurrentSize(), 0L);
298     *
299     * I think the asserts before HBASE-21957 is more reasonable,because
300     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
301     * it had seen, and newly added Block after the {@link BucketEntry}
302     * it had seen should not be evicted.
303     * </pre>
304     */
305    assertEquals(1L, cache.getBlockCount());
306    assertTrue(cache.getCurrentSize() > 0L);
307    assertTrue("We should have a block!", cache.iterator().hasNext());
308  }
309
310  @Test
311  public void testRetrieveFromFile() throws Exception {
312    Path testDir = createAndGetTestDir();
313    String ioEngineName = "file:" + testDir + "/bucket.cache";
314    testRetrievalUtils(testDir, ioEngineName);
315    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
316    String persistencePath = testDir + "/bucket.persistence";
317    BucketCache bucketCache = null;
318    try {
319      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
320        smallBucketSizes, writeThreads, writerQLen, persistencePath);
321      assertTrue(bucketCache.waitForCacheInitialization(10000));
322      assertFalse(new File(persistencePath).exists());
323      assertEquals(0, bucketCache.getAllocator().getUsedSize());
324      assertEquals(0, bucketCache.backingMap.size());
325    } finally {
326      bucketCache.shutdown();
327      HBASE_TESTING_UTILITY.cleanupTestDir();
328    }
329  }
330
331  @Test
332  public void testRetrieveFromMMap() throws Exception {
333    final Path testDir = createAndGetTestDir();
334    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
335    testRetrievalUtils(testDir, ioEngineName);
336  }
337
338  @Test
339  public void testRetrieveFromPMem() throws Exception {
340    final Path testDir = createAndGetTestDir();
341    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
342    testRetrievalUtils(testDir, ioEngineName);
343    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
344    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
345    BucketCache bucketCache = null;
346    try {
347      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
348        smallBucketSizes, writeThreads, writerQLen, persistencePath);
349      assertTrue(bucketCache.waitForCacheInitialization(10000));
350      assertFalse(new File(persistencePath).exists());
351      assertEquals(0, bucketCache.getAllocator().getUsedSize());
352      assertEquals(0, bucketCache.backingMap.size());
353    } finally {
354      bucketCache.shutdown();
355      HBASE_TESTING_UTILITY.cleanupTestDir();
356    }
357  }
358
359  private void testRetrievalUtils(Path testDir, String ioEngineName)
360    throws IOException, InterruptedException {
361    final String persistencePath =
362      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
363    BucketCache bucketCache = null;
364    try {
365      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
366        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
367      assertTrue(bucketCache.waitForCacheInitialization(10000));
368      long usedSize = bucketCache.getAllocator().getUsedSize();
369      assertEquals(0, usedSize);
370      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
371      for (HFileBlockPair block : blocks) {
372        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
373      }
374      for (HFileBlockPair block : blocks) {
375        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
376          false);
377      }
378      usedSize = bucketCache.getAllocator().getUsedSize();
379      assertNotEquals(0, usedSize);
380      bucketCache.shutdown();
381      assertTrue(new File(persistencePath).exists());
382      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
383        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
384      assertTrue(bucketCache.waitForCacheInitialization(10000));
385
386      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
387    } finally {
388      if (bucketCache != null) {
389        bucketCache.shutdown();
390      }
391    }
392    assertTrue(new File(persistencePath).exists());
393  }
394
395  @Test
396  public void testRetrieveUnsupportedIOE() throws Exception {
397    try {
398      final Path testDir = createAndGetTestDir();
399      final String ioEngineName = testDir + "/bucket.cache";
400      testRetrievalUtils(testDir, ioEngineName);
401      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
402    } catch (IllegalArgumentException e) {
403      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
404        + "files:, mmap: or offheap", e.getMessage());
405    }
406  }
407
408  @Test
409  public void testRetrieveFromMultipleFiles() throws Exception {
410    final Path testDirInitial = createAndGetTestDir();
411    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
412    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
413    String ioEngineName =
414      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
415        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
416    testRetrievalUtils(testDirInitial, ioEngineName);
417    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
418    String persistencePath = testDirInitial + "/bucket.persistence";
419    BucketCache bucketCache = null;
420    try {
421      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
422        smallBucketSizes, writeThreads, writerQLen, persistencePath);
423      assertTrue(bucketCache.waitForCacheInitialization(10000));
424      assertFalse(new File(persistencePath).exists());
425      assertEquals(0, bucketCache.getAllocator().getUsedSize());
426      assertEquals(0, bucketCache.backingMap.size());
427    } finally {
428      bucketCache.shutdown();
429      HBASE_TESTING_UTILITY.cleanupTestDir();
430    }
431  }
432
433  @Test
434  public void testRetrieveFromFileWithoutPersistence() throws Exception {
435    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
436      constructedBlockSizes, writeThreads, writerQLen, null);
437    assertTrue(bucketCache.waitForCacheInitialization(10000));
438    try {
439      final Path testDir = createAndGetTestDir();
440      String ioEngineName = "file:" + testDir + "/bucket.cache";
441      long usedSize = bucketCache.getAllocator().getUsedSize();
442      assertEquals(0, usedSize);
443      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
444      for (HFileBlockPair block : blocks) {
445        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
446      }
447      for (HFileBlockPair block : blocks) {
448        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
449          false);
450      }
451      usedSize = bucketCache.getAllocator().getUsedSize();
452      assertNotEquals(0, usedSize);
453      bucketCache.shutdown();
454      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
455        constructedBlockSizes, writeThreads, writerQLen, null);
456      assertTrue(bucketCache.waitForCacheInitialization(10000));
457      assertEquals(0, bucketCache.getAllocator().getUsedSize());
458    } finally {
459      bucketCache.shutdown();
460      HBASE_TESTING_UTILITY.cleanupTestDir();
461    }
462  }
463
464  @Test
465  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
466    long availableSpace = 20 * 1024L * 1024 * 1024;
467    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
468    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
469    assertTrue(allocator.getBuckets().length > 0);
470  }
471
472  @Test
473  public void testGetPartitionSize() throws IOException {
474    // Test default values
475    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
476      BucketCache.DEFAULT_MIN_FACTOR);
477
478    Configuration conf = HBaseConfiguration.create();
479    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
480    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
481    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
482    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
483
484    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
485      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
486    assertTrue(cache.waitForCacheInitialization(10000));
487
488    validateGetPartitionSize(cache, 0.1f, 0.5f);
489    validateGetPartitionSize(cache, 0.7f, 0.5f);
490    validateGetPartitionSize(cache, 0.2f, 0.5f);
491  }
492
493  @Test
494  public void testCacheSizeCapacity() throws IOException {
495    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
496    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
497      BucketCache.DEFAULT_MIN_FACTOR);
498    Configuration conf = HBaseConfiguration.create();
499    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
500    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
501    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
502    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
503    try {
504      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
505        writerQLen, null, 100, conf);
506      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
507    } catch (IllegalArgumentException e) {
508      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
509    }
510  }
511
512  @Test
513  public void testValidBucketCacheConfigs() throws IOException {
514    Configuration conf = HBaseConfiguration.create();
515    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
516    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
517    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
518    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
519    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
520    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
521
522    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
523      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
524    assertTrue(cache.waitForCacheInitialization(10000));
525
526    assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
527      cache.getAcceptableFactor(), 0);
528    assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
529    assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
530      cache.getExtraFreeFactor(), 0);
531    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
532      cache.getSingleFactor(), 0);
533    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
534      cache.getMultiFactor(), 0);
535    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
536      cache.getMemoryFactor(), 0);
537  }
538
539  @Test
540  public void testInvalidAcceptFactorConfig() throws IOException {
541    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
542    boolean[] expectedOutcomes = { false, false, true, false };
543    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
544    Configuration conf = HBaseConfiguration.create();
545    checkConfigValues(conf, configMappings, expectedOutcomes);
546  }
547
548  @Test
549  public void testInvalidMinFactorConfig() throws IOException {
550    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
551    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
552    boolean[] expectedOutcomes = { false, true, false, false };
553    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
554    Configuration conf = HBaseConfiguration.create();
555    checkConfigValues(conf, configMappings, expectedOutcomes);
556  }
557
558  @Test
559  public void testInvalidExtraFreeFactorConfig() throws IOException {
560    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
561    // throws due to <0, in expected range, in expected range, config can be > 1.0
562    boolean[] expectedOutcomes = { false, true, true, true };
563    Map<String, float[]> configMappings =
564      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
565    Configuration conf = HBaseConfiguration.create();
566    checkConfigValues(conf, configMappings, expectedOutcomes);
567  }
568
569  @Test
570  public void testInvalidCacheSplitFactorConfig() throws IOException {
571    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
572    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
573    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
574    // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't
575    // be negative, configs don't add to 1.0
576    boolean[] expectedOutcomes = { true, false, false, false };
577    Map<String,
578      float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
579        singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
580        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
581    Configuration conf = HBaseConfiguration.create();
582    checkConfigValues(conf, configMappings, expectedOutcomes);
583  }
584
585  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
586    boolean[] expectSuccess) throws IOException {
587    Set<String> configNames = configMap.keySet();
588    for (int i = 0; i < expectSuccess.length; i++) {
589      try {
590        for (String configName : configNames) {
591          conf.setFloat(configName, configMap.get(configName)[i]);
592        }
593        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
594          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
595        assertTrue(cache.waitForCacheInitialization(10000));
596        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
597          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
598      } catch (IllegalArgumentException e) {
599        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
600          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
601      }
602    }
603  }
604
605  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
606    float minFactor) {
607    long expectedOutput =
608      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
609    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
610  }
611
612  @Test
613  public void testOffsetProducesPositiveOutput() {
614    // This number is picked because it produces negative output if the values isn't ensured to be
615    // positive. See HBASE-18757 for more information.
616    long testValue = 549888460800L;
617    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
618      return ByteBuffAllocator.NONE;
619    }, ByteBuffAllocator.HEAP);
620    assertEquals(testValue, bucketEntry.offset());
621  }
622
623  @Test
624  public void testEvictionCount() throws InterruptedException {
625    int size = 100;
626    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
627    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
628    HFileContext meta = new HFileContextBuilder().build();
629    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
630    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
631      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
632    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
633      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
634
635    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
636    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
637    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
638    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
639    blockWithNextBlockMetadata.serialize(block1Buffer, true);
640    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
641
642    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
643    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
644      block1Buffer);
645
646    waitUntilFlushedToBucket(cache, key);
647
648    assertEquals(0, cache.getStats().getEvictionCount());
649
650    // evict call should return 1, but then eviction count be 0
651    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
652    assertEquals(0, cache.getStats().getEvictionCount());
653
654    // add back
655    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
656      block1Buffer);
657    waitUntilFlushedToBucket(cache, key);
658
659    // should not increment
660    assertTrue(cache.evictBlock(key));
661    assertEquals(0, cache.getStats().getEvictionCount());
662
663    // add back
664    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
665      block1Buffer);
666    waitUntilFlushedToBucket(cache, key);
667
668    // should finally increment eviction count
669    cache.freeSpace("testing");
670    assertEquals(1, cache.getStats().getEvictionCount());
671  }
672
673  @Test
674  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
675    int size = 100;
676    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
677    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
678    HFileContext meta = new HFileContextBuilder().build();
679    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
680    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
681      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
682    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
683      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
684
685    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
686    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
687    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
688    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
689    blockWithNextBlockMetadata.serialize(block1Buffer, true);
690    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
691
692    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
693    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
694      block1Buffer);
695
696    waitUntilFlushedToBucket(cache, key);
697    assertNotNull(cache.backingMap.get(key));
698    assertEquals(1, cache.backingMap.get(key).refCnt());
699    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
700    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
701
702    // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
703    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
704      block1Buffer);
705    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
706    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
707    assertEquals(1, cache.backingMap.get(key).refCnt());
708
709    // Clear and add blockWithoutNextBlockMetadata
710    assertTrue(cache.evictBlock(key));
711    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
712    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
713
714    assertNull(cache.getBlock(key, false, false, false));
715    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
716      block2Buffer);
717
718    waitUntilFlushedToBucket(cache, key);
719    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
720    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
721
722    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
723    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
724      block1Buffer);
725
726    waitUntilFlushedToBucket(cache, key);
727    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
728    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
729  }
730
731  @Test
732  public void testRAMCache() {
733    int size = 100;
734    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
735    byte[] byteArr = new byte[length];
736    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
737    HFileContext meta = new HFileContextBuilder().build();
738
739    RAMCache cache = new RAMCache();
740    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
741    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
742    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
743      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
744    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
745      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
746    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
747    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);
748
749    assertFalse(cache.containsKey(key1));
750    assertNull(cache.putIfAbsent(key1, re1));
751    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
752
753    assertNotNull(cache.putIfAbsent(key1, re2));
754    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
755    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
756
757    assertNull(cache.putIfAbsent(key2, re2));
758    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
759    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
760
761    cache.remove(key1);
762    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
763    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
764
765    cache.clear();
766    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
767    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
768  }
769
770  @Test
771  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
772    // initialize an block.
773    int size = 100, offset = 20;
774    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
775    ByteBuffer buf = ByteBuffer.allocate(length);
776    HFileContext meta = new HFileContextBuilder().build();
777    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
778      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
779
780    // initialize an mocked ioengine.
781    IOEngine ioEngine = Mockito.mock(IOEngine.class);
782    when(ioEngine.usesSharedMemory()).thenReturn(false);
783    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
784    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
785      Mockito.anyLong());
786    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
787      Mockito.anyLong());
788
789    // create an bucket allocator.
790    long availableSpace = 1024 * 1024 * 1024L;
791    BucketAllocator allocator = new BucketAllocator(availableSpace, null);
792
793    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
794    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);
795
796    Assert.assertEquals(0, allocator.getUsedSize());
797    try {
798      re.writeToCache(ioEngine, allocator, null, null,
799        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
800      Assert.fail();
801    } catch (Exception e) {
802    }
803    Assert.assertEquals(0, allocator.getUsedSize());
804  }
805
806  /**
807   * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
808   * could not be freed even if corresponding {@link HFileBlock} is evicted from
809   * {@link BucketCache}.
810   */
811  @Test
812  public void testFreeBucketEntryRestoredFromFile() throws Exception {
813    BucketCache bucketCache = null;
814    try {
815      final Path dataTestDir = createAndGetTestDir();
816
817      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
818      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
819
820      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
821        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
822      assertTrue(bucketCache.waitForCacheInitialization(10000));
823      long usedByteSize = bucketCache.getAllocator().getUsedSize();
824      assertEquals(0, usedByteSize);
825
826      HFileBlockPair[] hfileBlockPairs =
827        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
828      // Add blocks
829      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
830        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
831      }
832
833      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
834        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
835          hfileBlockPair.getBlock(), false);
836      }
837      usedByteSize = bucketCache.getAllocator().getUsedSize();
838      assertNotEquals(0, usedByteSize);
839      // persist cache to file
840      bucketCache.shutdown();
841      assertTrue(new File(persistencePath).exists());
842
843      // restore cache from file
844      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
845        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
846      assertTrue(bucketCache.waitForCacheInitialization(10000));
847      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
848
849      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
850        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
851        bucketCache.evictBlock(blockCacheKey);
852      }
853      assertEquals(0, bucketCache.getAllocator().getUsedSize());
854      assertEquals(0, bucketCache.backingMap.size());
855    } finally {
856      bucketCache.shutdown();
857      HBASE_TESTING_UTILITY.cleanupTestDir();
858    }
859  }
860
861  @Test
862  public void testBlockAdditionWaitWhenCache() throws Exception {
863    BucketCache bucketCache = null;
864    try {
865      final Path dataTestDir = createAndGetTestDir();
866
867      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
868      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
869
870      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
871      config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);
872
873      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
874        constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config);
875      assertTrue(bucketCache.waitForCacheInitialization(10000));
876      long usedByteSize = bucketCache.getAllocator().getUsedSize();
877      assertEquals(0, usedByteSize);
878
879      HFileBlockPair[] hfileBlockPairs =
880        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
881      // Add blocks
882      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
883        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
884          true);
885      }
886
887      // Max wait for 10 seconds.
888      long timeout = 10000;
889      // Wait for blocks size to match the number of blocks.
890      while (bucketCache.backingMap.size() != 10) {
891        if (timeout <= 0) break;
892        Threads.sleep(100);
893        timeout -= 100;
894      }
895      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
896        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
897      }
898      usedByteSize = bucketCache.getAllocator().getUsedSize();
899      assertNotEquals(0, usedByteSize);
900      // persist cache to file
901      bucketCache.shutdown();
902      assertTrue(new File(persistencePath).exists());
903
904      // restore cache from file
905      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
906        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
907      assertTrue(bucketCache.waitForCacheInitialization(10000));
908      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
909
910      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
911        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
912        bucketCache.evictBlock(blockCacheKey);
913      }
914      assertEquals(0, bucketCache.getAllocator().getUsedSize());
915      assertEquals(0, bucketCache.backingMap.size());
916    } finally {
917      if (bucketCache != null) {
918        bucketCache.shutdown();
919      }
920      HBASE_TESTING_UTILITY.cleanupTestDir();
921    }
922  }
923
924  @Test
925  public void testNotifyFileCachingCompletedSuccess() throws Exception {
926    BucketCache bucketCache = null;
927    try {
928      Path filePath =
929        new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
930      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false);
931      if (bucketCache.getStats().getFailedInserts() > 0) {
932        LOG.info("There were {} fail inserts, "
933          + "will assert if total blocks in backingMap equals (10 - failInserts) "
934          + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
935        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
936        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
937      } else {
938        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
939      }
940    } finally {
941      if (bucketCache != null) {
942        bucketCache.shutdown();
943      }
944      HBASE_TESTING_UTILITY.cleanupTestDir();
945    }
946  }
947
948  @Test
949  public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception {
950    BucketCache bucketCache = null;
951    try {
952      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
953        "testNotifyFileCachingCompletedForEncodedDataSuccess");
954      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true);
955      if (bucketCache.getStats().getFailedInserts() > 0) {
956        LOG.info("There were {} fail inserts, "
957          + "will assert if total blocks in backingMap equals (10 - failInserts) "
958          + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
959        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
960        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
961      } else {
962        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
963      }
964    } finally {
965      if (bucketCache != null) {
966        bucketCache.shutdown();
967      }
968      HBASE_TESTING_UTILITY.cleanupTestDir();
969    }
970  }
971
972  @Test
973  public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
974    BucketCache bucketCache = null;
975    try {
976      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
977        "testNotifyFileCachingCompletedNotAllCached");
978      // Deliberately passing more blocks than we have created to test that
979      // notifyFileCachingCompleted will not consider the file fully cached
980      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false);
981      assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
982    } finally {
983      if (bucketCache != null) {
984        bucketCache.shutdown();
985      }
986      HBASE_TESTING_UTILITY.cleanupTestDir();
987    }
988  }
989
990  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
991    int totalBlocksToCheck, boolean encoded) throws Exception {
992    final Path dataTestDir = createAndGetTestDir();
993    String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
994    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
995      constructedBlockSizes, 1, 1, null);
996    assertTrue(bucketCache.waitForCacheInitialization(10000));
997    long usedByteSize = bucketCache.getAllocator().getUsedSize();
998    assertEquals(0, usedByteSize);
999    HFileBlockPair[] hfileBlockPairs =
1000      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded);
1001    // Add blocks
1002    for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
1003      bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
1004    }
1005    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
1006      totalBlocksToCheck * constructedBlockSize);
1007    return bucketCache;
1008  }
1009
1010  @Test
1011  public void testEvictOrphansOutOfGracePeriod() throws Exception {
1012    BucketCache bucketCache = testEvictOrphans(0);
1013    assertEquals(10, bucketCache.getBackingMap().size());
1014    assertEquals(0, bucketCache.blocksByHFile.stream()
1015      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
1016  }
1017
1018  @Test
1019  public void testEvictOrphansWithinGracePeriod() throws Exception {
1020    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
1021    assertEquals(18, bucketCache.getBackingMap().size());
1022    assertTrue(bucketCache.blocksByHFile.stream()
1023      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
1024  }
1025
1026  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
1027    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
1028    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
1029    Map<String, HRegion> onlineRegions = new HashMap<>();
1030    List<HStore> stores = new ArrayList<>();
1031    Collection<HStoreFile> storeFiles = new ArrayList<>();
1032    HRegion mockedRegion = mock(HRegion.class);
1033    HStore mockedStore = mock(HStore.class);
1034    HStoreFile mockedStoreFile = mock(HStoreFile.class);
1035    when(mockedStoreFile.getPath()).thenReturn(validFile);
1036    storeFiles.add(mockedStoreFile);
1037    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
1038    stores.add(mockedStore);
1039    when(mockedRegion.getStores()).thenReturn(stores);
1040    onlineRegions.put("mocked_region", mockedRegion);
1041    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
1042    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
1043    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
1044    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
1045      orphanEvictionGracePeriod);
1046    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
1047      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
1048      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
1049    HFileBlockPair[] validBlockPairs =
1050      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile);
1051    HFileBlockPair[] orphanBlockPairs =
1052      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile);
1053    for (HFileBlockPair pair : validBlockPairs) {
1054      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
1055    }
1056    waitUntilAllFlushedToBucket(bucketCache);
1057    assertEquals(10, bucketCache.getBackingMap().size());
1058    bucketCache.freeSpace("test");
1059    assertEquals(10, bucketCache.getBackingMap().size());
1060    for (HFileBlockPair pair : orphanBlockPairs) {
1061      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
1062    }
1063    waitUntilAllFlushedToBucket(bucketCache);
1064    assertEquals(20, bucketCache.getBackingMap().size());
1065    bucketCache.freeSpace("test");
1066    return bucketCache;
1067  }
1068}