001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNull;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
031import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
032import org.apache.hadoop.hbase.io.hfile.Cacheable;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037
038/**
039 * Basic test for check file's integrity before start BucketCache in fileIOEngine
040 */
041@Category(SmallTests.class)
042public class TestRecoveryPersistentBucketCache {
043  @ClassRule
044  public static final HBaseClassTestRule CLASS_RULE =
045    HBaseClassTestRule.forClass(TestRecoveryPersistentBucketCache.class);
046
047  final long capacitySize = 32 * 1024 * 1024;
048  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
049  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
050
051  @Test
052  public void testBucketCacheRecovery() throws Exception {
053    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
054    Path testDir = TEST_UTIL.getDataTestDir();
055    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
056    Configuration conf = HBaseConfiguration.create();
057    // Disables the persister thread by setting its interval to MAX_VALUE
058    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
059    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
060    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
061      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
062      DEFAULT_ERROR_TOLERATION_DURATION, conf);
063
064    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
065
066    CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1);
067    // Add four blocks
068    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
069    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
070    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
071    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
072    // saves the current state of the cache
073    bucketCache.persistToFile();
074    // evicts the 4th block
075    bucketCache.evictBlock(blocks[3].getBlockName());
076    // now adds a 5th block to bucket cache. This block is half the size of the previous
077    // blocks, and it will be added in the same offset of the previous evicted block.
078    // This overwrites part of the 4th block. Because we persisted only up to the
079    // 4th block addition, recovery would try to read the whole 4th block, but the cached time
080    // validation will fail, and we'll recover only the first three blocks
081    cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(),
082      smallerBlocks[0].getBlock());
083
084    // Creates new bucket cache instance without persisting to file after evicting 4th block
085    // and caching 5th block. Here the cache file has the first three blocks, followed by the
086    // 5th block and the second half of 4th block (we evicted 4th block, freeing up its
087    // offset in the cache, then added 5th block which is half the size of other blocks, so it's
088    // going to override the first half of the 4th block in the cache). That's fine because
089    // the in-memory backing map has the right blocks and related offsets. However, the
090    // persistent map file only has information about the first four blocks. We validate the
091    // cache time recorded in the back map against the block data in the cache. This is recorded
092    // in the cache as the first 8 bytes of a block, so the 4th block had its first 8 blocks
093    // now overridden by the 5th block, causing this check to fail and removal of
094    // the 4th block from the backing map.
095    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
096      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
097      DEFAULT_ERROR_TOLERATION_DURATION, conf);
098    Thread.sleep(100);
099    assertEquals(3, newBucketCache.backingMap.size());
100    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
101    assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false));
102    assertEquals(blocks[0].getBlock(),
103      newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
104    assertEquals(blocks[1].getBlock(),
105      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
106    assertEquals(blocks[2].getBlock(),
107      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
108    TEST_UTIL.cleanupTestDir();
109  }
110
111  @Test
112  public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
113    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
114    Path testDir = TEST_UTIL.getDataTestDir();
115    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
116    Configuration conf = HBaseConfiguration.create();
117    // Disables the persister thread by setting its interval to MAX_VALUE
118    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
119    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
120    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
121      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
122      DEFAULT_ERROR_TOLERATION_DURATION, conf);
123
124    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
125
126    // Add four blocks
127    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
128    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
129    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
130    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
131    // saves the current state of the cache
132    bucketCache.persistToFile();
133
134    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
135      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
136      DEFAULT_ERROR_TOLERATION_DURATION, conf);
137    Thread.sleep(100);
138    assertEquals(4, newBucketCache.backingMap.size());
139    newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName());
140    assertEquals(3, newBucketCache.backingMap.size());
141    TEST_UTIL.cleanupTestDir();
142  }
143
144  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
145    throws InterruptedException {
146    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
147      Thread.sleep(100);
148    }
149  }
150
151  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
152  // threads will flush it to the bucket and put reference entry in backingMap.
153  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
154    Cacheable block) throws InterruptedException {
155    cache.cacheBlock(cacheKey, block);
156    waitUntilFlushedToBucket(cache, cacheKey);
157  }
158
159}