001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
024import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
025import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR;
026import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR;
027import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
028import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME;
029import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
030import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME;
031import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
032import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME;
033import static org.junit.jupiter.api.Assertions.assertEquals;
034import static org.junit.jupiter.api.Assertions.assertFalse;
035import static org.junit.jupiter.api.Assertions.assertNotEquals;
036import static org.junit.jupiter.api.Assertions.assertNotNull;
037import static org.junit.jupiter.api.Assertions.assertNull;
038import static org.junit.jupiter.api.Assertions.assertTrue;
039import static org.junit.jupiter.api.Assertions.fail;
040import static org.mockito.Mockito.mock;
041import static org.mockito.Mockito.when;
042
043import java.io.File;
044import java.io.IOException;
045import java.lang.reflect.Field;
046import java.nio.ByteBuffer;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Collection;
050import java.util.HashMap;
051import java.util.List;
052import java.util.Map;
053import java.util.Set;
054import java.util.concurrent.ThreadLocalRandom;
055import java.util.concurrent.atomic.LongAdder;
056import java.util.concurrent.locks.ReentrantReadWriteLock;
057import java.util.stream.Stream;
058import org.apache.hadoop.conf.Configuration;
059import org.apache.hadoop.fs.Path;
060import org.apache.hadoop.hbase.HBaseConfiguration;
061import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
062import org.apache.hadoop.hbase.HBaseTestingUtil;
063import org.apache.hadoop.hbase.HConstants;
064import org.apache.hadoop.hbase.Waiter;
065import org.apache.hadoop.hbase.io.ByteBuffAllocator;
066import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
067import org.apache.hadoop.hbase.io.hfile.BlockPriority;
068import org.apache.hadoop.hbase.io.hfile.BlockType;
069import org.apache.hadoop.hbase.io.hfile.CacheStats;
070import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
071import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
072import org.apache.hadoop.hbase.io.hfile.Cacheable;
073import org.apache.hadoop.hbase.io.hfile.HFileBlock;
074import org.apache.hadoop.hbase.io.hfile.HFileContext;
075import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
076import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
077import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
078import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
079import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
080import org.apache.hadoop.hbase.nio.ByteBuff;
081import org.apache.hadoop.hbase.regionserver.HRegion;
082import org.apache.hadoop.hbase.regionserver.HStore;
083import org.apache.hadoop.hbase.regionserver.HStoreFile;
084import org.apache.hadoop.hbase.testclassification.IOTests;
085import org.apache.hadoop.hbase.testclassification.LargeTests;
086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
087import org.apache.hadoop.hbase.util.Pair;
088import org.apache.hadoop.hbase.util.Threads;
089import org.junit.jupiter.api.AfterEach;
090import org.junit.jupiter.api.BeforeEach;
091import org.junit.jupiter.api.Tag;
092import org.junit.jupiter.api.TestTemplate;
093import org.junit.jupiter.params.provider.Arguments;
094import org.mockito.Mockito;
095import org.slf4j.Logger;
096import org.slf4j.LoggerFactory;
097
098import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
099
100/**
101 * Basic test of BucketCache.Puts and gets.
102 * <p>
103 * Tests will ensure that blocks' data correctness under several threads concurrency
104 */
105@Tag(IOTests.TAG)
106@Tag(LargeTests.TAG)
107@HBaseParameterizedTestTemplate(name = "{index}: blockSize={0}, bucketSizes={1}")
108public class TestBucketCache {
109
110  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);
111
112  public static Stream<Arguments> parameters() {
113    // TODO: why is 8k the default blocksize for these tests?
114    return Stream.of(Arguments.of(8192, null),
115      Arguments.of(16 * 1024,
116        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
117          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
118          128 * 1024 + 1024 }));
119  }
120
  // Per-run parameters injected by the parameterized test template (see parameters()).
  private final int constructedBlockSize;
  private final int[] constructedBlockSizes;

  public TestBucketCache(int constructedBlockSize, int[] constructedBlockSizes) {
    this.constructedBlockSize = constructedBlockSize;
    this.constructedBlockSizes = constructedBlockSizes;
  }

  // Cache under test; recreated before each test in setup() and released in tearDown().
  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  // Size of each generated test block: cache size divided evenly among the blocks.
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  // 32 MB capacity used by most tests in this class.
  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  // Default IO engine; several tests build their own caches with file:/files:/mmap:/pmem: engines.
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
142
  /**
   * Minimal BucketCache subclass used by {@link #setup()}. Both overrides simply delegate to
   * the superclass; NOTE(review): they appear to exist only to widen visibility / pin the
   * cacheBlock signatures for the tests — confirm before removing.
   */
  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }
