/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Basic test of BucketCache. Puts and gets.
 * <p>
 * Tests ensure data correctness of blocks under concurrent access by several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, MediumTests.class })
public class TestBucketCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBucketCache.class);

  private static final Random RAND = new Random();

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {
        { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
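        // Each bucket size below is a round base size plus 1 KB, presumably to leave headroom
        // for per-block overhead on top of the nominal block size (our reading, not documented).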
        {
            16 * 1024,
            new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
                28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
                128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
        int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
          persistencePath);
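      // Make cacheBlock wait when the write queue is full instead of silently dropping the
      // block, keeping the mocked cache deterministic for these tests (our reading of the
      // wait_when_cache flag).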
      super.wait_when_cache = true;
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, null);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Test utility to create the test dir and return its path.
   *
   * @return path of the created dir
   * @throws IOException if the dir cannot be created
   */
  private Path createAndGetTestDir() throws IOException {
    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
    return testDir;
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(RAND.nextInt(a.size()));
  }

  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Long> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize. Continue selecting block sizes
    // until the cache is completely filled.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(mAllocator.allocateBlock(blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) {
          full = true;
        }
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
    }

    for (long offset : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
      throws InterruptedException {
    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
      Thread.sleep(100);
    }
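    // Extra settle time so any in-flight writer-thread bookkeeping finishes before callers
    // assert on cache state (an assumption about why the fixed sleep is needed here).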
    Thread.sleep(1000);
  }

  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  // BucketCache.cacheBlock is asynchronous: it first adds the block to the ramCache and the
  // write queue, then writer threads flush it to the bucket and put a reference entry in the
  // backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
      Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
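    // While the evict thread is blocked on the offset lock, simulate a concurrent eviction by
    // removing the entry ourselves, then re-cache a new block under the same key. Once the lock
    // is released the evict thread completes; block count and current size must both return to
    // zero, i.e. nothing is leaked by the racing eviction.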
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    assertEquals(0, cache.getBlockCount());
    assertEquals(0L, cache.getCurrentSize());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    Path testDir = createAndGetTestDir();
    String ioEngineName = "file:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());
    HBASE_TESTING_UTILITY.cleanupTestDir();
  }

  @Test
  public void testRetrieveFromMMap() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
  }

  @Test
  public void testRetrieveFromPMem() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());
    HBASE_TESTING_UTILITY.cleanupTestDir();
  }

  private void testRetrievalUtils(Path testDir, String ioEngineName)
          throws IOException, InterruptedException {
    final String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, persistencePath);
    try {
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
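      // Restarting the cache from the persistence file should restore the allocator state; the
      // file itself is consumed (deleted) once it has been read successfully.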
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
              constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertFalse(new File(persistencePath).exists());
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
    }
    assertTrue(new File(persistencePath).exists());
  }

  @Test
  public void testRetrieveUnsupportedIOE() throws Exception {
    try {
      final Path testDir = createAndGetTestDir();
      final String ioEngineName = testDir + "/bucket.cache";
      testRetrievalUtils(testDir, ioEngineName);
      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " +
              "files:, mmap: or offheap", e.getMessage());
    }
  }

  @Test
  public void testRetrieveFromMultipleFiles() throws Exception {
    final Path testDirInitial = createAndGetTestDir();
    final Path newTestDir = new HBaseTestingUtility().getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
    String ioEngineName = new StringBuilder("files:").append(testDirInitial)
            .append("/bucket1.cache").append(FileIOEngine.FILE_DELIMITER).append(newTestDir)
            .append("/bucket2.cache").toString();
    testRetrievalUtils(testDirInitial, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDirInitial + "/bucket.persistence";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            smallBucketSizes, writeThreads, writerQLen, persistencePath);
    assertFalse(new File(persistencePath).exists());
    assertEquals(0, bucketCache.getAllocator().getUsedSize());
    assertEquals(0, bucketCache.backingMap.size());
    HBASE_TESTING_UTILITY.cleanupTestDir();
  }

  @Test
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, null);
    try {
      final Path testDir = createAndGetTestDir();
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
              constructedBlockSizes, writeThreads, writerQLen, null);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
        BucketCache.DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testCacheSizeCapacity() throws IOException {
    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
            BucketCache.DEFAULT_MIN_FACTOR);
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
    try {
      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
              writerQLen, null, 100, conf);
      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
    }
  }

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

    assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
        cache.getAcceptableFactor(), 0);
    assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
        cache.getMinFactor(), 0);
    assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
        cache.getExtraFreeFactor(), 0);
    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
        cache.getSingleFactor(), 0);
    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
        cache.getMultiFactor(), 0);
    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
        cache.getMemoryFactor(), 0);
  }

  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings =
        ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to < 0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings =
        ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to < 0, in expected range, in expected range, config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
        ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // All configs add up to 1.0 and are between 0 and 1.0; configs don't add to 1.0;
    // configs can't be negative; configs don't add to 1.0
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String, float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME,
        singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
      boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
            constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
            + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
            + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      }
    }
  }

  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
      float minFactor) {
    long expectedOutput =
        (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }

  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the value isn't ensured to be
    // positive. See HBASE-18757 for more information.
    long testValue = 549888460800L;
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true);
    assertEquals(testValue, bucketEntry.offset());
  }

  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
        ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
        ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }

  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
        HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
        HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
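    // RAMCache retains a block on a successful putIfAbsent (refCnt rises to 2) and releases it
    // on remove/clear (refCnt drops back to 1); a rejected putIfAbsent must not retain.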

    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }

  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // Initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
        HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // Initialize a mocked IOEngine whose write calls always fail.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    Mockito.when(ioEngine.usesSharedMemory()).thenReturn(false);
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // Create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);
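    // writeToCache must release any bucket space it allocated when the IOEngine write fails,
    // leaving the allocator's used size unchanged (hence the zero-size asserts below).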

    Assert.assertEquals(0, allocator.getUsedSize());
    try {
      re.writeToCache(ioEngine, allocator, null);
      Assert.fail();
    } catch (Exception e) {
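      // Expected: the mocked IOEngine throws on write and writeToCache surfaces the failure.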
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }
}