/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Basic tests validating the cache file's integrity when starting a BucketCache that uses a file
 * IOEngine and recovers its state from a persistence file.
 */
@Category(SmallTests.class)
public class TestRecoveryPersistentBucketCache {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRecoveryPersistentBucketCache.class);

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;

  @Test
  public void testBucketCacheRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
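    // Use a single 9KB bucket size, so each 8KB test block (plus any extra serialized
    // metadata) fits in exactly one bucket.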
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
    String[] names = CacheTestUtils.getHFileNames(blocks);

    CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1);
    String[] smallerNames = CacheTestUtils.getHFileNames(smallerBlocks);
    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();
    // evicts the 4th block
    bucketCache.evictBlock(blocks[3].getBlockName());
    // now adds a 5th block to the bucket cache. This block is half the size of the previous
    // blocks and will be written at the same offset as the previously evicted block, so it
    // overwrites part of the 4th block. Because we persisted the state only up to the 4th block
    // addition, recovery will try to read the whole 4th block, but the cached time validation
    // will fail, and we'll recover only the first three blocks.
    cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(),
      smallerBlocks[0].getBlock());

    // Creates a new bucket cache instance without persisting to file again after evicting the
    // 4th block and caching the 5th block. At this point the cache file has the first three
    // blocks, followed by the 5th block and the second half of the 4th block (evicting the 4th
    // block freed up its offset in the cache, and the 5th block, being half the size of the
    // other blocks, overwrote the first half of the 4th block). That's fine because the
    // in-memory backing map has the right blocks and related offsets. However, the persistent
    // map file only has information about the first four blocks. On recovery we validate the
    // cached time recorded in the backing map against the block data in the cache. The cached
    // time is recorded in the cache as the first 8 bytes of a block, so the 4th block's first
    // 8 bytes are now overwritten by the 5th block, causing this check to fail and the 4th
    // block to be removed from the backing map.
    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(1000));
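    // Rebuild the cache keys for the same blocks, so the lookups below hit the recovered
    // backing map with fresh key instances, as a reader would after a restart.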
    BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
    BlockCacheKey[] newKeysSmaller = CacheTestUtils.regenerateKeys(smallerBlocks, smallerNames);
    // The new bucket cache should have only the first three blocks. Although we persisted the
    // cache state when it had the first four blocks, the 4th block was evicted and then we added
    // a 5th block, which overwrote part of the 4th block in the cache. This causes a cached time
    // validation failure for that block offset when we try to read it from the cache, so the
    // block is considered invalid and its offset is made available in the cache again.
    assertNull(newBucketCache.getBlock(newKeys[3], false, false, false));
    assertNull(newBucketCache.getBlock(newKeysSmaller[0], false, false, false));
    assertEquals(blocks[0].getBlock(), newBucketCache.getBlock(newKeys[0], false, false, false));
    assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
    assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());

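    // Remember the hfile name of the first block, so we can evict all blocks of that file by
    // name after the cache is recovered from the persistence file.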
    String firstFileName = blocks[0].getBlockName().getHfileName();

    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));
    assertEquals(4, newBucketCache.backingMap.size());

    newBucketCache.evictBlocksByHfileName(firstFileName);
    assertEquals(3, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testValidateCacheInitialization() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));

    // Set the state of bucket cache to INITIALIZING
    newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING);

    // Validate that zero values are returned for the cache being initialized.
    assertEquals(0, newBucketCache.acceptableSize());
    assertEquals(0, newBucketCache.getPartitionSize(1));
    assertEquals(0, newBucketCache.getFreeSize());
    assertEquals(0, newBucketCache.getCurrentSize());
    assertEquals(false, newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());

    newBucketCache.setCacheState(BucketCache.CacheState.ENABLED);

    // Validate that non-zero values are returned once the cache is enabled.
    assertTrue(newBucketCache.acceptableSize() > 0);
    assertTrue(newBucketCache.getPartitionSize(1) > 0);
    assertTrue(newBucketCache.getFreeSize() > 0);
    assertTrue(newBucketCache.getCurrentSize() > 0);
    assertTrue(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());

    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
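    // Relax the occupancy thresholds so this tiny (36KB) cache can be filled almost completely
    // by the four test blocks without the free-space eviction kicking in.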
    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecoveryWithAllocationInconsistencies")
        && bucketCache.isCacheEnabled());

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
    String[] names = CacheTestUtils.getHFileNames(blocks);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());

    // creates an entry for a 5th block with the same cache offset as the 1st block. Just add it
    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency.
    BucketEntry bucketEntry =
      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);

    // saves the current state of the cache: 5 blocks in the map, but we have cached only 4. The
    // 5th block has the same cache offset as the first
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
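    // Recovery validates the persisted backing map asynchronously, so spin until that
    // validation has finished before asserting on the recovered contents.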
    while (!newBucketCache.getBackingMapValidated().get()) {
      Thread.sleep(10);
    }

    BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);

    assertNull(newBucketCache.getBlock(newKeys[4], false, false, false));
    // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or to
    // null, depending on the ordering of the keys in the backing map, so we skip the check for
    // that key.
    assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
    assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
    assertEquals(blocks[3].getBlock(), newBucketCache.getBlock(newKeys[3], false, false, false));
    assertEquals(4, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

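  // Blocks until the writer threads have drained the given key from the ramCache and installed
  // its entry in the backingMap, i.e. the block has been fully written to the IOEngine.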
  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 12000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and writeQueue, then
  // the writer threads flush it to the bucket and put a reference entry in the backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

}