161
  /**
   * Builds a fresh cache (default "offheap" engine, no persistence path) before every test.
   */
  @BeforeEach
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
  }
167
  /**
   * Shuts down the cache created in {@link #setup()} after each test.
   */
  @AfterEach
  public void tearDown() {
    cache.shutdown();
  }
172
173  /**
174   * Test Utility to create test dir and return name
175   * @return return name of created dir
176   * @throws IOException throws IOException
177   */
178  private Path createAndGetTestDir() throws IOException {
179    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
180    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
181    return testDir;
182  }
183
184  /**
185   * Return a random element from {@code a}.
186   */
187  private static <T> T randFrom(List<T> a) {
188    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
189  }
190
191  @TestTemplate
192  public void testBucketAllocator() throws BucketAllocatorException {
193    BucketAllocator mAllocator = cache.getAllocator();
194    /*
195     * Test the allocator first
196     */
197    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
198
199    boolean full = false;
200    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
201    // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
202    // the cache is completely filled.
203    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
204    while (!full) {
205      Integer blockSize = null;
206      try {
207        blockSize = randFrom(tmp);
208        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
209      } catch (CacheFullException cfe) {
210        tmp.remove(blockSize);
211        if (tmp.isEmpty()) full = true;
212      }
213    }
214
215    for (Integer blockSize : BLOCKSIZES) {
216      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
217      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
218      assertEquals(0, indexStatistics.freeCount(), "unexpected freeCount for " + bucketSizeInfo);
219
220      // we know the block sizes above are multiples of 1024, but default bucket sizes give an
221      // additional 1024 on top of that so this counts towards fragmentation in our test
222      // real life may have worse fragmentation because blocks may not be perfectly sized to block
223      // size, given encoding/compression and large rows
224      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
225    }
226
227    mAllocator.logDebugStatistics();
228
229    for (Pair<Long, Integer> allocation : allocations) {
230      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
231        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
232    }
233    assertEquals(0, mAllocator.getUsedSize());
234  }
235
  /**
   * Smoke test: basic puts and gets through the shared CacheTestUtils harness.
   */
  @TestTemplate
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }
240
  /**
   * Hammers a single cache key from many threads to check concurrent-access correctness.
   */
  @TestTemplate
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }
245
  /**
   * Checks heap size accounting. Writer threads are stopped first — presumably so blocks
   * stay in the RAM cache where heap-size changes are observable; confirm in CacheTestUtils.
   */
  @TestTemplate
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }
251
  /**
   * Waits (up to 10 seconds) until {@code cacheKey} has left the RAM cache and appears in
   * the bucket's backing map, i.e. the writer threads have flushed it.
   */
  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 10000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }
257
  /**
   * Polls until the RAM cache is empty, then sleeps one extra second. NOTE(review): the
   * trailing sleep presumably covers the window where ramCache is empty but writer threads
   * have not finished updating the backing map — confirm before tightening.
   */
  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }
264
  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
  // threads will flush it to the bucket and put reference entry in backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block, boolean waitWhenCache) throws InterruptedException {
    // inMemory is always false here; waitWhenCache is forwarded to cacheBlock.
    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
    waitUntilFlushedToBucket(cache, cacheKey);
  }
272
  /**
   * Re-caches a key while an eviction of the old entry is blocked on the offset write lock,
   * then asserts the newly cached block survives once the stale eviction completes. See the
   * HBASE-21957 discussion embedded below for the rationale behind the final asserts.
   */
  @TestTemplate
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    // Hold the write lock so the eviction thread below parks on it.
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    // Wait until the evict thread is actually blocked on this offset's lock.
    cache.offsetLock.waitForWaiters(lockId, 1);
    // Manually evict the old entry, then cache the same key again while the evict thread
    // is still parked.
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    assertEquals(1, cache.getBlockCount());
    // Release the lock so the stale eviction can run to completion.
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 are:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 are:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 is more reasonable,because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it had seen, and newly added Block after the {@link BucketEntry}
     * it had seen should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue(cache.iterator().hasNext(), "We should have a block!");
  }
