/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
/**
 * Basic test that verifies the cache file's integrity before starting a BucketCache backed by a
 * file IOEngine.
 */
@Tag(SmallTests.TAG)
@Tag(RegionServerTests.TAG)
public class TestRecoveryPersistentBucketCache {

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;

  @Test
  public void testBucketCacheRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
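    // A single bucket size of 9KB (8KB + 1KB), presumably sized so one 8KB test block plus its
    // on-disk overhead fits in a bucket.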
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
    String[] names = CacheTestUtils.getHFileNames(blocks);

    CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1);
    String[] smallerNames = CacheTestUtils.getHFileNames(smallerBlocks);
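    // The recorded HFile names let the test rebuild equivalent cache keys for lookups against
    // the recovered cache instance further below.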
    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();
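    // From here on, the persisted file describes these four blocks only; later mutations to the
    // cache are not reflected in it, which is the inconsistency recovery has to cope with below.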
    // evicts the 4th block
    bucketCache.evictBlock(blocks[3].getBlockName());
    // now adds a 5th block to the bucket cache. This block is half the size of the previous
    // blocks, and it will be added at the same offset as the previously evicted block,
    // overwriting part of the 4th block. Because we persisted only up to the 4th block
    // addition, recovery would try to read the whole 4th block, but the cached time validation
    // will fail, and we'll recover only the first three blocks.
    cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(),
      smallerBlocks[0].getBlock());

    // Creates a new bucket cache instance without persisting to file after evicting the 4th
    // block and caching the 5th block. Here the cache file has the first three blocks, followed
    // by the 5th block and the second half of the 4th block (we evicted the 4th block, freeing
    // up its offset in the cache, then added the 5th block, which is half the size of the other
    // blocks, so it overwrites the first half of the 4th block in the cache). That's fine
    // because the in-memory backing map has the right blocks and related offsets. However, the
    // persistent map file only has information about the first four blocks. We validate the
    // cached time recorded in the backing map against the block data in the cache. This is
    // recorded in the cache as the first 8 bytes of a block, so the 4th block had its first
    // 8 bytes overwritten by the 5th block, causing this check to fail and the removal of
    // the 4th block from the backing map.
    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(1000));
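    // Initialization of this instance includes reading the persisted backing map back from
    // bucket.persistence and validating each entry against the contents of the cache file.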
    BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
    BlockCacheKey[] newKeysSmaller = CacheTestUtils.regenerateKeys(smallerBlocks, smallerNames);
    // The new bucket cache should have only the first three blocks. Although we persisted the
    // cache state when it had the first four blocks, the 4th block was evicted and then we
    // added a 5th block, which overwrites part of the 4th block in the cache. This causes a
    // checksum failure for this block offset when we try to read from the cache, so we consider
    // that block invalid and mark its offset as available in the cache.
    assertNull(newBucketCache.getBlock(newKeys[3], false, false, false));
    assertNull(newBucketCache.getBlock(newKeysSmaller[0], false, false, false));
    assertEquals(blocks[0].getBlock(), newBucketCache.getBlock(newKeys[0], false, false, false));
    assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
    assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());

    String firstFileName = blocks[0].getBlockName().getHfileName();

    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));
    assertEquals(4, newBucketCache.backingMap.size());

    newBucketCache.evictBlocksByHfileName(firstFileName);
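    // Only one of the four generated blocks belongs to firstFileName, so the eviction should
    // remove exactly one entry from the recovered backing map.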
    assertEquals(3, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testValidateCacheInitialization() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));

    // Set the state of bucket cache to INITIALIZING
    newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING);

    // Validate that zero values are returned for the cache being initialized.
    assertEquals(0, newBucketCache.acceptableSize());
    assertEquals(0, newBucketCache.getPartitionSize(1));
    assertEquals(0, newBucketCache.getFreeSize());
    assertEquals(0, newBucketCache.getCurrentSize());
    assertFalse(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());
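    // With every size accessor reporting zero while INITIALIZING, no block can be considered to
    // fit into the cache.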

    newBucketCache.setCacheState(BucketCache.CacheState.ENABLED);

    // Validate that non-zero values are returned for the enabled cache
    assertTrue(newBucketCache.acceptableSize() > 0);
    assertTrue(newBucketCache.getPartitionSize(1) > 0);
    assertTrue(newBucketCache.getFreeSize() > 0);
    assertTrue(newBucketCache.getCurrentSize() > 0);
    assertTrue(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());

    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
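    // The 36KB cache below holds exactly four 9KB buckets; the factors above are set so the
    // eviction thread leaves the fully occupied cache alone, presumably to keep the fabricated
    // layout stable for the duration of the test.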
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecoveryWithAllocationInconsistencies")
        && bucketCache.isCacheEnabled());

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
    String[] names = CacheTestUtils.getHFileNames(blocks);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // Creates an entry for a 5th block with the same cache offset as the 1st block. Adds it
    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency.
    BucketEntry bucketEntry =
      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);

    // saves the current state of the cache: 5 blocks in the map, but only 4 were actually
    // cached. The 5th block has the same cache offset as the first.
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    while (!newBucketCache.getBackingMapValidated().get()) {
      Thread.sleep(10);
    }
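    // getBackingMapValidated() flips to true once recovery has finished checking every persisted
    // entry against the cache file, so past this point the backing map is in its final state.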

    BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);

    assertNull(newBucketCache.getBlock(newKeys[4], false, false, false));
    // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or to
    // null, depending on the ordering of the keys in the backing map. Hence, we skip the check
    // for that key.
    assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
    assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
    assertEquals(blocks[3].getBlock(), newBucketCache.getBlock(newKeys[3], false, false, false));
    assertEquals(4, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

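  // Waits up to 12 seconds for the writer threads to move the block out of the ramCache and
  // register it in the backingMap, i.e. until it has been written to the cache file.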
  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 12000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and the writeQueue,
  // then the writer threads flush it to the bucket and put a reference entry in the backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

}