/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Basic test for checking the cache file's integrity before starting BucketCache with the file
 * IOEngine.
 */
@Tag(SmallTests.TAG)
@Tag(RegionServerTests.TAG)
public class TestRecoveryPersistentBucketCache {

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;

  @Test
  public void testBucketCacheRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());

    CacheTestUtils.HFileBlockPair[] blocks =
      CacheTestUtils.generateHFileBlocks(8192, 4);
    String[] names = CacheTestUtils.getHFileNames(blocks);

    CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1);
    String[] smallerNames = CacheTestUtils.getHFileNames(smallerBlocks);
    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();
    // evicts the 4th block
    bucketCache.evictBlock(blocks[3].getBlockName());
    // now adds a 5th block to the bucket cache. This block is half the size of the previous
    // blocks, and it will be added at the same offset as the previously evicted block.
    // This overwrites part of the 4th block. Because we persisted only up to the
    // 4th block addition, recovery would try to read the whole 4th block, but the cached time
    // validation will fail, and we'll recover only the first three blocks.
    cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(),
      smallerBlocks[0].getBlock());

    // Creates a new bucket cache instance without persisting to file after evicting the 4th block
    // and caching the 5th block. Here the cache file has the first three blocks, followed by the
    // 5th block and the second half of the 4th block (we evicted the 4th block, freeing up its
    // offset in the cache, then added the 5th block, which is half the size of the other blocks,
    // so it's going to overwrite the first half of the 4th block in the cache). That's fine
    // because the in-memory backing map has the right blocks and related offsets. However, the
    // persistent map file only has information about the first four blocks. We validate the
    // cache time recorded in the backing map against the block data in the cache. This is
    // recorded in the cache as the first 8 bytes of a block, so the 4th block had its first
    // 8 bytes overwritten by the 5th block, causing this check to fail and the removal of
    // the 4th block from the backing map.
    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(1000));
    BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
    BlockCacheKey[] newKeysSmaller = CacheTestUtils.regenerateKeys(smallerBlocks, smallerNames);
    // The new bucket cache would have only the first three blocks. Although we have persisted
    // the cache state when it had the first four blocks, the 4th block was evicted and then we
    // added a 5th block, which overwrites part of the 4th block in the cache. This would cause a
    // checksum failure for this block offset when we try to read from the cache, and we would
    // consider that block as invalid and mark its offset as available in the cache.
    assertNull(newBucketCache.getBlock(newKeys[3], false, false, false));
    assertNull(newBucketCache.getBlock(newKeysSmaller[0], false, false, false));
    assertEquals(blocks[0].getBlock(), newBucketCache.getBlock(newKeys[0], false, false, false));
    assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
    assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());

    String firstFileName = blocks[0].getBlockName().getHfileName();

    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));
    assertEquals(4, newBucketCache.backingMap.size());

    newBucketCache.evictBlocksByHfileName(firstFileName);
    assertEquals(3, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testValidateCacheInitialization() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));

    // Set the state of the bucket cache to INITIALIZING
    newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING);

    // Validate that zero values are returned while the cache is being initialized.
    assertEquals(0, newBucketCache.acceptableSize());
    assertEquals(0, newBucketCache.getPartitionSize(1));
    assertEquals(0, newBucketCache.getFreeSize());
    assertEquals(0, newBucketCache.getCurrentSize());
    assertEquals(false, newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());

    newBucketCache.setCacheState(BucketCache.CacheState.ENABLED);

    // Validate that non-zero values are returned for the enabled cache
    assertTrue(newBucketCache.acceptableSize() > 0);
    assertTrue(newBucketCache.getPartitionSize(1) > 0);
    assertTrue(newBucketCache.getFreeSize() > 0);
    assertTrue(newBucketCache.getCurrentSize() > 0);
    assertTrue(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());

    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
    String[] names = CacheTestUtils.getHFileNames(blocks);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());

    // creates an entry for a 5th block with the same cache offset as the 1st block. Just add it
    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency
    BucketEntry bucketEntry =
      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);

    // saves the current state of the cache: 5 blocks in the map, but we only have cached 4. The
    // 5th block has the same cache offset as the first
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    while (!newBucketCache.getBackingMapValidated().get()) {
      Thread.sleep(10);
    }

    BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);

    assertNull(newBucketCache.getBlock(newKeys[4], false, false, false));
    // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or to
    // null, depending on the ordering of the keys in the backing map.
    // Hence, skipping the check for that key.
    assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
    assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
    assertEquals(blocks[3].getBlock(), newBucketCache.getBlock(newKeys[3], false, false, false));
    assertEquals(4, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 12000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and writeQueue, then
  // the writer threads flush it to the bucket and put a reference entry in the backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

}