001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
024import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
025import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR;
026import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR;
027import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
028import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME;
029import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
030import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME;
031import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
032import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME;
033import static org.junit.Assert.assertEquals;
034import static org.junit.Assert.assertFalse;
035import static org.junit.Assert.assertNotEquals;
036import static org.junit.Assert.assertNotNull;
037import static org.junit.Assert.assertNull;
038import static org.junit.Assert.assertTrue;
039import static org.mockito.Mockito.mock;
040import static org.mockito.Mockito.when;
041
042import java.io.File;
043import java.io.IOException;
044import java.nio.ByteBuffer;
045import java.util.ArrayList;
046import java.util.Arrays;
047import java.util.Collection;
048import java.util.HashMap;
049import java.util.List;
050import java.util.Map;
051import java.util.Set;
052import java.util.concurrent.ThreadLocalRandom;
053import java.util.concurrent.locks.ReentrantReadWriteLock;
054import org.apache.hadoop.conf.Configuration;
055import org.apache.hadoop.fs.Path;
056import org.apache.hadoop.hbase.HBaseClassTestRule;
057import org.apache.hadoop.hbase.HBaseConfiguration;
058import org.apache.hadoop.hbase.HBaseTestingUtil;
059import org.apache.hadoop.hbase.HConstants;
060import org.apache.hadoop.hbase.Waiter;
061import org.apache.hadoop.hbase.io.ByteBuffAllocator;
062import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
063import org.apache.hadoop.hbase.io.hfile.BlockPriority;
064import org.apache.hadoop.hbase.io.hfile.BlockType;
065import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
066import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
067import org.apache.hadoop.hbase.io.hfile.Cacheable;
068import org.apache.hadoop.hbase.io.hfile.HFileBlock;
069import org.apache.hadoop.hbase.io.hfile.HFileContext;
070import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
071import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
072import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
073import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
074import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
075import org.apache.hadoop.hbase.nio.ByteBuff;
076import org.apache.hadoop.hbase.regionserver.HRegion;
077import org.apache.hadoop.hbase.regionserver.HStore;
078import org.apache.hadoop.hbase.regionserver.HStoreFile;
079import org.apache.hadoop.hbase.testclassification.IOTests;
080import org.apache.hadoop.hbase.testclassification.LargeTests;
081import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
082import org.apache.hadoop.hbase.util.Pair;
083import org.apache.hadoop.hbase.util.Threads;
084import org.junit.After;
085import org.junit.Assert;
086import org.junit.Before;
087import org.junit.ClassRule;
088import org.junit.Test;
089import org.junit.experimental.categories.Category;
090import org.junit.runner.RunWith;
091import org.junit.runners.Parameterized;
092import org.mockito.Mockito;
093import org.slf4j.Logger;
094import org.slf4j.LoggerFactory;
095
096import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
097
098/**
099 * Basic test of BucketCache.Puts and gets.
100 * <p>
101 * Tests will ensure that blocks' data correctness under several threads concurrency
102 */
103@RunWith(Parameterized.class)
104@Category({ IOTests.class, LargeTests.class })
105public class TestBucketCache {
106
107  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);
108
109  @ClassRule
110  public static final HBaseClassTestRule CLASS_RULE =
111    HBaseClassTestRule.forClass(TestBucketCache.class);
112
113  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
114  public static Iterable<Object[]> data() {
115    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
116                                                          // for these tests?
117      { 16 * 1024,
118        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
119          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
120          128 * 1024 + 1024 } } });
121  }
122
123  @Parameterized.Parameter(0)
124  public int constructedBlockSize;
125
126  @Parameterized.Parameter(1)
127  public int[] constructedBlockSizes;
128
129  BucketCache cache;
130  final int CACHE_SIZE = 1000000;
131  final int NUM_BLOCKS = 100;
132  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
133  final int NUM_THREADS = 100;
134  final int NUM_QUERIES = 10000;
135
136  final long capacitySize = 32 * 1024 * 1024;
137  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
138  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
139  private String ioEngineName = "offheap";
140
141  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
142
143  private static class MockedBucketCache extends BucketCache {
144
145    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
146      int writerThreads, int writerQLen, String persistencePath) throws IOException {
147      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
148        persistencePath);
149    }
150
151    @Override
152    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
153      super.cacheBlock(cacheKey, buf, inMemory);
154    }
155
156    @Override
157    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
158      super.cacheBlock(cacheKey, buf);
159    }
160  }
161
162  @Before
163  public void setup() throws IOException {
164    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
165      constructedBlockSizes, writeThreads, writerQLen, null);
166  }
167
168  @After
169  public void tearDown() {
170    cache.shutdown();
171  }
172
173  /**
174   * Test Utility to create test dir and return name
175   * @return return name of created dir
176   * @throws IOException throws IOException
177   */
178  private Path createAndGetTestDir() throws IOException {
179    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
180    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
181    return testDir;
182  }
183
184  /**
185   * Return a random element from {@code a}.
186   */
187  private static <T> T randFrom(List<T> a) {
188    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
189  }
190
191  @Test
192  public void testBucketAllocator() throws BucketAllocatorException {
193    BucketAllocator mAllocator = cache.getAllocator();
194    /*
195     * Test the allocator first
196     */
197    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
198
199    boolean full = false;
200    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
201    // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
202    // the cache is completely filled.
203    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
204    while (!full) {
205      Integer blockSize = null;
206      try {
207        blockSize = randFrom(tmp);
208        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
209      } catch (CacheFullException cfe) {
210        tmp.remove(blockSize);
211        if (tmp.isEmpty()) full = true;
212      }
213    }
214
215    for (Integer blockSize : BLOCKSIZES) {
216      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
217      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
218      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
219
220      // we know the block sizes above are multiples of 1024, but default bucket sizes give an
221      // additional 1024 on top of that so this counts towards fragmentation in our test
222      // real life may have worse fragmentation because blocks may not be perfectly sized to block
223      // size, given encoding/compression and large rows
224      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
225    }
226
227    mAllocator.logDebugStatistics();
228
229    for (Pair<Long, Integer> allocation : allocations) {
230      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
231        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
232    }
233    assertEquals(0, mAllocator.getUsedSize());
234  }
235
236  @Test
237  public void testCacheSimple() throws Exception {
238    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
239  }
240
241  @Test
242  public void testCacheMultiThreadedSingleKey() throws Exception {
243    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
244  }
245
246  @Test
247  public void testHeapSizeChanges() throws Exception {
248    cache.stopWriterThreads();
249    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
250  }
251
252  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
253    throws InterruptedException {
254    Waiter.waitFor(HBaseConfiguration.create(), 10000,
255      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
256  }
257
258  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
259    while (!cache.ramCache.isEmpty()) {
260      Thread.sleep(100);
261    }
262    Thread.sleep(1000);
263  }
264
265  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
266  // threads will flush it to the bucket and put reference entry in backingMap.
267  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
268    Cacheable block, boolean waitWhenCache) throws InterruptedException {
269    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
270    waitUntilFlushedToBucket(cache, cacheKey);
271  }
272
273  @Test
274  public void testMemoryLeak() throws Exception {
275    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
276    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
277      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
278    long lockId = cache.backingMap.get(cacheKey).offset();
279    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
280    lock.writeLock().lock();
281    Thread evictThread = new Thread("evict-block") {
282      @Override
283      public void run() {
284        cache.evictBlock(cacheKey);
285      }
286    };
287    evictThread.start();
288    cache.offsetLock.waitForWaiters(lockId, 1);
289    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
290    assertEquals(0, cache.getBlockCount());
291    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
292      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
293    assertEquals(1, cache.getBlockCount());
294    lock.writeLock().unlock();
295    evictThread.join();
296    /**
297     * <pre>
298     * The asserts here before HBASE-21957 are:
299     * assertEquals(1L, cache.getBlockCount());
300     * assertTrue(cache.getCurrentSize() > 0L);
301     * assertTrue("We should have a block!", cache.iterator().hasNext());
302     *
303     * The asserts here after HBASE-21957 are:
304     * assertEquals(0, cache.getBlockCount());
305     * assertEquals(cache.getCurrentSize(), 0L);
306     *
307     * I think the asserts before HBASE-21957 is more reasonable,because
308     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
309     * it had seen, and newly added Block after the {@link BucketEntry}
310     * it had seen should not be evicted.
311     * </pre>
312     */
313    assertEquals(1L, cache.getBlockCount());
314    assertTrue(cache.getCurrentSize() > 0L);
315    assertTrue("We should have a block!", cache.iterator().hasNext());
316  }
317
318  @Test
319  public void testRetrieveFromFile() throws Exception {
320    Path testDir = createAndGetTestDir();
321    String ioEngineName = "file:" + testDir + "/bucket.cache";
322    testRetrievalUtils(testDir, ioEngineName);
323    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
324    String persistencePath = testDir + "/bucket.persistence";
325    BucketCache bucketCache = null;
326    try {
327      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
328        smallBucketSizes, writeThreads, writerQLen, persistencePath);
329      assertTrue(bucketCache.waitForCacheInitialization(10000));
330      assertFalse(new File(persistencePath).exists());
331      assertEquals(0, bucketCache.getAllocator().getUsedSize());
332      assertEquals(0, bucketCache.backingMap.size());
333    } finally {
334      bucketCache.shutdown();
335      HBASE_TESTING_UTILITY.cleanupTestDir();
336    }
337  }
338
339  @Test
340  public void testRetrieveFromMMap() throws Exception {
341    final Path testDir = createAndGetTestDir();
342    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
343    testRetrievalUtils(testDir, ioEngineName);
344  }
345
346  @Test
347  public void testRetrieveFromPMem() throws Exception {
348    final Path testDir = createAndGetTestDir();
349    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
350    testRetrievalUtils(testDir, ioEngineName);
351    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
352    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
353    BucketCache bucketCache = null;
354    try {
355      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
356        smallBucketSizes, writeThreads, writerQLen, persistencePath);
357      assertTrue(bucketCache.waitForCacheInitialization(10000));
358      assertFalse(new File(persistencePath).exists());
359      assertEquals(0, bucketCache.getAllocator().getUsedSize());
360      assertEquals(0, bucketCache.backingMap.size());
361    } finally {
362      bucketCache.shutdown();
363      HBASE_TESTING_UTILITY.cleanupTestDir();
364    }
365  }
366
367  private void testRetrievalUtils(Path testDir, String ioEngineName)
368    throws IOException, InterruptedException {
369    final String persistencePath =
370      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
371    BucketCache bucketCache = null;
372    try {
373      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
374        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
375      assertTrue(bucketCache.waitForCacheInitialization(10000));
376      long usedSize = bucketCache.getAllocator().getUsedSize();
377      assertEquals(0, usedSize);
378      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
379      for (HFileBlockPair block : blocks) {
380        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
381      }
382      for (HFileBlockPair block : blocks) {
383        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
384          false);
385      }
386      usedSize = bucketCache.getAllocator().getUsedSize();
387      assertNotEquals(0, usedSize);
388      bucketCache.shutdown();
389      assertTrue(new File(persistencePath).exists());
390      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
391        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
392      assertTrue(bucketCache.waitForCacheInitialization(10000));
393
394      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
395    } finally {
396      if (bucketCache != null) {
397        bucketCache.shutdown();
398      }
399    }
400    assertTrue(new File(persistencePath).exists());
401  }
402
403  @Test
404  public void testRetrieveUnsupportedIOE() throws Exception {
405    try {
406      final Path testDir = createAndGetTestDir();
407      final String ioEngineName = testDir + "/bucket.cache";
408      testRetrievalUtils(testDir, ioEngineName);
409      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
410    } catch (IllegalArgumentException e) {
411      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
412        + "files:, mmap: or offheap", e.getMessage());
413    }
414  }
415
416  @Test
417  public void testRetrieveFromMultipleFiles() throws Exception {
418    final Path testDirInitial = createAndGetTestDir();
419    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
420    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
421    String ioEngineName =
422      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
423        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
424    testRetrievalUtils(testDirInitial, ioEngineName);
425    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
426    String persistencePath = testDirInitial + "/bucket.persistence";
427    BucketCache bucketCache = null;
428    try {
429      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
430        smallBucketSizes, writeThreads, writerQLen, persistencePath);
431      assertTrue(bucketCache.waitForCacheInitialization(10000));
432      assertFalse(new File(persistencePath).exists());
433      assertEquals(0, bucketCache.getAllocator().getUsedSize());
434      assertEquals(0, bucketCache.backingMap.size());
435    } finally {
436      bucketCache.shutdown();
437      HBASE_TESTING_UTILITY.cleanupTestDir();
438    }
439  }
440
441  @Test
442  public void testRetrieveFromFileWithoutPersistence() throws Exception {
443    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
444      constructedBlockSizes, writeThreads, writerQLen, null);
445    assertTrue(bucketCache.waitForCacheInitialization(10000));
446    try {
447      final Path testDir = createAndGetTestDir();
448      String ioEngineName = "file:" + testDir + "/bucket.cache";
449      long usedSize = bucketCache.getAllocator().getUsedSize();
450      assertEquals(0, usedSize);
451      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
452      for (HFileBlockPair block : blocks) {
453        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
454      }
455      for (HFileBlockPair block : blocks) {
456        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
457          false);
458      }
459      usedSize = bucketCache.getAllocator().getUsedSize();
460      assertNotEquals(0, usedSize);
461      bucketCache.shutdown();
462      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
463        constructedBlockSizes, writeThreads, writerQLen, null);
464      assertTrue(bucketCache.waitForCacheInitialization(10000));
465      assertEquals(0, bucketCache.getAllocator().getUsedSize());
466    } finally {
467      bucketCache.shutdown();
468      HBASE_TESTING_UTILITY.cleanupTestDir();
469    }
470  }
471
472  @Test
473  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
474    long availableSpace = 20 * 1024L * 1024 * 1024;
475    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
476    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
477    assertTrue(allocator.getBuckets().length > 0);
478  }
479
480  @Test
481  public void testGetPartitionSize() throws IOException {
482    // Test default values
483    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
484
485    Configuration conf = HBaseConfiguration.create();
486    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
487    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
488    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
489    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
490
491    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
492      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
493    assertTrue(cache.waitForCacheInitialization(10000));
494
495    validateGetPartitionSize(cache, 0.1f, 0.5f);
496    validateGetPartitionSize(cache, 0.7f, 0.5f);
497    validateGetPartitionSize(cache, 0.2f, 0.5f);
498  }
499
500  @Test
501  public void testCacheSizeCapacity() throws IOException {
502    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
503    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
504    Configuration conf = HBaseConfiguration.create();
505    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
506    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
507    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
508    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
509    try {
510      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
511        writerQLen, null, 100, conf);
512      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
513    } catch (IllegalArgumentException e) {
514      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
515    }
516  }
517
518  @Test
519  public void testValidBucketCacheConfigs() throws IOException {
520    Configuration conf = HBaseConfiguration.create();
521    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
522    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
523    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
524    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
525    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
526    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
527
528    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
529      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
530    assertTrue(cache.waitForCacheInitialization(10000));
531
532    assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
533      cache.getAcceptableFactor(), 0);
534    assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
535    assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
536      cache.getExtraFreeFactor(), 0);
537    assertEquals(SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, cache.getSingleFactor(),
538      0);
539    assertEquals(MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, cache.getMultiFactor(),
540      0);
541    assertEquals(MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, cache.getMemoryFactor(),
542      0);
543  }
544
545  @Test
546  public void testInvalidAcceptFactorConfig() throws IOException {
547    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
548    boolean[] expectedOutcomes = { false, false, true, false };
549    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
550    Configuration conf = HBaseConfiguration.create();
551    checkConfigValues(conf, configMappings, expectedOutcomes);
552  }
553
554  @Test
555  public void testInvalidMinFactorConfig() throws IOException {
556    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
557    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
558    boolean[] expectedOutcomes = { false, true, false, false };
559    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
560    Configuration conf = HBaseConfiguration.create();
561    checkConfigValues(conf, configMappings, expectedOutcomes);
562  }
563
564  @Test
565  public void testInvalidExtraFreeFactorConfig() throws IOException {
566    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
567    // throws due to <0, in expected range, in expected range, config can be > 1.0
568    boolean[] expectedOutcomes = { false, true, true, true };
569    Map<String, float[]> configMappings =
570      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
571    Configuration conf = HBaseConfiguration.create();
572    checkConfigValues(conf, configMappings, expectedOutcomes);
573  }
574
575  @Test
576  public void testInvalidCacheSplitFactorConfig() throws IOException {
577    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
578    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
579    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
580    // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't
581    // be negative, configs don't add to 1.0
582    boolean[] expectedOutcomes = { true, false, false, false };
583    Map<String,
584      float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
585        MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME,
586        memoryFactorConfigValues);
587    Configuration conf = HBaseConfiguration.create();
588    checkConfigValues(conf, configMappings, expectedOutcomes);
589  }
590
591  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
592    boolean[] expectSuccess) throws IOException {
593    Set<String> configNames = configMap.keySet();
594    for (int i = 0; i < expectSuccess.length; i++) {
595      try {
596        for (String configName : configNames) {
597          conf.setFloat(configName, configMap.get(configName)[i]);
598        }
599        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
600          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
601        assertTrue(cache.waitForCacheInitialization(10000));
602        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
603          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
604      } catch (IllegalArgumentException e) {
605        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
606          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
607      }
608    }
609  }
610
611  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
612    float minFactor) {
613    long expectedOutput =
614      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
615    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
616  }
617
618  @Test
619  public void testOffsetProducesPositiveOutput() {
620    // This number is picked because it produces negative output if the values isn't ensured to be
621    // positive. See HBASE-18757 for more information.
622    long testValue = 549888460800L;
623    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
624      return ByteBuffAllocator.NONE;
625    }, ByteBuffAllocator.HEAP);
626    assertEquals(testValue, bucketEntry.offset());
627  }
628
629  @Test
630  public void testEvictionCount() throws InterruptedException {
631    int size = 100;
632    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
633    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
634    HFileContext meta = new HFileContextBuilder().build();
635    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
636    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
637      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
638    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
639      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
640
641    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
642    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
643    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
644    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
645    blockWithNextBlockMetadata.serialize(block1Buffer, true);
646    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
647
648    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
649    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
650      block1Buffer);
651
652    waitUntilFlushedToBucket(cache, key);
653
654    assertEquals(0, cache.getStats().getEvictionCount());
655
656    // evict call should return 1, but then eviction count be 0
657    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
658    assertEquals(0, cache.getStats().getEvictionCount());
659
660    // add back
661    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
662      block1Buffer);
663    waitUntilFlushedToBucket(cache, key);
664
665    // should not increment
666    assertTrue(cache.evictBlock(key));
667    assertEquals(0, cache.getStats().getEvictionCount());
668
669    // add back
670    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
671      block1Buffer);
672    waitUntilFlushedToBucket(cache, key);
673
674    // should finally increment eviction count
675    cache.freeSpace("testing");
676    assertEquals(1, cache.getStats().getEvictionCount());
677  }
678
679  @Test
680  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
681    int size = 100;
682    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
683    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
684    HFileContext meta = new HFileContextBuilder().build();
685    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
686    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
687      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
688    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
689      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
690
691    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
692    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
693    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
694    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
695    blockWithNextBlockMetadata.serialize(block1Buffer, true);
696    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
697
698    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
699    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
700      block1Buffer);
701
702    waitUntilFlushedToBucket(cache, key);
703    assertNotNull(cache.backingMap.get(key));
704    assertEquals(1, cache.backingMap.get(key).refCnt());
705    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
706    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
707
708    // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
709    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
710      block1Buffer);
711    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
712    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
713    assertEquals(1, cache.backingMap.get(key).refCnt());
714
715    // Clear and add blockWithoutNextBlockMetadata
716    assertTrue(cache.evictBlock(key));
717    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
718    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
719
720    assertNull(cache.getBlock(key, false, false, false));
721    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
722      block2Buffer);
723
724    waitUntilFlushedToBucket(cache, key);
725    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
726    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
727
728    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
729    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
730      block1Buffer);
731
732    waitUntilFlushedToBucket(cache, key);
733    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
734    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
735  }
736
737  @Test
738  public void testRAMCache() {
739    int size = 100;
740    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
741    byte[] byteArr = new byte[length];
742    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
743    HFileContext meta = new HFileContextBuilder().build();
744
745    RAMCache cache = new RAMCache();
746    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
747    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
748    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
749      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
750    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
751      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
752    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false);
753    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false);
754
755    assertFalse(cache.containsKey(key1));
756    assertNull(cache.putIfAbsent(key1, re1));
757    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
758
759    assertNotNull(cache.putIfAbsent(key1, re2));
760    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
761    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
762
763    assertNull(cache.putIfAbsent(key2, re2));
764    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
765    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
766
767    cache.remove(key1);
768    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
769    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
770
771    cache.clear();
772    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
773    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
774  }
775
776  @Test
777  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
778    // initialize an block.
779    int size = 100, offset = 20;
780    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
781    ByteBuffer buf = ByteBuffer.allocate(length);
782    HFileContext meta = new HFileContextBuilder().build();
783    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
784      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
785
786    // initialize an mocked ioengine.
787    IOEngine ioEngine = Mockito.mock(IOEngine.class);
788    when(ioEngine.usesSharedMemory()).thenReturn(false);
789    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
790    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
791      Mockito.anyLong());
792    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
793      Mockito.anyLong());
794
795    // create an bucket allocator.
796    long availableSpace = 1024 * 1024 * 1024L;
797    BucketAllocator allocator = new BucketAllocator(availableSpace, null);
798
799    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
800    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false);
801
802    Assert.assertEquals(0, allocator.getUsedSize());
803    try {
804      re.writeToCache(ioEngine, allocator, null, null,
805        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE);
806      Assert.fail();
807    } catch (Exception e) {
808    }
809    Assert.assertEquals(0, allocator.getUsedSize());
810  }
811
812  /**
813   * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
814   * could not be freed even if corresponding {@link HFileBlock} is evicted from
815   * {@link BucketCache}.
816   */
817  @Test
818  public void testFreeBucketEntryRestoredFromFile() throws Exception {
819    BucketCache bucketCache = null;
820    try {
821      final Path dataTestDir = createAndGetTestDir();
822
823      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
824      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
825
826      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
827        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
828      assertTrue(bucketCache.waitForCacheInitialization(10000));
829      long usedByteSize = bucketCache.getAllocator().getUsedSize();
830      assertEquals(0, usedByteSize);
831
832      HFileBlockPair[] hfileBlockPairs =
833        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
834      // Add blocks
835      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
836        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
837      }
838
839      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
840        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
841          hfileBlockPair.getBlock(), false);
842      }
843      usedByteSize = bucketCache.getAllocator().getUsedSize();
844      assertNotEquals(0, usedByteSize);
845      // persist cache to file
846      bucketCache.shutdown();
847      assertTrue(new File(persistencePath).exists());
848
849      // restore cache from file
850      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
851        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
852      assertTrue(bucketCache.waitForCacheInitialization(10000));
853      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
854
855      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
856        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
857        bucketCache.evictBlock(blockCacheKey);
858      }
859      assertEquals(0, bucketCache.getAllocator().getUsedSize());
860      assertEquals(0, bucketCache.backingMap.size());
861    } finally {
862      bucketCache.shutdown();
863      HBASE_TESTING_UTILITY.cleanupTestDir();
864    }
865  }
866
867  @Test
868  public void testBlockAdditionWaitWhenCache() throws Exception {
869    BucketCache bucketCache = null;
870    try {
871      final Path dataTestDir = createAndGetTestDir();
872
873      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
874      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
875
876      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
877      config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);
878
879      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
880        constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config);
881      assertTrue(bucketCache.waitForCacheInitialization(10000));
882      long usedByteSize = bucketCache.getAllocator().getUsedSize();
883      assertEquals(0, usedByteSize);
884
885      HFileBlockPair[] hfileBlockPairs =
886        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
887      // Add blocks
888      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
889        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
890          true);
891      }
892
893      // Max wait for 10 seconds.
894      long timeout = 10000;
895      // Wait for blocks size to match the number of blocks.
896      while (bucketCache.backingMap.size() != 10) {
897        if (timeout <= 0) break;
898        Threads.sleep(100);
899        timeout -= 100;
900      }
901      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
902        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
903      }
904      usedByteSize = bucketCache.getAllocator().getUsedSize();
905      assertNotEquals(0, usedByteSize);
906      // persist cache to file
907      bucketCache.shutdown();
908      assertTrue(new File(persistencePath).exists());
909
910      // restore cache from file
911      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
912        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
913      assertTrue(bucketCache.waitForCacheInitialization(10000));
914      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
915
916      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
917        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
918        bucketCache.evictBlock(blockCacheKey);
919      }
920      assertEquals(0, bucketCache.getAllocator().getUsedSize());
921      assertEquals(0, bucketCache.backingMap.size());
922    } finally {
923      if (bucketCache != null) {
924        bucketCache.shutdown();
925      }
926      HBASE_TESTING_UTILITY.cleanupTestDir();
927    }
928  }
929
930  @Test
931  public void testOnConfigurationChange() throws Exception {
932    BucketCache bucketCache = null;
933    try {
934      final Path dataTestDir = createAndGetTestDir();
935
936      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
937
938      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
939
940      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
941        constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config);
942
943      assertTrue(bucketCache.waitForCacheInitialization(10000));
944
945      config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
946      config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f);
947      config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f);
948      config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f);
949      config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f);
950      config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
951      config.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
952      config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500);
953      config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000);
954
955      bucketCache.onConfigurationChange(config);
956
957      assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01);
958      assertEquals(0.8f, bucketCache.getMinFactor(), 0.01);
959      assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01);
960      assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01);
961      assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01);
962      assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01);
963      assertEquals(100L, bucketCache.getQueueAdditionWaitTime());
964      assertEquals(500L, bucketCache.getBucketcachePersistInterval());
965      assertEquals(1000L, bucketCache.getPersistenceChunkSize());
966
967    } finally {
968      if (bucketCache != null) {
969        bucketCache.shutdown();
970      }
971      HBASE_TESTING_UTILITY.cleanupTestDir();
972    }
973  }
974
975  @Test
976  public void testNotifyFileCachingCompletedSuccess() throws Exception {
977    BucketCache bucketCache = null;
978    try {
979      Path filePath =
980        new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
981      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false);
982      if (bucketCache.getStats().getFailedInserts() > 0) {
983        LOG.info("There were {} fail inserts, "
984          + "will assert if total blocks in backingMap equals (10 - failInserts) "
985          + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
986        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
987        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
988      } else {
989        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
990      }
991    } finally {
992      if (bucketCache != null) {
993        bucketCache.shutdown();
994      }
995      HBASE_TESTING_UTILITY.cleanupTestDir();
996    }
997  }
998
999  @Test
1000  public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception {
1001    BucketCache bucketCache = null;
1002    try {
1003      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
1004        "testNotifyFileCachingCompletedForEncodedDataSuccess");
1005      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true);
1006      if (bucketCache.getStats().getFailedInserts() > 0) {
1007        LOG.info("There were {} fail inserts, "
1008          + "will assert if total blocks in backingMap equals (10 - failInserts) "
1009          + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
1010        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
1011        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
1012      } else {
1013        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
1014      }
1015    } finally {
1016      if (bucketCache != null) {
1017        bucketCache.shutdown();
1018      }
1019      HBASE_TESTING_UTILITY.cleanupTestDir();
1020    }
1021  }
1022
1023  @Test
1024  public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
1025    BucketCache bucketCache = null;
1026    try {
1027      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
1028        "testNotifyFileCachingCompletedNotAllCached");
1029      // Deliberately passing more blocks than we have created to test that
1030      // notifyFileCachingCompleted will not consider the file fully cached
1031      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false);
1032      assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
1033    } finally {
1034      if (bucketCache != null) {
1035        bucketCache.shutdown();
1036      }
1037      HBASE_TESTING_UTILITY.cleanupTestDir();
1038    }
1039  }
1040
1041  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
1042    int totalBlocksToCheck, boolean encoded) throws Exception {
1043    final Path dataTestDir = createAndGetTestDir();
1044    String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
1045    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
1046      constructedBlockSizes, 1, 1, null);
1047    assertTrue(bucketCache.waitForCacheInitialization(10000));
1048    long usedByteSize = bucketCache.getAllocator().getUsedSize();
1049    assertEquals(0, usedByteSize);
1050    HFileBlockPair[] hfileBlockPairs =
1051      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded);
1052    // Add blocks
1053    for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
1054      bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
1055    }
1056    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
1057      totalBlocksToCheck * constructedBlockSize);
1058    return bucketCache;
1059  }
1060
1061  @Test
1062  public void testEvictOrphansOutOfGracePeriod() throws Exception {
1063    BucketCache bucketCache = testEvictOrphans(0);
1064    assertEquals(10, bucketCache.getBackingMap().size());
1065    assertEquals(0, bucketCache.blocksByHFile.stream()
1066      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
1067  }
1068
1069  @Test
1070  public void testEvictOrphansWithinGracePeriod() throws Exception {
1071    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
1072    assertEquals(18, bucketCache.getBackingMap().size());
1073    assertTrue(bucketCache.blocksByHFile.stream()
1074      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
1075  }
1076
1077  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
1078    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
1079    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
1080    Map<String, HRegion> onlineRegions = new HashMap<>();
1081    List<HStore> stores = new ArrayList<>();
1082    Collection<HStoreFile> storeFiles = new ArrayList<>();
1083    HRegion mockedRegion = mock(HRegion.class);
1084    HStore mockedStore = mock(HStore.class);
1085    HStoreFile mockedStoreFile = mock(HStoreFile.class);
1086    when(mockedStoreFile.getPath()).thenReturn(validFile);
1087    storeFiles.add(mockedStoreFile);
1088    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
1089    stores.add(mockedStore);
1090    when(mockedRegion.getStores()).thenReturn(stores);
1091    onlineRegions.put("mocked_region", mockedRegion);
1092    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
1093    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
1094    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
1095    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
1096      orphanEvictionGracePeriod);
1097    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
1098      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
1099      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
1100    HFileBlockPair[] validBlockPairs =
1101      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile);
1102    HFileBlockPair[] orphanBlockPairs =
1103      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile);
1104    for (HFileBlockPair pair : validBlockPairs) {
1105      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
1106    }
1107    waitUntilAllFlushedToBucket(bucketCache);
1108    assertEquals(10, bucketCache.getBackingMap().size());
1109    bucketCache.freeSpace("test");
1110    assertEquals(10, bucketCache.getBackingMap().size());
1111    for (HFileBlockPair pair : orphanBlockPairs) {
1112      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
1113    }
1114    waitUntilAllFlushedToBucket(bucketCache);
1115    assertEquals(20, bucketCache.getBackingMap().size());
1116    bucketCache.freeSpace("test");
1117    return bucketCache;
1118  }
1119
1120  @Test
1121  public void testBlockPriority() throws Exception {
1122    HFileBlockPair block = CacheTestUtils.generateHFileBlocks(BLOCK_SIZE, 1)[0];
1123    cacheAndWaitUntilFlushedToBucket(cache, block.getBlockName(), block.getBlock(), true);
1124    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.SINGLE);
1125    cache.getBlock(block.getBlockName(), true, false, true);
1126    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.MULTI);
1127  }
1128}