/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Basic test of BucketCache: puts and gets.
 * <p>
 * Tests ensure the data correctness of blocks under concurrent access by several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCache.class);

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
                                                          // for these tests?
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Test utility to create the test dir and return its path.
   * @return path of the created dir
   * @throws IOException if the dir cannot be created
   */
  private Path createAndGetTestDir() throws IOException {
    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
    return testDir;
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
  }

  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random block size; keep allocating blocks until
    // the cache is completely filled.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());

      // The block sizes above are multiples of 1024, but the default bucket sizes add an extra
      // 1024 on top of that, so each allocation wastes exactly 1024 bytes here (e.g. a 4 * 1024
      // block lands in a 5 * 1024 bucket item), and that waste counts towards fragmentation in
      // our test. Real life may have worse fragmentation, because blocks may not be perfectly
      // sized to the block size, given encoding/compression and large rows.
      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
    }

    mAllocator.logDebugStatistics();

    for (Pair<Long, Integer> allocation : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

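  /**
   * Polls for up to ten seconds until the writer threads have flushed the given key out of the
   * in-memory ramCache and into the backingMap.
   */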
  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 10000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

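  /**
   * Blocks until the ramCache has fully drained, then sleeps an extra second to give the writer
   * threads time to finish updating the backingMap.
   */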
  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  // BucketCache.cacheBlock is asynchronous: it first adds the block to the ramCache and the
  // writeQueue, then writer threads flush it to the bucket and put a reference entry in the
  // backingMap.
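  // Typical call sequence, as a sketch:
  //   cache.cacheBlock(cacheKey, block);         // enqueue into ramCache + writeQueue
  //   waitUntilFlushedToBucket(cache, cacheKey); // poll until it lands in backingMap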
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block, boolean waitWhenCache) throws InterruptedException {
    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

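  /**
   * Simulates a race between eviction and caching of the same key: an evicting thread is held up
   * on the offset write lock while the entry is removed and the key re-cached, then the stalled
   * eviction is allowed to finish. See the HBASE-21957 discussion at the end of the method.
   */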
  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 were:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 were:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 are more reasonable, because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it has seen, and a block newly added after that {@link BucketEntry}
     * should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    Path testDir = createAndGetTestDir();
    String ioEngineName = "file:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromMMap() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
  }

  @Test
  public void testRetrieveFromPMem() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  private void testRetrievalUtils(Path testDir, String ioEngineName)
    throws IOException, InterruptedException {
    final String persistencePath =
      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    assertTrue(new File(persistencePath).exists());
  }

  @Test
  public void testRetrieveUnsupportedIOE() throws Exception {
    try {
      final Path testDir = createAndGetTestDir();
      final String ioEngineName = testDir + "/bucket.cache";
      testRetrievalUtils(testDir, ioEngineName);
      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
        + "files:, mmap: or offheap", e.getMessage());
    }
  }

  @Test
  public void testRetrieveFromMultipleFiles() throws Exception {
    final Path testDirInitial = createAndGetTestDir();
    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
    String ioEngineName =
      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
    testRetrievalUtils(testDirInitial, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDirInitial + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    try {
      final Path testDir = createAndGetTestDir();
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testCacheSizeCapacity() throws IOException {
    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
    try {
      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
        writerQLen, null, 100, conf);
      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
    }
  }

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
      cache.getAcceptableFactor(), 0);
    assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
    assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getExtraFreeFactor(), 0);
    assertEquals(SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, cache.getSingleFactor(),
      0);
    assertEquals(MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, cache.getMultiFactor(),
      0);
    assertEquals(MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, cache.getMemoryFactor(),
      0);
  }

  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to <0, in expected range, in expected range, config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // Column-wise expectations: the configs add up to 1.0 and each is between 0 and 1.0; the
    // configs don't add up to 1.0; configs can't be negative; the configs don't add up to 1.0
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String,
      float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
        MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME,
        memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

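  /**
   * Table-driven helper: for each index i, sets every config in configMap to its i-th value and
   * asserts that constructing a BucketCache succeeds exactly when expectSuccess[i] is true
   * (invalid combinations surface as IllegalArgumentException).
   */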
  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
    boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
        assertTrue(cache.waitForCacheInitialization(10000));
        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      }
    }
  }

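  /** Asserts that getPartitionSize(factor) equals floor(totalSize * factor * minFactor). */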
  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
    float minFactor) {
    long expectedOutput =
      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }

  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the value isn't ensured to be
    // positive. See HBASE-18757 for more information.
    long testValue = 549888460800L;
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
      return ByteBuffAllocator.NONE;
    }, ByteBuffAllocator.HEAP);
    assertEquals(testValue, bucketEntry.offset());
  }

  @Test
  public void testEvictionCount() throws InterruptedException {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);

    assertEquals(0, cache.getStats().getEvictionCount());

    // the evict call should return 1, but the eviction count should still be 0
    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    key = new BlockCacheKey("testEvictionCount", 0);
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should not increment
    assertTrue(cache.evictBlock(key));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should finally increment eviction count
    cache.freeSpace("testing");
    assertEquals(1, cache.getStats().getEvictionCount());
  }

  @Test
  public void testStringPool() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    BucketCache bucketCache =
      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    long usedSize = bucketCache.getAllocator().getUsedSize();
    assertEquals(0, usedSize);
    Random rand = ThreadLocalRandom.current();
    Path filePath = new Path(testDir, Long.toString(rand.nextLong()));
    CacheTestUtils.HFileBlockPair[] blocks =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 1, filePath, false);
    String name = blocks[0].getBlockName().getHfileName();
    assertEquals(name, filePath.getName());
    assertNotNull(blocks[0].getBlockName().getRegionName());
    bucketCache.cacheBlock(blocks[0].getBlockName(), blocks[0].getBlock());
    waitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName());
    assertTrue(FilePathStringPool.getInstance().size() > 0);
    bucketCache.evictBlock(blocks[0].getBlockName());
    assertTrue(FilePathStringPool.getInstance().size() > 0);
    bucketCache.cacheBlock(blocks[0].getBlockName(), blocks[0].getBlock());
    waitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName());
    bucketCache.fileCacheCompleted(filePath,
      bucketCache.backingMap.get(blocks[0].getBlockName()).getLength());
    bucketCache.evictBlocksByHfileName(name);
    assertEquals(1, FilePathStringPool.getInstance().size());
  }

  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }

  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false);

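    // RAMCache retains a block on a successful putIfAbsent (refCnt +1) and releases it again on
    // remove()/clear(); a rejected putIfAbsent must leave the refCnt untouched.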
    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }

  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // initialize a mocked IOEngine.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    when(ioEngine.usesSharedMemory()).thenReturn(false);
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false);

    Assert.assertEquals(0, allocator.getUsedSize());
    try {
      re.writeToCache(ioEngine, allocator, null, null,
        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE);
      Assert.fail();
    } catch (Exception e) {
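      // Expected: the mocked IOEngine.write throws, so writeToCache must fail and the
      // allocation must be freed again (asserted below).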
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }

  /**
   * This test is for HBASE-26295: a {@link BucketEntry} which is restored from a persistence file
   * could not be freed even if the corresponding {@link HFileBlock} was evicted from the
   * {@link BucketCache}.
   */
  @Test
  public void testFreeBucketEntryRestoredFromFile() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
      }

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
          hfileBlockPair.getBlock(), false);
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBlockAdditionWaitWhenCache() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
      config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
      String[] names = CacheTestUtils.getHFileNames(hfileBlockPairs);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
          true);
      }

      // Wait for the backingMap size to match the number of blocks, at most 10 seconds.
      long timeout = 10000;
      while (bucketCache.backingMap.size() != 10) {
        if (timeout <= 0) break;
        Threads.sleep(100);
        timeout -= 100;
      }
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(hfileBlockPairs, names);
      for (BlockCacheKey key : newKeys) {
        bucketCache.evictBlock(key);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testOnConfigurationChange() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";

      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config);

      assertTrue(bucketCache.waitForCacheInitialization(10000));

      config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
      config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f);
      config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f);
      config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f);
      config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f);
      config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
      config.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
      config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500);
      config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000);

      bucketCache.onConfigurationChange(config);

      assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01);
      assertEquals(0.8f, bucketCache.getMinFactor(), 0.01);
      assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01);
      assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01);
      assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01);
      assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01);
      assertEquals(100L, bucketCache.getQueueAdditionWaitTime());
      assertEquals(500L, bucketCache.getBucketcachePersistInterval());
      assertEquals(1000L, bucketCache.getPersistenceChunkSize());

    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedSuccess() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath =
        new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false);
      if (bucketCache.getStats().getFailedInserts() > 0) {
        LOG.info("There were {} failed inserts; asserting that the total number of blocks in "
          + "backingMap equals (10 - failedInserts) and that the file isn't listed as fully "
          + "cached.", bucketCache.getStats().getFailedInserts());
        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      } else {
        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
        "testNotifyFileCachingCompletedForEncodedDataSuccess");
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true);
      if (bucketCache.getStats().getFailedInserts() > 0) {
        LOG.info("There were {} failed inserts; asserting that the total number of blocks in "
          + "backingMap equals (10 - failedInserts) and that the file isn't listed as fully "
          + "cached.", bucketCache.getStats().getFailedInserts());
        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      } else {
        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
        "testNotifyFileCachingCompletedNotAllCached");
      // Deliberately passing more blocks than we have created to test that
      // notifyFileCachingCompleted will not consider the file fully cached
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false);
      assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

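  /**
   * Caches ten generated blocks for filePath, then calls notifyFileCachingCompleted claiming
   * totalBlocksToCheck blocks; callers inspect fullyCachedFiles to verify whether the file was
   * considered fully cached.
   */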
  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
    int totalBlocksToCheck, boolean encoded) throws Exception {
    final Path dataTestDir = createAndGetTestDir();
    String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, 1, 1, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    long usedByteSize = bucketCache.getAllocator().getUsedSize();
    assertEquals(0, usedByteSize);
    HFileBlockPair[] hfileBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded);
    // Add blocks
    for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
      bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
    }
    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
      totalBlocksToCheck * constructedBlockSize);
    return bucketCache;
  }

  @Test
  public void testEvictOrphansOutOfGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(0);
    assertEquals(10, bucketCache.getBackingMap().size());
    assertEquals(0, bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
  }

  @Test
  public void testEvictOrphansWithinGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
    assertEquals(18, bucketCache.getBackingMap().size());
    assertTrue(bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
  }

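  /**
   * Caches ten blocks for a file referenced by a mocked online region ("valid") and ten for a
   * file no region references ("orphan"), then triggers freeSpace. Orphan blocks only become
   * evictable once they are older than the configured BLOCK_ORPHAN_GRACE_PERIOD.
   */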
  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
    Map<String, HRegion> onlineRegions = new HashMap<>();
    List<HStore> stores = new ArrayList<>();
    Collection<HStoreFile> storeFiles = new ArrayList<>();
    HRegion mockedRegion = mock(HRegion.class);
    HStore mockedStore = mock(HStore.class);
    HStoreFile mockedStoreFile = mock(HStoreFile.class);
    when(mockedStoreFile.getPath()).thenReturn(validFile);
    storeFiles.add(mockedStoreFile);
    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
    stores.add(mockedStore);
    when(mockedRegion.getStores()).thenReturn(stores);
    onlineRegions.put("mocked_region", mockedRegion);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
      orphanEvictionGracePeriod);
    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
    HFileBlockPair[] validBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile, false);
    HFileBlockPair[] orphanBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile, false);
    for (HFileBlockPair pair : validBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(10, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    assertEquals(10, bucketCache.getBackingMap().size());
    for (HFileBlockPair pair : orphanBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(20, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    return bucketCache;
  }

  @Test
  public void testBlockPriority() throws Exception {
    HFileBlockPair block = CacheTestUtils.generateHFileBlocks(BLOCK_SIZE, 1)[0];
    cacheAndWaitUntilFlushedToBucket(cache, block.getBlockName(), block.getBlock(), true);
    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.SINGLE);
    cache.getBlock(block.getBlockName(), true, false, true);
    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.MULTI);
  }

  @Test
  public void testIOTimePerHitReturnsZeroWhenNoHits()
    throws NoSuchFieldException, IllegalAccessException {
    CacheStats cacheStats = cache.getStats();
    assertTrue(cacheStats instanceof BucketCacheStats);
    BucketCacheStats bucketCacheStats = (BucketCacheStats) cacheStats;

    Field field = BucketCacheStats.class.getDeclaredField("ioHitCount");
    field.setAccessible(true);
    LongAdder ioHitCount = (LongAdder) field.get(bucketCacheStats);

    assertEquals(0, ioHitCount.sum());
    double ioTimePerHit = bucketCacheStats.getIOTimePerHit();
    assertEquals(0, ioTimePerHit, 0.0);
  }
}