001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
024import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
025import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR;
026import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR;
027import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
028import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME;
029import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
030import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME;
031import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
032import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME;
033import static org.junit.Assert.assertEquals;
034import static org.junit.Assert.assertFalse;
035import static org.junit.Assert.assertNotEquals;
036import static org.junit.Assert.assertNotNull;
037import static org.junit.Assert.assertNull;
038import static org.junit.Assert.assertTrue;
039import static org.mockito.Mockito.mock;
040import static org.mockito.Mockito.when;
041
042import java.io.File;
043import java.io.IOException;
044import java.lang.reflect.Field;
045import java.nio.ByteBuffer;
046import java.util.ArrayList;
047import java.util.Arrays;
048import java.util.Collection;
049import java.util.HashMap;
050import java.util.List;
051import java.util.Map;
052import java.util.Set;
053import java.util.concurrent.ThreadLocalRandom;
054import java.util.concurrent.atomic.LongAdder;
055import java.util.concurrent.locks.ReentrantReadWriteLock;
056import org.apache.hadoop.conf.Configuration;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.hbase.HBaseClassTestRule;
059import org.apache.hadoop.hbase.HBaseConfiguration;
060import org.apache.hadoop.hbase.HBaseTestingUtil;
061import org.apache.hadoop.hbase.HConstants;
062import org.apache.hadoop.hbase.Waiter;
063import org.apache.hadoop.hbase.io.ByteBuffAllocator;
064import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
065import org.apache.hadoop.hbase.io.hfile.BlockPriority;
066import org.apache.hadoop.hbase.io.hfile.BlockType;
067import org.apache.hadoop.hbase.io.hfile.CacheStats;
068import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
069import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
070import org.apache.hadoop.hbase.io.hfile.Cacheable;
071import org.apache.hadoop.hbase.io.hfile.HFileBlock;
072import org.apache.hadoop.hbase.io.hfile.HFileContext;
073import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
074import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
075import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
076import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
077import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
078import org.apache.hadoop.hbase.nio.ByteBuff;
079import org.apache.hadoop.hbase.regionserver.HRegion;
080import org.apache.hadoop.hbase.regionserver.HStore;
081import org.apache.hadoop.hbase.regionserver.HStoreFile;
082import org.apache.hadoop.hbase.testclassification.IOTests;
083import org.apache.hadoop.hbase.testclassification.LargeTests;
084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
085import org.apache.hadoop.hbase.util.Pair;
086import org.apache.hadoop.hbase.util.Threads;
087import org.junit.After;
088import org.junit.Assert;
089import org.junit.Before;
090import org.junit.ClassRule;
091import org.junit.Test;
092import org.junit.experimental.categories.Category;
093import org.junit.runner.RunWith;
094import org.junit.runners.Parameterized;
095import org.mockito.Mockito;
096import org.slf4j.Logger;
097import org.slf4j.LoggerFactory;
098
099import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
100
101/**
102 * Basic test of BucketCache.Puts and gets.
103 * <p>
104 * Tests will ensure that blocks' data correctness under several threads concurrency
105 */
106@RunWith(Parameterized.class)
107@Category({ IOTests.class, LargeTests.class })
108public class TestBucketCache {
109
110  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);
111
112  @ClassRule
113  public static final HBaseClassTestRule CLASS_RULE =
114    HBaseClassTestRule.forClass(TestBucketCache.class);
115
116  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
117  public static Iterable<Object[]> data() {
118    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
119                                                          // for these tests?
120      { 16 * 1024,
121        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
122          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
123          128 * 1024 + 1024 } } });
124  }
125
126  @Parameterized.Parameter(0)
127  public int constructedBlockSize;
128
129  @Parameterized.Parameter(1)
130  public int[] constructedBlockSizes;
131
132  BucketCache cache;
133  final int CACHE_SIZE = 1000000;
134  final int NUM_BLOCKS = 100;
135  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
136  final int NUM_THREADS = 100;
137  final int NUM_QUERIES = 10000;
138
139  final long capacitySize = 32 * 1024 * 1024;
140  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
141  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
142  private String ioEngineName = "offheap";
143
144  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
145
146  private static class MockedBucketCache extends BucketCache {
147
148    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
149      int writerThreads, int writerQLen, String persistencePath) throws IOException {
150      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
151        persistencePath);
152    }
153
154    @Override
155    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
156      super.cacheBlock(cacheKey, buf, inMemory);
157    }
158
159    @Override
160    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
161      super.cacheBlock(cacheKey, buf);
162    }
163  }
164
165  @Before
166  public void setup() throws IOException {
167    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
168      constructedBlockSizes, writeThreads, writerQLen, null);
169  }
170
171  @After
172  public void tearDown() {
173    cache.shutdown();
174  }
175
176  /**
177   * Test Utility to create test dir and return name
178   * @return return name of created dir
179   * @throws IOException throws IOException
180   */
181  private Path createAndGetTestDir() throws IOException {
182    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
183    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
184    return testDir;
185  }
186
187  /**
188   * Return a random element from {@code a}.
189   */
190  private static <T> T randFrom(List<T> a) {
191    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
192  }
193
194  @Test
195  public void testBucketAllocator() throws BucketAllocatorException {
196    BucketAllocator mAllocator = cache.getAllocator();
197    /*
198     * Test the allocator first
199     */
200    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
201
202    boolean full = false;
203    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
204    // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
205    // the cache is completely filled.
206    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
207    while (!full) {
208      Integer blockSize = null;
209      try {
210        blockSize = randFrom(tmp);
211        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
212      } catch (CacheFullException cfe) {
213        tmp.remove(blockSize);
214        if (tmp.isEmpty()) full = true;
215      }
216    }
217
218    for (Integer blockSize : BLOCKSIZES) {
219      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
220      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
221      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
222
223      // we know the block sizes above are multiples of 1024, but default bucket sizes give an
224      // additional 1024 on top of that so this counts towards fragmentation in our test
225      // real life may have worse fragmentation because blocks may not be perfectly sized to block
226      // size, given encoding/compression and large rows
227      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
228    }
229
230    mAllocator.logDebugStatistics();
231
232    for (Pair<Long, Integer> allocation : allocations) {
233      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
234        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
235    }
236    assertEquals(0, mAllocator.getUsedSize());
237  }
238
239  @Test
240  public void testCacheSimple() throws Exception {
241    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
242  }
243
244  @Test
245  public void testCacheMultiThreadedSingleKey() throws Exception {
246    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
247  }
248
249  @Test
250  public void testHeapSizeChanges() throws Exception {
251    cache.stopWriterThreads();
252    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
253  }
254
255  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
256    throws InterruptedException {
257    Waiter.waitFor(HBaseConfiguration.create(), 10000,
258      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
259  }
260
261  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
262    while (!cache.ramCache.isEmpty()) {
263      Thread.sleep(100);
264    }
265    Thread.sleep(1000);
266  }
267
268  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
269  // threads will flush it to the bucket and put reference entry in backingMap.
270  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
271    Cacheable block, boolean waitWhenCache) throws InterruptedException {
272    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
273    waitUntilFlushedToBucket(cache, cacheKey);
274  }
275
276  @Test
277  public void testMemoryLeak() throws Exception {
278    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
279    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
280      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
281    long lockId = cache.backingMap.get(cacheKey).offset();
282    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
283    lock.writeLock().lock();
284    Thread evictThread = new Thread("evict-block") {
285      @Override
286      public void run() {
287        cache.evictBlock(cacheKey);
288      }
289    };
290    evictThread.start();
291    cache.offsetLock.waitForWaiters(lockId, 1);
292    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
293    assertEquals(0, cache.getBlockCount());
294    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
295      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
296    assertEquals(1, cache.getBlockCount());
297    lock.writeLock().unlock();
298    evictThread.join();
299    /**
300     * <pre>
301     * The asserts here before HBASE-21957 are:
302     * assertEquals(1L, cache.getBlockCount());
303     * assertTrue(cache.getCurrentSize() > 0L);
304     * assertTrue("We should have a block!", cache.iterator().hasNext());
305     *
306     * The asserts here after HBASE-21957 are:
307     * assertEquals(0, cache.getBlockCount());
308     * assertEquals(cache.getCurrentSize(), 0L);
309     *
310     * I think the asserts before HBASE-21957 is more reasonable,because
311     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
312     * it had seen, and newly added Block after the {@link BucketEntry}
313     * it had seen should not be evicted.
314     * </pre>
315     */
316    assertEquals(1L, cache.getBlockCount());
317    assertTrue(cache.getCurrentSize() > 0L);
318    assertTrue("We should have a block!", cache.iterator().hasNext());
319  }
320
321  @Test
322  public void testRetrieveFromFile() throws Exception {
323    Path testDir = createAndGetTestDir();
324    String ioEngineName = "file:" + testDir + "/bucket.cache";
325    testRetrievalUtils(testDir, ioEngineName);
326    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
327    String persistencePath = testDir + "/bucket.persistence";
328    BucketCache bucketCache = null;
329    try {
330      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
331        smallBucketSizes, writeThreads, writerQLen, persistencePath);
332      assertTrue(bucketCache.waitForCacheInitialization(10000));
333      assertFalse(new File(persistencePath).exists());
334      assertEquals(0, bucketCache.getAllocator().getUsedSize());
335      assertEquals(0, bucketCache.backingMap.size());
336    } finally {
337      bucketCache.shutdown();
338      HBASE_TESTING_UTILITY.cleanupTestDir();
339    }
340  }
341
342  @Test
343  public void testRetrieveFromMMap() throws Exception {
344    final Path testDir = createAndGetTestDir();
345    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
346    testRetrievalUtils(testDir, ioEngineName);
347  }
348
349  @Test
350  public void testRetrieveFromPMem() throws Exception {
351    final Path testDir = createAndGetTestDir();
352    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
353    testRetrievalUtils(testDir, ioEngineName);
354    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
355    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
356    BucketCache bucketCache = null;
357    try {
358      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
359        smallBucketSizes, writeThreads, writerQLen, persistencePath);
360      assertTrue(bucketCache.waitForCacheInitialization(10000));
361      assertFalse(new File(persistencePath).exists());
362      assertEquals(0, bucketCache.getAllocator().getUsedSize());
363      assertEquals(0, bucketCache.backingMap.size());
364    } finally {
365      bucketCache.shutdown();
366      HBASE_TESTING_UTILITY.cleanupTestDir();
367    }
368  }
369
370  private void testRetrievalUtils(Path testDir, String ioEngineName)
371    throws IOException, InterruptedException {
372    final String persistencePath =
373      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
374    BucketCache bucketCache = null;
375    try {
376      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
377        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
378      assertTrue(bucketCache.waitForCacheInitialization(10000));
379      long usedSize = bucketCache.getAllocator().getUsedSize();
380      assertEquals(0, usedSize);
381      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
382      for (HFileBlockPair block : blocks) {
383        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
384      }
385      for (HFileBlockPair block : blocks) {
386        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
387          false);
388      }
389      usedSize = bucketCache.getAllocator().getUsedSize();
390      assertNotEquals(0, usedSize);
391      bucketCache.shutdown();
392      assertTrue(new File(persistencePath).exists());
393      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
394        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
395      assertTrue(bucketCache.waitForCacheInitialization(10000));
396
397      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
398    } finally {
399      if (bucketCache != null) {
400        bucketCache.shutdown();
401      }
402    }
403    assertTrue(new File(persistencePath).exists());
404  }
405
406  @Test
407  public void testRetrieveUnsupportedIOE() throws Exception {
408    try {
409      final Path testDir = createAndGetTestDir();
410      final String ioEngineName = testDir + "/bucket.cache";
411      testRetrievalUtils(testDir, ioEngineName);
412      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
413    } catch (IllegalArgumentException e) {
414      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
415        + "files:, mmap: or offheap", e.getMessage());
416    }
417  }
418
419  @Test
420  public void testRetrieveFromMultipleFiles() throws Exception {
421    final Path testDirInitial = createAndGetTestDir();
422    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
423    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
424    String ioEngineName =
425      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
426        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
427    testRetrievalUtils(testDirInitial, ioEngineName);
428    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
429    String persistencePath = testDirInitial + "/bucket.persistence";
430    BucketCache bucketCache = null;
431    try {
432      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
433        smallBucketSizes, writeThreads, writerQLen, persistencePath);
434      assertTrue(bucketCache.waitForCacheInitialization(10000));
435      assertFalse(new File(persistencePath).exists());
436      assertEquals(0, bucketCache.getAllocator().getUsedSize());
437      assertEquals(0, bucketCache.backingMap.size());
438    } finally {
439      bucketCache.shutdown();
440      HBASE_TESTING_UTILITY.cleanupTestDir();
441    }
442  }
443
444  @Test
445  public void testRetrieveFromFileWithoutPersistence() throws Exception {
446    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
447      constructedBlockSizes, writeThreads, writerQLen, null);
448    assertTrue(bucketCache.waitForCacheInitialization(10000));
449    try {
450      final Path testDir = createAndGetTestDir();
451      String ioEngineName = "file:" + testDir + "/bucket.cache";
452      long usedSize = bucketCache.getAllocator().getUsedSize();
453      assertEquals(0, usedSize);
454      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
455      for (HFileBlockPair block : blocks) {
456        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
457      }
458      for (HFileBlockPair block : blocks) {
459        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
460          false);
461      }
462      usedSize = bucketCache.getAllocator().getUsedSize();
463      assertNotEquals(0, usedSize);
464      bucketCache.shutdown();
465      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
466        constructedBlockSizes, writeThreads, writerQLen, null);
467      assertTrue(bucketCache.waitForCacheInitialization(10000));
468      assertEquals(0, bucketCache.getAllocator().getUsedSize());
469    } finally {
470      bucketCache.shutdown();
471      HBASE_TESTING_UTILITY.cleanupTestDir();
472    }
473  }
474
475  @Test
476  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
477    long availableSpace = 20 * 1024L * 1024 * 1024;
478    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
479    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
480    assertTrue(allocator.getBuckets().length > 0);
481  }
482
483  @Test
484  public void testGetPartitionSize() throws IOException {
485    // Test default values
486    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
487
488    Configuration conf = HBaseConfiguration.create();
489    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
490    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
491    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
492    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
493
494    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
495      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
496    assertTrue(cache.waitForCacheInitialization(10000));
497
498    validateGetPartitionSize(cache, 0.1f, 0.5f);
499    validateGetPartitionSize(cache, 0.7f, 0.5f);
500    validateGetPartitionSize(cache, 0.2f, 0.5f);
501  }
502
503  @Test
504  public void testCacheSizeCapacity() throws IOException {
505    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
506    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
507    Configuration conf = HBaseConfiguration.create();
508    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
509    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
510    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
511    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
512    try {
513      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
514        writerQLen, null, 100, conf);
515      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
516    } catch (IllegalArgumentException e) {
517      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
518    }
519  }
520
521  @Test
522  public void testValidBucketCacheConfigs() throws IOException {
523    Configuration conf = HBaseConfiguration.create();
524    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
525    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
526    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
527    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
528    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
529    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
530
531    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
532      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
533    assertTrue(cache.waitForCacheInitialization(10000));
534
535    assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
536      cache.getAcceptableFactor(), 0);
537    assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
538    assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
539      cache.getExtraFreeFactor(), 0);
540    assertEquals(SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, cache.getSingleFactor(),
541      0);
542    assertEquals(MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, cache.getMultiFactor(),
543      0);
544    assertEquals(MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, cache.getMemoryFactor(),
545      0);
546  }
547
548  @Test
549  public void testInvalidAcceptFactorConfig() throws IOException {
550    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
551    boolean[] expectedOutcomes = { false, false, true, false };
552    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
553    Configuration conf = HBaseConfiguration.create();
554    checkConfigValues(conf, configMappings, expectedOutcomes);
555  }
556
557  @Test
558  public void testInvalidMinFactorConfig() throws IOException {
559    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
560    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
561    boolean[] expectedOutcomes = { false, true, false, false };
562    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
563    Configuration conf = HBaseConfiguration.create();
564    checkConfigValues(conf, configMappings, expectedOutcomes);
565  }
566
567  @Test
568  public void testInvalidExtraFreeFactorConfig() throws IOException {
569    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
570    // throws due to <0, in expected range, in expected range, config can be > 1.0
571    boolean[] expectedOutcomes = { false, true, true, true };
572    Map<String, float[]> configMappings =
573      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
574    Configuration conf = HBaseConfiguration.create();
575    checkConfigValues(conf, configMappings, expectedOutcomes);
576  }
577
578  @Test
579  public void testInvalidCacheSplitFactorConfig() throws IOException {
580    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
581    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
582    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
583    // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't
584    // be negative, configs don't add to 1.0
585    boolean[] expectedOutcomes = { true, false, false, false };
586    Map<String,
587      float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
588        MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME,
589        memoryFactorConfigValues);
590    Configuration conf = HBaseConfiguration.create();
591    checkConfigValues(conf, configMappings, expectedOutcomes);
592  }
593
594  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
595    boolean[] expectSuccess) throws IOException {
596    Set<String> configNames = configMap.keySet();
597    for (int i = 0; i < expectSuccess.length; i++) {
598      try {
599        for (String configName : configNames) {
600          conf.setFloat(configName, configMap.get(configName)[i]);
601        }
602        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
603          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
604        assertTrue(cache.waitForCacheInitialization(10000));
605        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
606          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
607      } catch (IllegalArgumentException e) {
608        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
609          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
610      }
611    }
612  }
613
614  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
615    float minFactor) {
616    long expectedOutput =
617      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
618    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
619  }
620
621  @Test
622  public void testOffsetProducesPositiveOutput() {
623    // This number is picked because it produces negative output if the values isn't ensured to be
624    // positive. See HBASE-18757 for more information.
625    long testValue = 549888460800L;
626    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
627      return ByteBuffAllocator.NONE;
628    }, ByteBuffAllocator.HEAP);
629    assertEquals(testValue, bucketEntry.offset());
630  }
631
632  @Test
633  public void testEvictionCount() throws InterruptedException {
634    int size = 100;
635    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
636    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
637    HFileContext meta = new HFileContextBuilder().build();
638    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
639    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
640      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
641    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
642      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
643
644    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
645    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
646    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
647    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
648    blockWithNextBlockMetadata.serialize(block1Buffer, true);
649    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
650
651    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
652    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
653      block1Buffer);
654
655    waitUntilFlushedToBucket(cache, key);
656
657    assertEquals(0, cache.getStats().getEvictionCount());
658
659    // evict call should return 1, but then eviction count be 0
660    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
661    assertEquals(0, cache.getStats().getEvictionCount());
662
663    // add back
664    key = new BlockCacheKey("testEvictionCount", 0);
665    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
666      block1Buffer);
667    waitUntilFlushedToBucket(cache, key);
668
669    // should not increment
670    assertTrue(cache.evictBlock(key));
671    assertEquals(0, cache.getStats().getEvictionCount());
672
673    // add back
674    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
675      block1Buffer);
676    waitUntilFlushedToBucket(cache, key);
677
678    // should finally increment eviction count
679    cache.freeSpace("testing");
680    assertEquals(1, cache.getStats().getEvictionCount());
681  }
682
683  @Test
684  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
685    int size = 100;
686    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
687    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
688    HFileContext meta = new HFileContextBuilder().build();
689    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
690    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
691      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
692    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
693      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
694
695    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
696    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
697    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
698    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
699    blockWithNextBlockMetadata.serialize(block1Buffer, true);
700    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
701
702    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
703    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
704      block1Buffer);
705
706    waitUntilFlushedToBucket(cache, key);
707    assertNotNull(cache.backingMap.get(key));
708    assertEquals(1, cache.backingMap.get(key).refCnt());
709    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
710    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
711
712    // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
713    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
714      block1Buffer);
715    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
716    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
717    assertEquals(1, cache.backingMap.get(key).refCnt());
718
719    // Clear and add blockWithoutNextBlockMetadata
720    assertTrue(cache.evictBlock(key));
721    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
722    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
723
724    assertNull(cache.getBlock(key, false, false, false));
725    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
726      block2Buffer);
727
728    waitUntilFlushedToBucket(cache, key);
729    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
730    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
731
732    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
733    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
734      block1Buffer);
735
736    waitUntilFlushedToBucket(cache, key);
737    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
738    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
739  }
740
741  @Test
742  public void testRAMCache() {
743    int size = 100;
744    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
745    byte[] byteArr = new byte[length];
746    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
747    HFileContext meta = new HFileContextBuilder().build();
748
749    RAMCache cache = new RAMCache();
750    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
751    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
752    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
753      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
754    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
755      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
756    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false);
757    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false);
758
759    assertFalse(cache.containsKey(key1));
760    assertNull(cache.putIfAbsent(key1, re1));
761    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
762
763    assertNotNull(cache.putIfAbsent(key1, re2));
764    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
765    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
766
767    assertNull(cache.putIfAbsent(key2, re2));
768    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
769    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
770
771    cache.remove(key1);
772    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
773    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
774
775    cache.clear();
776    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
777    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
778  }
779
780  @Test
781  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
782    // initialize an block.
783    int size = 100, offset = 20;
784    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
785    ByteBuffer buf = ByteBuffer.allocate(length);
786    HFileContext meta = new HFileContextBuilder().build();
787    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
788      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
789
790    // initialize an mocked ioengine.
791    IOEngine ioEngine = Mockito.mock(IOEngine.class);
792    when(ioEngine.usesSharedMemory()).thenReturn(false);
793    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
794    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
795      Mockito.anyLong());
796    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
797      Mockito.anyLong());
798
799    // create an bucket allocator.
800    long availableSpace = 1024 * 1024 * 1024L;
801    BucketAllocator allocator = new BucketAllocator(availableSpace, null);
802
803    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
804    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false);
805
806    Assert.assertEquals(0, allocator.getUsedSize());
807    try {
808      re.writeToCache(ioEngine, allocator, null, null,
809        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE);
810      Assert.fail();
811    } catch (Exception e) {
812    }
813    Assert.assertEquals(0, allocator.getUsedSize());
814  }
815
816  /**
817   * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
818   * could not be freed even if corresponding {@link HFileBlock} is evicted from
819   * {@link BucketCache}.
820   */
821  @Test
822  public void testFreeBucketEntryRestoredFromFile() throws Exception {
823    BucketCache bucketCache = null;
824    try {
825      final Path dataTestDir = createAndGetTestDir();
826
827      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
828      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
829
830      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
831        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
832      assertTrue(bucketCache.waitForCacheInitialization(10000));
833      long usedByteSize = bucketCache.getAllocator().getUsedSize();
834      assertEquals(0, usedByteSize);
835
836      HFileBlockPair[] hfileBlockPairs =
837        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
838      // Add blocks
839      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
840        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
841      }
842
843      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
844        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
845          hfileBlockPair.getBlock(), false);
846      }
847      usedByteSize = bucketCache.getAllocator().getUsedSize();
848      assertNotEquals(0, usedByteSize);
849      // persist cache to file
850      bucketCache.shutdown();
851      assertTrue(new File(persistencePath).exists());
852
853      // restore cache from file
854      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
855        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
856      assertTrue(bucketCache.waitForCacheInitialization(10000));
857      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
858
859      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
860        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
861        bucketCache.evictBlock(blockCacheKey);
862      }
863      assertEquals(0, bucketCache.getAllocator().getUsedSize());
864      assertEquals(0, bucketCache.backingMap.size());
865    } finally {
866      bucketCache.shutdown();
867      HBASE_TESTING_UTILITY.cleanupTestDir();
868    }
869  }
870
871  @Test
872  public void testBlockAdditionWaitWhenCache() throws Exception {
873    BucketCache bucketCache = null;
874    try {
875      final Path dataTestDir = createAndGetTestDir();
876
877      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
878      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
879
880      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
881      config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);
882
883      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
884        constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config);
885      assertTrue(bucketCache.waitForCacheInitialization(10000));
886      long usedByteSize = bucketCache.getAllocator().getUsedSize();
887      assertEquals(0, usedByteSize);
888
889      HFileBlockPair[] hfileBlockPairs =
890        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
891      String[] names = CacheTestUtils.getHFileNames(hfileBlockPairs);
892      // Add blocks
893      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
894        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
895          true);
896      }
897
898      // Max wait for 10 seconds.
899      long timeout = 10000;
900      // Wait for blocks size to match the number of blocks.
901      while (bucketCache.backingMap.size() != 10) {
902        if (timeout <= 0) break;
903        Threads.sleep(100);
904        timeout -= 100;
905      }
906      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
907        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
908      }
909      usedByteSize = bucketCache.getAllocator().getUsedSize();
910      assertNotEquals(0, usedByteSize);
911      // persist cache to file
912      bucketCache.shutdown();
913      assertTrue(new File(persistencePath).exists());
914
915      // restore cache from file
916      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
917        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
918      assertTrue(bucketCache.waitForCacheInitialization(10000));
919      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
920      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(hfileBlockPairs, names);
921      for (BlockCacheKey key : newKeys) {
922        bucketCache.evictBlock(key);
923      }
924      assertEquals(0, bucketCache.getAllocator().getUsedSize());
925      assertEquals(0, bucketCache.backingMap.size());
926    } finally {
927      if (bucketCache != null) {
928        bucketCache.shutdown();
929      }
930      HBASE_TESTING_UTILITY.cleanupTestDir();
931    }
932  }
933
934  @Test
935  public void testOnConfigurationChange() throws Exception {
936    BucketCache bucketCache = null;
937    try {
938      final Path dataTestDir = createAndGetTestDir();
939
940      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
941
942      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
943
944      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
945        constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config);
946
947      assertTrue(bucketCache.waitForCacheInitialization(10000));
948
949      config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
950      config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f);
951      config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f);
952      config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f);
953      config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f);
954      config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
955      config.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
956      config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500);
957      config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000);
958
959      bucketCache.onConfigurationChange(config);
960
961      assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01);
962      assertEquals(0.8f, bucketCache.getMinFactor(), 0.01);
963      assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01);
964      assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01);
965      assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01);
966      assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01);
967      assertEquals(100L, bucketCache.getQueueAdditionWaitTime());
968      assertEquals(500L, bucketCache.getBucketcachePersistInterval());
969      assertEquals(1000L, bucketCache.getPersistenceChunkSize());
970
971    } finally {
972      if (bucketCache != null) {
973        bucketCache.shutdown();
974      }
975      HBASE_TESTING_UTILITY.cleanupTestDir();
976    }
977  }
978
979  @Test
980  public void testNotifyFileCachingCompletedSuccess() throws Exception {
981    BucketCache bucketCache = null;
982    try {
983      Path filePath =
984        new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
985      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false);
986      if (bucketCache.getStats().getFailedInserts() > 0) {
987        LOG.info("There were {} fail inserts, "
988          + "will assert if total blocks in backingMap equals (10 - failInserts) "
989          + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
990        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
991        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
992      } else {
993        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
994      }
995    } finally {
996      if (bucketCache != null) {
997        bucketCache.shutdown();
998      }
999      HBASE_TESTING_UTILITY.cleanupTestDir();
1000    }
1001  }
1002
1003  @Test
1004  public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception {
1005    BucketCache bucketCache = null;
1006    try {
1007      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
1008        "testNotifyFileCachingCompletedForEncodedDataSuccess");
1009      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true);
1010      if (bucketCache.getStats().getFailedInserts() > 0) {
1011        LOG.info("There were {} fail inserts, "
1012          + "will assert if total blocks in backingMap equals (10 - failInserts) "
1013          + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
1014        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
1015        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
1016      } else {
1017        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
1018      }
1019    } finally {
1020      if (bucketCache != null) {
1021        bucketCache.shutdown();
1022      }
1023      HBASE_TESTING_UTILITY.cleanupTestDir();
1024    }
1025  }
1026
1027  @Test
1028  public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
1029    BucketCache bucketCache = null;
1030    try {
1031      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
1032        "testNotifyFileCachingCompletedNotAllCached");
1033      // Deliberately passing more blocks than we have created to test that
1034      // notifyFileCachingCompleted will not consider the file fully cached
1035      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false);
1036      assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
1037    } finally {
1038      if (bucketCache != null) {
1039        bucketCache.shutdown();
1040      }
1041      HBASE_TESTING_UTILITY.cleanupTestDir();
1042    }
1043  }
1044
1045  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
1046    int totalBlocksToCheck, boolean encoded) throws Exception {
1047    final Path dataTestDir = createAndGetTestDir();
1048    String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
1049    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
1050      constructedBlockSizes, 1, 1, null);
1051    assertTrue(bucketCache.waitForCacheInitialization(10000));
1052    long usedByteSize = bucketCache.getAllocator().getUsedSize();
1053    assertEquals(0, usedByteSize);
1054    HFileBlockPair[] hfileBlockPairs =
1055      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded);
1056    // Add blocks
1057    for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
1058      bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
1059    }
1060    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
1061      totalBlocksToCheck * constructedBlockSize);
1062    return bucketCache;
1063  }
1064
1065  @Test
1066  public void testEvictOrphansOutOfGracePeriod() throws Exception {
1067    BucketCache bucketCache = testEvictOrphans(0);
1068    assertEquals(10, bucketCache.getBackingMap().size());
1069    assertEquals(0, bucketCache.blocksByHFile.stream()
1070      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
1071  }
1072
1073  @Test
1074  public void testEvictOrphansWithinGracePeriod() throws Exception {
1075    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
1076    assertEquals(18, bucketCache.getBackingMap().size());
1077    assertTrue(bucketCache.blocksByHFile.stream()
1078      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
1079  }
1080
1081  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
1082    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
1083    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
1084    Map<String, HRegion> onlineRegions = new HashMap<>();
1085    List<HStore> stores = new ArrayList<>();
1086    Collection<HStoreFile> storeFiles = new ArrayList<>();
1087    HRegion mockedRegion = mock(HRegion.class);
1088    HStore mockedStore = mock(HStore.class);
1089    HStoreFile mockedStoreFile = mock(HStoreFile.class);
1090    when(mockedStoreFile.getPath()).thenReturn(validFile);
1091    storeFiles.add(mockedStoreFile);
1092    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
1093    stores.add(mockedStore);
1094    when(mockedRegion.getStores()).thenReturn(stores);
1095    onlineRegions.put("mocked_region", mockedRegion);
1096    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
1097    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
1098    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
1099    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
1100      orphanEvictionGracePeriod);
1101    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
1102      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
1103      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
1104    HFileBlockPair[] validBlockPairs =
1105      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile, false);
1106    HFileBlockPair[] orphanBlockPairs =
1107      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile, false);
1108    for (HFileBlockPair pair : validBlockPairs) {
1109      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
1110    }
1111    waitUntilAllFlushedToBucket(bucketCache);
1112    assertEquals(10, bucketCache.getBackingMap().size());
1113    bucketCache.freeSpace("test");
1114    assertEquals(10, bucketCache.getBackingMap().size());
1115    for (HFileBlockPair pair : orphanBlockPairs) {
1116      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
1117    }
1118    waitUntilAllFlushedToBucket(bucketCache);
1119    assertEquals(20, bucketCache.getBackingMap().size());
1120    bucketCache.freeSpace("test");
1121    return bucketCache;
1122  }
1123
1124  @Test
1125  public void testBlockPriority() throws Exception {
1126    HFileBlockPair block = CacheTestUtils.generateHFileBlocks(BLOCK_SIZE, 1)[0];
1127    cacheAndWaitUntilFlushedToBucket(cache, block.getBlockName(), block.getBlock(), true);
1128    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.SINGLE);
1129    cache.getBlock(block.getBlockName(), true, false, true);
1130    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.MULTI);
1131  }
1132
1133  @Test
1134  public void testIOTimePerHitReturnsZeroWhenNoHits()
1135    throws NoSuchFieldException, IllegalAccessException {
1136    CacheStats cacheStats = cache.getStats();
1137    assertTrue(cacheStats instanceof BucketCacheStats);
1138    BucketCacheStats bucketCacheStats = (BucketCacheStats) cacheStats;
1139
1140    Field field = BucketCacheStats.class.getDeclaredField("ioHitCount");
1141    field.setAccessible(true);
1142    LongAdder ioHitCount = (LongAdder) field.get(bucketCacheStats);
1143
1144    assertEquals(0, ioHitCount.sum());
1145    double ioTimePerHit = bucketCacheStats.getIOTimePerHit();
1146    assertEquals(0, ioTimePerHit, 0.0);
1147  }
1148}