/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
038import org.apache.hadoop.hbase.testclassification.SmallTests; 039import org.junit.ClassRule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042 043/** 044 * Basic test for check file's integrity before start BucketCache in fileIOEngine 045 */ 046@Category(SmallTests.class) 047public class TestRecoveryPersistentBucketCache { 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestRecoveryPersistentBucketCache.class); 051 052 final long capacitySize = 32 * 1024 * 1024; 053 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 054 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 055 056 @Test 057 public void testBucketCacheRecovery() throws Exception { 058 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 059 Path testDir = TEST_UTIL.getDataTestDir(); 060 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 061 Configuration conf = HBaseConfiguration.create(); 062 // Disables the persister thread by setting its interval to MAX_VALUE 063 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 064 int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; 065 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 066 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 067 DEFAULT_ERROR_TOLERATION_DURATION, conf); 068 assertTrue(bucketCache.waitForCacheInitialization(1000)); 069 assertTrue( 070 bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled()); 071 072 CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); 073 String[] names = CacheTestUtils.getHFileNames(blocks); 074 075 CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1); 076 String[] smallerNames = CacheTestUtils.getHFileNames(smallerBlocks); 077 // Add four blocks 078 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), 
blocks[0].getBlock()); 079 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 080 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 081 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 082 // saves the current state of the cache 083 bucketCache.persistToFile(); 084 // evicts the 4th block 085 bucketCache.evictBlock(blocks[3].getBlockName()); 086 // now adds a 5th block to bucket cache. This block is half the size of the previous 087 // blocks, and it will be added in the same offset of the previous evicted block. 088 // This overwrites part of the 4th block. Because we persisted only up to the 089 // 4th block addition, recovery would try to read the whole 4th block, but the cached time 090 // validation will fail, and we'll recover only the first three blocks 091 cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(), 092 smallerBlocks[0].getBlock()); 093 094 // Creates new bucket cache instance without persisting to file after evicting 4th block 095 // and caching 5th block. Here the cache file has the first three blocks, followed by the 096 // 5th block and the second half of 4th block (we evicted 4th block, freeing up its 097 // offset in the cache, then added 5th block which is half the size of other blocks, so it's 098 // going to override the first half of the 4th block in the cache). That's fine because 099 // the in-memory backing map has the right blocks and related offsets. However, the 100 // persistent map file only has information about the first four blocks. We validate the 101 // cache time recorded in the back map against the block data in the cache. This is recorded 102 // in the cache as the first 8 bytes of a block, so the 4th block had its first 8 blocks 103 // now overridden by the 5th block, causing this check to fail and removal of 104 // the 4th block from the backing map. 
105 BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 106 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 107 DEFAULT_ERROR_TOLERATION_DURATION, conf); 108 assertTrue(newBucketCache.waitForCacheInitialization(1000)); 109 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 110 BlockCacheKey[] newKeysSmaller = CacheTestUtils.regenerateKeys(smallerBlocks, smallerNames); 111 // The new bucket cache would have only the first three blocks. Although we have persisted the 112 // the cache state when it had the first four blocks, the 4th block was evicted and then we 113 // added a 5th block, which overrides part of the 4th block in the cache. This would cause a 114 // checksum failure for this block offset, when we try to read from the cache, and we would 115 // consider that block as invalid and its offset available in the cache. 116 assertNull(newBucketCache.getBlock(newKeys[3], false, false, false)); 117 assertNull(newBucketCache.getBlock(newKeysSmaller[0], false, false, false)); 118 assertEquals(blocks[0].getBlock(), newBucketCache.getBlock(newKeys[0], false, false, false)); 119 assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false)); 120 assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false)); 121 TEST_UTIL.cleanupTestDir(); 122 } 123 124 @Test 125 public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { 126 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 127 Path testDir = TEST_UTIL.getDataTestDir(); 128 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 129 Configuration conf = HBaseConfiguration.create(); 130 // Disables the persister thread by setting its interval to MAX_VALUE 131 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 132 int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; 133 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize, 134 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 135 DEFAULT_ERROR_TOLERATION_DURATION, conf); 136 assertTrue(bucketCache.waitForCacheInitialization(10000)); 137 138 CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); 139 140 // Add four blocks 141 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 142 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 143 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 144 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 145 146 String firstFileName = blocks[0].getBlockName().getHfileName(); 147 148 // saves the current state of the cache 149 bucketCache.persistToFile(); 150 151 BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 152 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 153 DEFAULT_ERROR_TOLERATION_DURATION, conf); 154 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 155 assertEquals(4, newBucketCache.backingMap.size()); 156 157 newBucketCache.evictBlocksByHfileName(firstFileName); 158 assertEquals(3, newBucketCache.backingMap.size()); 159 TEST_UTIL.cleanupTestDir(); 160 } 161 162 @Test 163 public void testValidateCacheInitialization() throws Exception { 164 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 165 Path testDir = TEST_UTIL.getDataTestDir(); 166 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 167 Configuration conf = HBaseConfiguration.create(); 168 // Disables the persister thread by setting its interval to MAX_VALUE 169 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 170 int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; 171 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 172 8192, bucketSizes, 
writeThreads, writerQLen, testDir + "/bucket.persistence", 173 DEFAULT_ERROR_TOLERATION_DURATION, conf); 174 assertTrue(bucketCache.waitForCacheInitialization(10000)); 175 176 CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); 177 178 // Add four blocks 179 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 180 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 181 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 182 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 183 // saves the current state of the cache 184 bucketCache.persistToFile(); 185 186 BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 187 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 188 DEFAULT_ERROR_TOLERATION_DURATION, conf); 189 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 190 191 // Set the state of bucket cache to INITIALIZING 192 newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING); 193 194 // Validate that zero values are returned for the cache being initialized. 
195 assertEquals(0, newBucketCache.acceptableSize()); 196 assertEquals(0, newBucketCache.getPartitionSize(1)); 197 assertEquals(0, newBucketCache.getFreeSize()); 198 assertEquals(0, newBucketCache.getCurrentSize()); 199 assertEquals(false, newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get()); 200 201 newBucketCache.setCacheState(BucketCache.CacheState.ENABLED); 202 203 // Validate that non-zero values are returned for enabled cache 204 assertTrue(newBucketCache.acceptableSize() > 0); 205 assertTrue(newBucketCache.getPartitionSize(1) > 0); 206 assertTrue(newBucketCache.getFreeSize() > 0); 207 assertTrue(newBucketCache.getCurrentSize() > 0); 208 assertTrue(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get()); 209 210 TEST_UTIL.cleanupTestDir(); 211 } 212 213 @Test 214 public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception { 215 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 216 Path testDir = TEST_UTIL.getDataTestDir(); 217 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 218 Configuration conf = HBaseConfiguration.create(); 219 // Disables the persister thread by setting its interval to MAX_VALUE 220 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 221 conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99); 222 conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1); 223 conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01); 224 int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; 225 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192, 226 bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 227 DEFAULT_ERROR_TOLERATION_DURATION, conf); 228 assertTrue(bucketCache.waitForCacheInitialization(1000)); 229 assertTrue( 230 bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled()); 231 232 CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5); 233 String[] names = 
CacheTestUtils.getHFileNames(blocks); 234 235 // Add four blocks 236 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 237 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 238 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 239 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 240 241 // creates a entry for a 5th block with the same cache offset of the 1st block. Just add it 242 // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency 243 BucketEntry bucketEntry = 244 new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(), 245 blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(), 246 0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator()); 247 bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer()); 248 bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry); 249 250 // saves the current state of the cache: 5 blocks in the map, but we only have cached 4. The 251 // 5th block has same cache offset as the first 252 bucketCache.persistToFile(); 253 254 BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 255 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 256 DEFAULT_ERROR_TOLERATION_DURATION, conf); 257 while (!newBucketCache.getBackingMapValidated().get()) { 258 Thread.sleep(10); 259 } 260 261 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 262 263 assertNull(newBucketCache.getBlock(newKeys[4], false, false, false)); 264 // The backing map entry with key blocks[0].getBlockName() for the may point to a valid entry 265 // or null based on different ordering of the keys in the backing map. 266 // Hence, skipping the check for that key. 
267 assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false)); 268 assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false)); 269 assertEquals(blocks[3].getBlock(), newBucketCache.getBlock(newKeys[3], false, false, false)); 270 assertEquals(4, newBucketCache.backingMap.size()); 271 TEST_UTIL.cleanupTestDir(); 272 } 273 274 private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 275 throws InterruptedException { 276 Waiter.waitFor(HBaseConfiguration.create(), 12000, 277 () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); 278 } 279 280 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 281 // threads will flush it to the bucket and put reference entry in backingMap. 282 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 283 Cacheable block) throws InterruptedException { 284 cache.cacheBlock(cacheKey, block); 285 waitUntilFlushedToBucket(cache, cacheKey); 286 } 287 288}