317
318  @TestTemplate
319  public void testRetrieveFromFile() throws Exception {
320    Path testDir = createAndGetTestDir();
321    String ioEngineName = "file:" + testDir + "/bucket.cache";
322    testRetrievalUtils(testDir, ioEngineName);
323    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
324    String persistencePath = testDir + "/bucket.persistence";
325    BucketCache bucketCache = null;
326    try {
327      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
328        smallBucketSizes, writeThreads, writerQLen, persistencePath);
329      assertTrue(bucketCache.waitForCacheInitialization(10000));
330      assertFalse(new File(persistencePath).exists());
331      assertEquals(0, bucketCache.getAllocator().getUsedSize());
332      assertEquals(0, bucketCache.backingMap.size());
333    } finally {
334      bucketCache.shutdown();
335      HBASE_TESTING_UTILITY.cleanupTestDir();
336    }
337  }
338
  /**
   * Persistence round-trip on the "mmap:" IO engine (see {@link #testRetrievalUtils}).
   */
  @TestTemplate
  public void testRetrieveFromMMap() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
  }
345
346  @TestTemplate
347  public void testRetrieveFromPMem() throws Exception {
348    final Path testDir = createAndGetTestDir();
349    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
350    testRetrievalUtils(testDir, ioEngineName);
351    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
352    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
353    BucketCache bucketCache = null;
354    try {
355      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
356        smallBucketSizes, writeThreads, writerQLen, persistencePath);
357      assertTrue(bucketCache.waitForCacheInitialization(10000));
358      assertFalse(new File(persistencePath).exists());
359      assertEquals(0, bucketCache.getAllocator().getUsedSize());
360      assertEquals(0, bucketCache.backingMap.size());
361    } finally {
362      bucketCache.shutdown();
363      HBASE_TESTING_UTILITY.cleanupTestDir();
364    }
365  }
366
  /**
   * Shared round-trip helper: fills a cache backed by {@code ioEngineName}, shuts it down so
   * its state persists to a fresh file under {@code testDir}, recreates the cache from that
   * file, and asserts the allocator's used size was restored. The persistence file must
   * exist after both shutdowns.
   */
  private void testRetrievalUtils(Path testDir, String ioEngineName)
    throws IOException, InterruptedException {
    final String persistencePath =
      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      // Cache the same blocks again, this time waiting until each one is flushed to the bucket.
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
      // Recreate from the persisted state and confirm the used size survived the restart.
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    assertTrue(new File(persistencePath).exists());
  }
402
  /**
   * An IO engine name without a recognized prefix must be rejected with an
   * IllegalArgumentException carrying the expected message.
   */
  @TestTemplate
  public void testRetrieveUnsupportedIOE() throws Exception {
    try {
      final Path testDir = createAndGetTestDir();
      // No "file:"/"files:"/"mmap:"/"offheap" prefix — just a bare path.
      final String ioEngineName = testDir + "/bucket.cache";
      testRetrievalUtils(testDir, ioEngineName);
      fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
    } catch (IllegalArgumentException e) {
      assertEquals("Don't understand io engine name for cache- prefix with file:, "
        + "files:, mmap: or offheap", e.getMessage());
    }
  }
