001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNotEquals; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026 027import java.io.BufferedWriter; 028import java.io.File; 029import java.io.FileOutputStream; 030import java.io.OutputStreamWriter; 031import java.nio.file.FileSystems; 032import java.nio.file.Files; 033import java.nio.file.attribute.FileTime; 034import java.time.Instant; 035import java.util.Arrays; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseConfiguration; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 042import org.apache.hadoop.hbase.io.hfile.CacheConfig; 043import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 044import org.apache.hadoop.hbase.io.hfile.Cacheable; 045import org.apache.hadoop.hbase.testclassification.SmallTests; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.util.Pair; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.junit.runner.RunWith; 052import org.junit.runners.Parameterized; 053 054/** 055 * Basic test for check file's integrity before start BucketCache in fileIOEngine 056 */ 057@RunWith(Parameterized.class) 058@Category(SmallTests.class) 059public class TestVerifyBucketCacheFile { 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class); 063 064 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 065 public static Iterable<Object[]> data() { 066 return Arrays.asList(new Object[][] { { 8192, null }, 067 { 16 * 1024, 068 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 069 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 070 128 * 1024 + 1024 } } }); 071 } 072 073 @Parameterized.Parameter(0) 074 public int constructedBlockSize; 075 076 @Parameterized.Parameter(1) 077 public int[] constructedBlockSizes; 078 079 final long capacitySize = 32 * 1024 * 1024; 080 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 081 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 082 083 /** 084 * Test cache file or persistence file does not exist whether BucketCache starts normally (1) 085 * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. 086 * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after 087 * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file 088 * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence 089 * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the 090 * cache file and persistence file would be deleted before BucketCache start normally. 091 * @throws Exception the exception 092 */ 093 @Test 094 public void testRetrieveFromFile() throws Exception { 095 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 096 Path testDir = TEST_UTIL.getDataTestDir(); 097 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 098 099 BucketCache bucketCache = 100 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 101 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 102 long usedSize = bucketCache.getAllocator().getUsedSize(); 103 assertEquals(0, usedSize); 104 CacheTestUtils.HFileBlockPair[] blocks = 105 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 106 // Add blocks 107 for (CacheTestUtils.HFileBlockPair block : blocks) { 108 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 109 } 110 usedSize = bucketCache.getAllocator().getUsedSize(); 111 assertNotEquals(0, usedSize); 112 // 1.persist cache to file 113 bucketCache.shutdown(); 114 // restore cache from file 115 bucketCache = 116 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 117 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 118 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 119 // persist cache to file 120 bucketCache.shutdown(); 121 122 // 2.delete bucket cache file 123 final java.nio.file.Path cacheFile = 124 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 125 assertTrue(Files.deleteIfExists(cacheFile)); 126 // can't restore cache from file 127 bucketCache = 128 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 129 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 130 Thread.sleep(100); 131 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 132 assertEquals(0, bucketCache.backingMap.size()); 133 // Add blocks 134 for (CacheTestUtils.HFileBlockPair block : blocks) { 135 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 136 } 137 usedSize = bucketCache.getAllocator().getUsedSize(); 138 assertNotEquals(0, usedSize); 139 // persist cache to file 140 bucketCache.shutdown(); 141 142 // 3.delete backingMap persistence file 143 final java.nio.file.Path mapFile = 144 FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence"); 145 assertTrue(Files.deleteIfExists(mapFile)); 146 // can't restore cache from file 147 bucketCache = 148 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 149 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 150 Thread.sleep(100); 151 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 152 assertEquals(0, bucketCache.backingMap.size()); 153 154 TEST_UTIL.cleanupTestDir(); 155 } 156 157 @Test 158 public void testRetrieveFromFileAfterDelete() throws Exception { 159 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 160 Path testDir = TEST_UTIL.getDataTestDir(); 161 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 162 Configuration conf = TEST_UTIL.getConfiguration(); 163 conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300); 164 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 165 BucketCache bucketCache = 166 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 167 constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); 168 169 long usedSize = bucketCache.getAllocator().getUsedSize(); 170 assertEquals(0, usedSize); 171 CacheTestUtils.HFileBlockPair[] blocks = 172 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 173 // Add blocks 174 for (CacheTestUtils.HFileBlockPair block : blocks) { 175 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 176 } 177 usedSize = bucketCache.getAllocator().getUsedSize(); 178 assertNotEquals(0, usedSize); 179 // Shutdown BucketCache 180 bucketCache.shutdown(); 181 // Delete the persistence file 182 File mapFile = new File(mapFileName); 183 assertTrue(mapFile.delete()); 184 Thread.sleep(350); 185 // Create BucketCache 186 bucketCache = 187 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 188 constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); 189 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 190 assertEquals(0, bucketCache.backingMap.size()); 191 } 192 193 /** 194 * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache 195 * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache 196 * after modify cache file's data, and it can't restore cache from file, the cache file and 197 * persistence file would be deleted before BucketCache start normally. 198 * @throws Exception the exception 199 */ 200 @Test 201 public void testModifiedBucketCacheFileData() throws Exception { 202 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 203 Path testDir = TEST_UTIL.getDataTestDir(); 204 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 205 206 Configuration conf = HBaseConfiguration.create(); 207 // Disables the persister thread by setting its interval to MAX_VALUE 208 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 209 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 210 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 211 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 212 long usedSize = bucketCache.getAllocator().getUsedSize(); 213 assertEquals(0, usedSize); 214 215 CacheTestUtils.HFileBlockPair[] blocks = 216 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 217 // Add blocks 218 for (CacheTestUtils.HFileBlockPair block : blocks) { 219 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 220 } 221 usedSize = bucketCache.getAllocator().getUsedSize(); 222 assertNotEquals(0, usedSize); 223 // persist cache to file 224 bucketCache.shutdown(); 225 226 // modified bucket cache file 227 String file = testDir + "/bucket.cache"; 228 try (BufferedWriter out = 229 new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) { 230 out.write("test bucket cache"); 231 } 232 // can't restore cache from file 233 bucketCache = 234 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 235 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 236 Thread.sleep(100); 237 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 238 assertEquals(0, bucketCache.backingMap.size()); 239 240 TEST_UTIL.cleanupTestDir(); 241 } 242 243 /** 244 * Test whether BucketCache is started normally after modifying the cache file's last modified 245 * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache 246 * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has 247 * modified persistence cache such that now we store extra 8 bytes at the end of each block in the 248 * cache, representing the nanosecond time the block has been cached. So in the event the cache 249 * file has failed checksum verification during loading time, we go through all the cached blocks 250 * in the cache map and validate the cached time long between what is in the map and the cache 251 * file. If that check fails, we pull the cache key entry out of the map. Since in this test we 252 * are only modifying the access time to induce a checksum error, the cache file content is still 253 * valid and the extra verification should validate that all cache keys in the map are still 254 * recoverable from the cache. 255 * @throws Exception the exception 256 */ 257 @Test 258 public void testModifiedBucketCacheFileTime() throws Exception { 259 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 260 Path testDir = TEST_UTIL.getDataTestDir(); 261 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 262 263 BucketCache bucketCache = 264 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 265 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 266 long usedSize = bucketCache.getAllocator().getUsedSize(); 267 assertEquals(0, usedSize); 268 269 Pair<String, Long> myPair = new Pair<>(); 270 271 CacheTestUtils.HFileBlockPair[] blocks = 272 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 273 // Add blocks 274 for (CacheTestUtils.HFileBlockPair block : blocks) { 275 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 276 } 277 usedSize = bucketCache.getAllocator().getUsedSize(); 278 assertNotEquals(0, usedSize); 279 long blockCount = bucketCache.backingMap.size(); 280 assertNotEquals(0, blockCount); 281 // persist cache to file 282 bucketCache.shutdown(); 283 284 // modified bucket cache file LastModifiedTime 285 final java.nio.file.Path file = 286 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 287 Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000))); 288 // can't restore cache from file 289 bucketCache = 290 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 291 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 292 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 293 assertEquals(blockCount, bucketCache.backingMap.size()); 294 295 TEST_UTIL.cleanupTestDir(); 296 } 297 298 /** 299 * When using persistent bucket cache, there may be crashes between persisting the backing map and 300 * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache 301 * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the 302 * keys that are still valid do succeed in retrieve related block data from the cache without any 303 * corruption. 304 * @throws Exception the exception 305 */ 306 @Test 307 public void testBucketCacheRecovery() throws Exception { 308 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 309 Path testDir = TEST_UTIL.getDataTestDir(); 310 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 311 Configuration conf = HBaseConfiguration.create(); 312 // Disables the persister thread by setting its interval to MAX_VALUE 313 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 314 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 315 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 316 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 317 DEFAULT_ERROR_TOLERATION_DURATION, conf); 318 319 CacheTestUtils.HFileBlockPair[] blocks = 320 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); 321 // Add three blocks 322 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 323 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 324 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 325 // saves the current state 326 bucketCache.persistToFile(); 327 // evicts first block 328 bucketCache.evictBlock(blocks[0].getBlockName()); 329 330 // now adds a fourth block to bucket cache 331 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 332 // Creates new bucket cache instance without persisting to file after evicting first block 333 // and caching fourth block. So the bucket cache file has only the last three blocks, 334 // but backing map (containing cache keys) was persisted when first three blocks 335 // were in the cache. So the state on this recovery is: 336 // - Backing map: [block0, block1, block2] 337 // - Cache: [block1, block2, block3] 338 // Therefore, this bucket cache would be able to recover only block1 and block2. 339 BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 340 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 341 DEFAULT_ERROR_TOLERATION_DURATION, conf); 342 343 assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false)); 344 assertEquals(blocks[1].getBlock(), 345 newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false)); 346 assertEquals(blocks[2].getBlock(), 347 newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false)); 348 assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false)); 349 assertEquals(2, newBucketCache.backingMap.size()); 350 TEST_UTIL.cleanupTestDir(); 351 } 352 353 private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 354 throws InterruptedException { 355 while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { 356 Thread.sleep(100); 357 } 358 } 359 360 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 361 // threads will flush it to the bucket and put reference entry in backingMap. 362 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 363 Cacheable block) throws InterruptedException { 364 cache.cacheBlock(cacheKey, block); 365 waitUntilFlushedToBucket(cache, cacheKey); 366 } 367}