415
416  @TestTemplate
417  public void testRetrieveFromMultipleFiles() throws Exception {
418    final Path testDirInitial = createAndGetTestDir();
419    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
420    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
421    String ioEngineName =
422      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
423        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
424    testRetrievalUtils(testDirInitial, ioEngineName);
425    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
426    String persistencePath = testDirInitial + "/bucket.persistence";
427    BucketCache bucketCache = null;
428    try {
429      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
430        smallBucketSizes, writeThreads, writerQLen, persistencePath);
431      assertTrue(bucketCache.waitForCacheInitialization(10000));
432      assertFalse(new File(persistencePath).exists());
433      assertEquals(0, bucketCache.getAllocator().getUsedSize());
434      assertEquals(0, bucketCache.backingMap.size());
435    } finally {
436      bucketCache.shutdown();
437      HBASE_TESTING_UTILITY.cleanupTestDir();
438    }
439  }
440
  /**
   * A file-backed cache constructed without a persistence path must start empty. Note the
   * first cache uses the class default "offheap" engine (field {@code ioEngineName}); the
   * shadowing local {@code ioEngineName} switches the second cache to a file engine.
   */
  @TestTemplate
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    try {
      final Path testDir = createAndGetTestDir();
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      // Recreate with the file engine and no persistence path: nothing should be restored.
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }
471
472  @TestTemplate
473  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
474    long availableSpace = 20 * 1024L * 1024 * 1024;
475    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
476    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
477    assertTrue(allocator.getBuckets().length > 0);
478  }
479
480  @TestTemplate
481  public void testGetPartitionSize() throws IOException {
482    // Test default values
483    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
484
485    Configuration conf = HBaseConfiguration.create();
486    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
487    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
488    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
489    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
490
491    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
492      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
493    assertTrue(cache.waitForCacheInitialization(10000));
494
495    validateGetPartitionSize(cache, 0.1f, 0.5f);
496    validateGetPartitionSize(cache, 0.7f, 0.5f);
497    validateGetPartitionSize(cache, 0.2f, 0.5f);
498  }
499
500  @TestTemplate
501  public void testCacheSizeCapacity() throws IOException {
502    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
503    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
504    Configuration conf = HBaseConfiguration.create();
505    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
506    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
507    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
508    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
509    try {
510      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
511        writerQLen, null, 100, conf);
512      fail("Should have thrown IllegalArgumentException because of large cache capacity!");
513    } catch (IllegalArgumentException e) {
514      assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
515    }
516  }
517
518  @TestTemplate
519  public void testValidBucketCacheConfigs() throws IOException {
520    Configuration conf = HBaseConfiguration.create();
521    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
522    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
523    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
524    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
525    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
526    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
527
528    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
529      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
530    assertTrue(cache.waitForCacheInitialization(10000));
531
532    assertEquals(0.9f, cache.getAcceptableFactor(), 0,
533      ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.");
534    assertEquals(0.5f, cache.getMinFactor(), 0, MIN_FACTOR_CONFIG_NAME + " failed to propagate.");
535    assertEquals(0.5f, cache.getExtraFreeFactor(), 0,
536      EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.");
537    assertEquals(0.1f, cache.getSingleFactor(), 0,
538      SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.");
539    assertEquals(0.7f, cache.getMultiFactor(), 0,
540      MULTI_FACTOR_CONFIG_NAME + " failed to propagate.");
541    assertEquals(0.2f, cache.getMemoryFactor(), 0,
542      MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.");
543  }
544
545  @TestTemplate
546  public void testInvalidAcceptFactorConfig() throws IOException {
547    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
548    boolean[] expectedOutcomes = { false, false, true, false };
549    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
550    Configuration conf = HBaseConfiguration.create();
551    checkConfigValues(conf, configMappings, expectedOutcomes);
552  }
553
554  @TestTemplate
555  public void testInvalidMinFactorConfig() throws IOException {
556    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
557    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
558    boolean[] expectedOutcomes = { false, true, false, false };
559    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
560    Configuration conf = HBaseConfiguration.create();
561    checkConfigValues(conf, configMappings, expectedOutcomes);
562  }
563
564  @TestTemplate
565  public void testInvalidExtraFreeFactorConfig() throws IOException {
566    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
567    // throws due to <0, in expected range, in expected range, config can be > 1.0
568    boolean[] expectedOutcomes = { false, true, true, true };
569    Map<String, float[]> configMappings =
570      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
571    Configuration conf = HBaseConfiguration.create();
572    checkConfigValues(conf, configMappings, expectedOutcomes);
573  }
574
575  @TestTemplate
576  public void testInvalidCacheSplitFactorConfig() throws IOException {
577    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
578    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
579    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
580    // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't
581    // be negative, configs don't add to 1.0
582    boolean[] expectedOutcomes = { true, false, false, false };
583    Map<String,
584      float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
585        MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME,
586        memoryFactorConfigValues);
587    Configuration conf = HBaseConfiguration.create();
588    checkConfigValues(conf, configMappings, expectedOutcomes);
589  }
590
591  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
592    boolean[] expectSuccess) throws IOException {
593    Set<String> configNames = configMap.keySet();
594    for (int i = 0; i < expectSuccess.length; i++) {
595      try {
596        for (String configName : configNames) {
597          conf.setFloat(configName, configMap.get(configName)[i]);
598        }
599        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
600          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
601        assertTrue(cache.waitForCacheInitialization(10000));
602        assertTrue(expectSuccess[i], "Created BucketCache and expected it to succeed: "
603          + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i]);
604      } catch (IllegalArgumentException e) {
605        assertFalse(expectSuccess[i], "Created BucketCache and expected it to succeed: "
606          + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i]);
607      }
608    }
609  }
610
611  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
612    float minFactor) {
613    long expectedOutput =
614      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
615    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
616  }
617
618  @TestTemplate
619  public void testOffsetProducesPositiveOutput() {
620    // This number is picked because it produces negative output if the values isn't ensured to be
621    // positive. See HBASE-18757 for more information.
622    long testValue = 549888460800L;
623    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
624      return ByteBuffAllocator.NONE;
625    }, ByteBuffAllocator.HEAP);
626    assertEquals(testValue, bucketEntry.offset());
627  }
628
629  @TestTemplate
630  public void testEvictionCount() throws InterruptedException {
631    int size = 100;
632    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
633    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
634    HFileContext meta = new HFileContextBuilder().build();
635    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
636    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
637      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
638    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
639      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
640
641    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
642    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
643    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
644    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
645    blockWithNextBlockMetadata.serialize(block1Buffer, true);
646    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
647
648    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
649    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
650      block1Buffer);
651
652    waitUntilFlushedToBucket(cache, key);
653
654    assertEquals(0, cache.getStats().getEvictionCount());
655
656    // evict call should return 1, but then eviction count be 0
657    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
658    assertEquals(0, cache.getStats().getEvictionCount());
659
660    // add back
661    key = new BlockCacheKey("testEvictionCount", 0);
662    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
663      block1Buffer);
664    waitUntilFlushedToBucket(cache, key);
665
666    // should not increment
667    assertTrue(cache.evictBlock(key));
668    assertEquals(0, cache.getStats().getEvictionCount());
669
670    // add back
671    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
672      block1Buffer);
673    waitUntilFlushedToBucket(cache, key);
674
675    // should finally increment eviction count
676    cache.freeSpace("testing");
677    assertEquals(1, cache.getStats().getEvictionCount());
678  }
679
  @TestTemplate
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    // Caching a block that lacks nextBlockMetadata must NOT replace an already-cached
    // block that has it, while a block WITH the metadata replaces one without. Reference
    // counts are asserted after every step to catch leaks/double-releases.
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    // Two otherwise-identical DATA blocks; only the nextBlockOnDiskSize argument differs
    // (52 = metadata present, -1 = metadata missing).
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    // After the flush, the backing map entry holds exactly one reference and neither
    // source block has leaked a reference.
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata; expect the cached blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    // Eviction must not disturb the refcounts of the caller-held blocks.
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }
737
  @TestTemplate
  public void testRAMCache() {
    // Exercises RAMCache's reference-counting contract: a successful putIfAbsent retains
    // the entry's buffer (refCnt +1), a rejected put leaves the rejected entry untouched,
    // and remove()/clear() each release exactly one reference per stored entry.
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    // NOTE: both entries are constructed with key1; re2 is additionally stored under key2 below.
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false);

    assertFalse(cache.containsKey(key1));
    // First insert succeeds (returns null) and retains blk1: refCnt 1 -> 2.
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    // Insert under an occupied key is rejected (returns the existing entry); blk2 not retained.
    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    // Insert under a fresh key succeeds; now blk2 is retained as well.
    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    // Removing key1 releases re1's reference only.
    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    // clear() releases the remaining entry (re2 under key2); both blocks back to refCnt 1.
    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }
776
777  @TestTemplate
778  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
779    // initialize an block.
780    int size = 100, offset = 20;
781    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
782    ByteBuffer buf = ByteBuffer.allocate(length);
783    HFileContext meta = new HFileContextBuilder().build();
784    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
785      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
786
787    // initialize an mocked ioengine.
788    IOEngine ioEngine = Mockito.mock(IOEngine.class);
789    when(ioEngine.usesSharedMemory()).thenReturn(false);
790    // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
791    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
792      Mockito.anyLong());
793    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
794      Mockito.anyLong());
795
796    // create an bucket allocator.
797    long availableSpace = 1024 * 1024 * 1024L;
798    BucketAllocator allocator = new BucketAllocator(availableSpace, null);
799
800    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
801    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false);
802
803    assertEquals(0, allocator.getUsedSize());
804    try {
805      re.writeToCache(ioEngine, allocator, null, null,
806        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE);
807      fail();
808    } catch (Exception e) {
809    }
810    assertEquals(0, allocator.getUsedSize());
811  }
812
813  /**
814   * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
815   * could not be freed even if corresponding {@link HFileBlock} is evicted from
816   * {@link BucketCache}.
817   */
818  @TestTemplate
819  public void testFreeBucketEntryRestoredFromFile() throws Exception {
820    BucketCache bucketCache = null;
821    try {
822      final Path dataTestDir = createAndGetTestDir();
823
824      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
825      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
826
827      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
828        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
829      assertTrue(bucketCache.waitForCacheInitialization(10000));
830      long usedByteSize = bucketCache.getAllocator().getUsedSize();
831      assertEquals(0, usedByteSize);
832
833      HFileBlockPair[] hfileBlockPairs =
834        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
835      // Add blocks
836      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
837        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
838      }
839
840      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
841        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
842          hfileBlockPair.getBlock(), false);
843      }
844      usedByteSize = bucketCache.getAllocator().getUsedSize();
845      assertNotEquals(0, usedByteSize);
846      // persist cache to file
847      bucketCache.shutdown();
848      assertTrue(new File(persistencePath).exists());
849
850      // restore cache from file
851      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
852        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
853      assertTrue(bucketCache.waitForCacheInitialization(10000));
854      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
855
856      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
857        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
858        bucketCache.evictBlock(blockCacheKey);
859      }
860      assertEquals(0, bucketCache.getAllocator().getUsedSize());
861      assertEquals(0, bucketCache.backingMap.size());
862    } finally {
863      bucketCache.shutdown();
864      HBASE_TESTING_UTILITY.cleanupTestDir();
865    }
866  }
867
868  @TestTemplate
869  public void testBlockAdditionWaitWhenCache() throws Exception {
870    BucketCache bucketCache = null;
871    try {
872      final Path dataTestDir = createAndGetTestDir();
873
874      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
875      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
876
877      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
878      config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);
879
880      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
881        constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config);
882      assertTrue(bucketCache.waitForCacheInitialization(10000));
883      long usedByteSize = bucketCache.getAllocator().getUsedSize();
884      assertEquals(0, usedByteSize);
885
886      HFileBlockPair[] hfileBlockPairs =
887        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
888      String[] names = CacheTestUtils.getHFileNames(hfileBlockPairs);
889      // Add blocks
890      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
891        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
892          true);
893      }
894
895      // Max wait for 10 seconds.
896      long timeout = 10000;
897      // Wait for blocks size to match the number of blocks.
898      while (bucketCache.backingMap.size() != 10) {
899        if (timeout <= 0) break;
900        Threads.sleep(100);
901        timeout -= 100;
902      }
903      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
904        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
905      }
906      usedByteSize = bucketCache.getAllocator().getUsedSize();
907      assertNotEquals(0, usedByteSize);
908      // persist cache to file
909      bucketCache.shutdown();
910      assertTrue(new File(persistencePath).exists());
911
912      // restore cache from file
913      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
914        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
915      assertTrue(bucketCache.waitForCacheInitialization(10000));
916      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
917      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(hfileBlockPairs, names);
918      for (BlockCacheKey key : newKeys) {
919        bucketCache.evictBlock(key);
920      }
921      assertEquals(0, bucketCache.getAllocator().getUsedSize());
922      assertEquals(0, bucketCache.backingMap.size());
923    } finally {
924      if (bucketCache != null) {
925        bucketCache.shutdown();
926      }
927      HBASE_TESTING_UTILITY.cleanupTestDir();
928    }
929  }
930
931  @TestTemplate
932  public void testOnConfigurationChange() throws Exception {
933    BucketCache bucketCache = null;
934    try {
935      final Path dataTestDir = createAndGetTestDir();
936
937      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
938
939      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
940
941      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
942        constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config);
943
944      assertTrue(bucketCache.waitForCacheInitialization(10000));
945
946      config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
947      config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f);
948      config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f);
949      config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f);
950      config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f);
951      config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
952      config.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
953      config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500);
954      config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000);
955
956      bucketCache.onConfigurationChange(config);
957
958      assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01);
959      assertEquals(0.8f, bucketCache.getMinFactor(), 0.01);
960      assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01);
961      assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01);
962      assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01);
963      assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01);
964      assertEquals(100L, bucketCache.getQueueAdditionWaitTime());
965      assertEquals(500L, bucketCache.getBucketcachePersistInterval());
966      assertEquals(1000L, bucketCache.getPersistenceChunkSize());
967
968    } finally {
969      if (bucketCache != null) {
970        bucketCache.shutdown();
971      }
972      HBASE_TESTING_UTILITY.cleanupTestDir();
973    }
974  }
975
976  @TestTemplate
977  public void testNotifyFileCachingCompletedSuccess() throws Exception {
978    BucketCache bucketCache = null;
979    try {
980      Path filePath =
981        new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
982      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false);
983      if (bucketCache.getStats().getFailedInserts() > 0) {
984        LOG.info("There were {} fail inserts, "
985          + "will assert if total blocks in backingMap equals (10 - failInserts) "
986          + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
987        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
988        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
989      } else {
990        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
991      }
992    } finally {
993      if (bucketCache != null) {
994        bucketCache.shutdown();
995      }
996      HBASE_TESTING_UTILITY.cleanupTestDir();
997    }
998  }
999
1000  @TestTemplate
1001  public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception {
1002    BucketCache bucketCache = null;
1003    try {
1004      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
1005        "testNotifyFileCachingCompletedForEncodedDataSuccess");
1006      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true);
1007      if (bucketCache.getStats().getFailedInserts() > 0) {
1008        LOG.info("There were {} fail inserts, "
1009          + "will assert if total blocks in backingMap equals (10 - failInserts) "
1010          + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
1011        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
1012        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
1013      } else {
1014        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
1015      }
1016    } finally {
1017      if (bucketCache != null) {
1018        bucketCache.shutdown();
1019      }
1020      HBASE_TESTING_UTILITY.cleanupTestDir();
1021    }
1022  }
1023
1024  @TestTemplate
1025  public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
1026    BucketCache bucketCache = null;
1027    try {
1028      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
1029        "testNotifyFileCachingCompletedNotAllCached");
1030      // Deliberately passing more blocks than we have created to test that
1031      // notifyFileCachingCompleted will not consider the file fully cached
1032      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false);
1033      assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
1034    } finally {
1035      if (bucketCache != null) {
1036        bucketCache.shutdown();
1037      }
1038      HBASE_TESTING_UTILITY.cleanupTestDir();
1039    }
1040  }
1041
1042  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
1043    int totalBlocksToCheck, boolean encoded) throws Exception {
1044    final Path dataTestDir = createAndGetTestDir();
1045    String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
1046    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
1047      constructedBlockSizes, 1, 1, null);
1048    assertTrue(bucketCache.waitForCacheInitialization(10000));
1049    long usedByteSize = bucketCache.getAllocator().getUsedSize();
1050    assertEquals(0, usedByteSize);
1051    HFileBlockPair[] hfileBlockPairs =
1052      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded);
1053    // Add blocks
1054    for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
1055      bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
1056    }
1057    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
1058      totalBlocksToCheck * constructedBlockSize);
1059    return bucketCache;
1060  }
1061
1062  @TestTemplate
1063  public void testEvictOrphansOutOfGracePeriod() throws Exception {
1064    BucketCache bucketCache = testEvictOrphans(0);
1065    assertEquals(10, bucketCache.getBackingMap().size());
1066    assertEquals(0, bucketCache.blocksByHFile.stream()
1067      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
1068  }
1069
1070  @TestTemplate
1071  public void testEvictOrphansWithinGracePeriod() throws Exception {
1072    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
1073    assertEquals(18, bucketCache.getBackingMap().size());
1074    assertTrue(bucketCache.blocksByHFile.stream()
1075      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
1076  }
1077
  /**
   * Caches 10 blocks for a file known to an online region ("valid") and 10 for a file no
   * region references ("orphan"), then triggers freeSpace so orphan eviction can kick in.
   * @param orphanEvictionGracePeriod value for {@code BLOCK_ORPHAN_GRACE_PERIOD}; orphans
   *          newer than this are presumably spared by freeSpace — callers assert both ways
   * @return the cache after the second freeSpace pass, still running
   */
  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
    // Mock a region -> store -> store-file chain that references only validFile, so blocks
    // of orphanFile have no owning store file from the cache's point of view.
    Map<String, HRegion> onlineRegions = new HashMap<>();
    List<HStore> stores = new ArrayList<>();
    Collection<HStoreFile> storeFiles = new ArrayList<>();
    HRegion mockedRegion = mock(HRegion.class);
    HStore mockedStore = mock(HStore.class);
    HStoreFile mockedStoreFile = mock(HStoreFile.class);
    when(mockedStoreFile.getPath()).thenReturn(validFile);
    storeFiles.add(mockedStoreFile);
    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
    stores.add(mockedStore);
    when(mockedRegion.getStores()).thenReturn(stores);
    onlineRegions.put("mocked_region", mockedRegion);
    // Factors tuned so the cache is nearly full after 20 blocks, forcing freeSpace to act.
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
      orphanEvictionGracePeriod);
    // Capacity sized for 21 buckets of (constructedBlockSize + 1024) each.
    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
    HFileBlockPair[] validBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile, false);
    HFileBlockPair[] orphanBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile, false);
    for (HFileBlockPair pair : validBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(10, bucketCache.getBackingMap().size());
    // freeSpace with only valid blocks present must evict nothing.
    bucketCache.freeSpace("test");
    assertEquals(10, bucketCache.getBackingMap().size());
    for (HFileBlockPair pair : orphanBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(20, bucketCache.getBackingMap().size());
    // Second pass may now evict orphan blocks, subject to the configured grace period.
    bucketCache.freeSpace("test");
    return bucketCache;
  }
1120
1121  @TestTemplate
1122  public void testBlockPriority() throws Exception {
1123    HFileBlockPair block = CacheTestUtils.generateHFileBlocks(BLOCK_SIZE, 1)[0];
1124    cacheAndWaitUntilFlushedToBucket(cache, block.getBlockName(), block.getBlock(), true);
1125    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.SINGLE);
1126    cache.getBlock(block.getBlockName(), true, false, true);
1127    assertEquals(cache.backingMap.get(block.getBlockName()).getPriority(), BlockPriority.MULTI);
1128  }
1129
1130  @TestTemplate
1131  public void testIOTimePerHitReturnsZeroWhenNoHits()
1132    throws NoSuchFieldException, IllegalAccessException {
1133    CacheStats cacheStats = cache.getStats();
1134    assertTrue(cacheStats instanceof BucketCacheStats);
1135    BucketCacheStats bucketCacheStats = (BucketCacheStats) cacheStats;
1136
1137    Field field = BucketCacheStats.class.getDeclaredField("ioHitCount");
1138    field.setAccessible(true);
1139    LongAdder ioHitCount = (LongAdder) field.get(bucketCacheStats);
1140
1141    assertEquals(0, ioHitCount.sum());
1142    double ioTimePerHit = bucketCacheStats.getIOTimePerHit();
1143    assertEquals(0, ioTimePerHit, 0.0);
1144  }
1145}