001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 023import static org.junit.jupiter.api.Assertions.assertEquals; 024import static org.junit.jupiter.api.Assertions.assertNotEquals; 025import static org.junit.jupiter.api.Assertions.assertNull; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027 028import java.io.BufferedWriter; 029import java.io.File; 030import java.io.FileOutputStream; 031import java.io.OutputStreamWriter; 032import java.nio.file.FileSystems; 033import java.nio.file.Files; 034import java.nio.file.attribute.FileTime; 035import java.time.Instant; 036import java.util.stream.Stream; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseConfiguration; 040import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 041import org.apache.hadoop.hbase.HBaseTestingUtil; 042import org.apache.hadoop.hbase.Waiter; 043import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 044import org.apache.hadoop.hbase.io.hfile.CacheConfig; 045import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 046import org.apache.hadoop.hbase.io.hfile.Cacheable; 047import org.apache.hadoop.hbase.testclassification.RegionServerTests; 048import org.apache.hadoop.hbase.testclassification.SmallTests; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.junit.jupiter.api.Tag; 051import org.junit.jupiter.api.TestTemplate; 052import org.junit.jupiter.params.provider.Arguments; 053 054/** 055 * Basic test for check file's integrity before start BucketCache in fileIOEngine 056 */ 057@Tag(SmallTests.TAG) 058@Tag(RegionServerTests.TAG) 059@HBaseParameterizedTestTemplate(name = "{index}: blockSize={0}, bucketSizes={1}") 060public class TestVerifyBucketCacheFile { 061 062 public static Stream<Arguments> parameters() { 063 return Stream.of(Arguments.of(8192, null), 064 Arguments.of(16 * 1024, 065 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 066 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 067 128 * 1024 + 1024 })); 068 } 069 070 private final int constructedBlockSize; 071 private final int[] constructedBlockSizes; 072 073 public TestVerifyBucketCacheFile(int constructedBlockSize, int[] constructedBlockSizes) { 074 this.constructedBlockSize = constructedBlockSize; 075 this.constructedBlockSizes = constructedBlockSizes; 076 } 077 078 final long capacitySize = 32 * 1024 * 1024; 079 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 080 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 081 082 /** 083 * Test cache file or persistence file does not exist whether BucketCache starts normally (1) 084 * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. 085 * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after 086 * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file 087 * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence 088 * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the 089 * cache file and persistence file would be deleted before BucketCache start normally. 090 * @throws Exception the exception 091 */ 092 @TestTemplate 093 public void testRetrieveFromFile() throws Exception { 094 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 095 Path testDir = TEST_UTIL.getDataTestDir(); 096 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 097 098 Configuration conf = HBaseConfiguration.create(); 099 // Disables the persister thread by setting its interval to MAX_VALUE 100 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 101 BucketCache bucketCache = null; 102 BucketCache recoveredBucketCache = null; 103 try { 104 bucketCache = 105 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 106 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 107 assertTrue(bucketCache.waitForCacheInitialization(10000)); 108 long usedSize = bucketCache.getAllocator().getUsedSize(); 109 assertEquals(0, usedSize); 110 CacheTestUtils.HFileBlockPair[] blocks = 111 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 112 String[] names = CacheTestUtils.getHFileNames(blocks); 113 // Add blocks 114 for (CacheTestUtils.HFileBlockPair block : blocks) { 115 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 116 } 117 usedSize = bucketCache.getAllocator().getUsedSize(); 118 assertNotEquals(0, usedSize); 119 // 1.persist cache to file 120 bucketCache.shutdown(); 121 // restore cache from file 122 bucketCache = 123 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 124 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 125 assertTrue(bucketCache.waitForCacheInitialization(10000)); 126 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 127 // persist cache to file 128 bucketCache.shutdown(); 129 130 // 2.delete bucket cache file 131 final java.nio.file.Path cacheFile = 132 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 133 assertTrue(Files.deleteIfExists(cacheFile)); 134 // can't restore cache from file 135 recoveredBucketCache = 136 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 137 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 138 assertTrue(recoveredBucketCache.waitForCacheInitialization(10000)); 139 waitPersistentCacheValidation(conf, recoveredBucketCache); 140 assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize()); 141 assertEquals(0, recoveredBucketCache.backingMap.size()); 142 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 143 // Add blocks 144 for (int i = 0; i < blocks.length; i++) { 145 cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, newKeys[i], blocks[i].getBlock()); 146 } 147 usedSize = recoveredBucketCache.getAllocator().getUsedSize(); 148 assertNotEquals(0, usedSize); 149 // persist cache to file 150 recoveredBucketCache.shutdown(); 151 152 // 3.delete backingMap persistence file 153 final java.nio.file.Path mapFile = 154 FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence"); 155 assertTrue(Files.deleteIfExists(mapFile)); 156 // can't restore cache from file 157 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 158 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 159 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 160 assertTrue(bucketCache.waitForCacheInitialization(10000)); 161 waitPersistentCacheValidation(conf, bucketCache); 162 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 163 assertEquals(0, bucketCache.backingMap.size()); 164 } finally { 165 if (recoveredBucketCache == null && bucketCache != null) { 166 bucketCache.shutdown(); 167 } 168 if (recoveredBucketCache != null) { 169 recoveredBucketCache.shutdown(); 170 } 171 } 172 TEST_UTIL.cleanupTestDir(); 173 } 174 175 @TestTemplate 176 public void testRetrieveFromFileAfterDelete() throws Exception { 177 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 178 Path testDir = TEST_UTIL.getDataTestDir(); 179 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 180 Configuration conf = TEST_UTIL.getConfiguration(); 181 conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300); 182 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 183 BucketCache bucketCache = null; 184 try { 185 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 186 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 187 DEFAULT_ERROR_TOLERATION_DURATION, conf); 188 assertTrue(bucketCache.waitForCacheInitialization(10000)); 189 long usedSize = bucketCache.getAllocator().getUsedSize(); 190 assertEquals(0, usedSize); 191 CacheTestUtils.HFileBlockPair[] blocks = 192 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 193 // Add blocks 194 for (CacheTestUtils.HFileBlockPair block : blocks) { 195 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 196 } 197 usedSize = bucketCache.getAllocator().getUsedSize(); 198 assertNotEquals(0, usedSize); 199 // Shutdown BucketCache 200 bucketCache.shutdown(); 201 // Delete the persistence file 202 File mapFile = new File(mapFileName); 203 assertTrue(mapFile.delete()); 204 Thread.sleep(350); 205 // Create BucketCache 206 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 207 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 208 DEFAULT_ERROR_TOLERATION_DURATION, conf); 209 assertTrue(bucketCache.waitForCacheInitialization(10000)); 210 waitPersistentCacheValidation(conf, bucketCache); 211 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 212 assertEquals(0, bucketCache.backingMap.size()); 213 } finally { 214 if (bucketCache != null) { 215 bucketCache.shutdown(); 216 } 217 } 218 } 219 220 /** 221 * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache 222 * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache 223 * after modify cache file's data, and it can't restore cache from file, the cache file and 224 * persistence file would be deleted before BucketCache start normally. 225 * @throws Exception the exception 226 */ 227 @TestTemplate 228 public void testModifiedBucketCacheFileData() throws Exception { 229 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 230 Path testDir = TEST_UTIL.getDataTestDir(); 231 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 232 233 Configuration conf = HBaseConfiguration.create(); 234 // Disables the persister thread by setting its interval to MAX_VALUE 235 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 236 BucketCache bucketCache = null; 237 try { 238 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 239 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 240 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 241 assertTrue(bucketCache.waitForCacheInitialization(10000)); 242 long usedSize = bucketCache.getAllocator().getUsedSize(); 243 assertEquals(0, usedSize); 244 245 CacheTestUtils.HFileBlockPair[] blocks = 246 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 247 // Add blocks 248 for (CacheTestUtils.HFileBlockPair block : blocks) { 249 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 250 } 251 usedSize = bucketCache.getAllocator().getUsedSize(); 252 assertNotEquals(0, usedSize); 253 // persist cache to file 254 bucketCache.shutdown(); 255 256 // modified bucket cache file 257 String file = testDir + "/bucket.cache"; 258 try (BufferedWriter out = 259 new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) { 260 out.write("test bucket cache"); 261 } 262 // can't restore cache from file 263 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 264 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 265 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 266 assertTrue(bucketCache.waitForCacheInitialization(10000)); 267 waitPersistentCacheValidation(conf, bucketCache); 268 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 269 assertEquals(0, bucketCache.backingMap.size()); 270 } finally { 271 if (bucketCache != null) { 272 bucketCache.shutdown(); 273 } 274 } 275 TEST_UTIL.cleanupTestDir(); 276 } 277 278 /** 279 * Test whether BucketCache is started normally after modifying the cache file's last modified 280 * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache 281 * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has 282 * modified persistence cache such that now we store extra 8 bytes at the end of each block in the 283 * cache, representing the nanosecond time the block has been cached. So in the event the cache 284 * file has failed checksum verification during loading time, we go through all the cached blocks 285 * in the cache map and validate the cached time long between what is in the map and the cache 286 * file. If that check fails, we pull the cache key entry out of the map. Since in this test we 287 * are only modifying the access time to induce a checksum error, the cache file content is still 288 * valid and the extra verification should validate that all cache keys in the map are still 289 * recoverable from the cache. 290 * @throws Exception the exception 291 */ 292 @TestTemplate 293 public void testModifiedBucketCacheFileTime() throws Exception { 294 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 295 Path testDir = TEST_UTIL.getDataTestDir(); 296 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 297 Configuration conf = HBaseConfiguration.create(); 298 // Disables the persister thread by setting its interval to MAX_VALUE 299 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 300 BucketCache bucketCache = null; 301 try { 302 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 303 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 304 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 305 assertTrue(bucketCache.waitForCacheInitialization(10000)); 306 long usedSize = bucketCache.getAllocator().getUsedSize(); 307 assertEquals(0, usedSize); 308 309 CacheTestUtils.HFileBlockPair[] blocks = 310 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 311 // Add blocks 312 for (CacheTestUtils.HFileBlockPair block : blocks) { 313 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 314 } 315 usedSize = bucketCache.getAllocator().getUsedSize(); 316 assertNotEquals(0, usedSize); 317 long blockCount = bucketCache.backingMap.size(); 318 assertNotEquals(0, blockCount); 319 // persist cache to file 320 bucketCache.shutdown(); 321 322 // modified bucket cache file LastModifiedTime 323 final java.nio.file.Path file = 324 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 325 Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000))); 326 // can't restore cache from file 327 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 328 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 329 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 330 assertTrue(bucketCache.waitForCacheInitialization(10000)); 331 waitPersistentCacheValidation(conf, bucketCache); 332 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 333 assertEquals(blockCount, bucketCache.backingMap.size()); 334 } finally { 335 if (bucketCache != null) { 336 bucketCache.shutdown(); 337 } 338 } 339 TEST_UTIL.cleanupTestDir(); 340 } 341 342 /** 343 * When using persistent bucket cache, there may be crashes between persisting the backing map and 344 * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache 345 * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the 346 * keys that are still valid do succeed in retrieve related block data from the cache without any 347 * corruption. 348 * @throws Exception the exception 349 */ 350 @TestTemplate 351 public void testBucketCacheRecovery() throws Exception { 352 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 353 Path testDir = TEST_UTIL.getDataTestDir(); 354 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 355 Configuration conf = HBaseConfiguration.create(); 356 // Disables the persister thread by setting its interval to MAX_VALUE 357 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 358 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 359 BucketCache bucketCache = null; 360 BucketCache newBucketCache = null; 361 try { 362 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 363 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 364 DEFAULT_ERROR_TOLERATION_DURATION, conf); 365 assertTrue(bucketCache.waitForCacheInitialization(10000)); 366 367 CacheTestUtils.HFileBlockPair[] blocks = 368 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); 369 String[] names = CacheTestUtils.getHFileNames(blocks); 370 // Add three blocks 371 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 372 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 373 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 374 // saves the current state 375 bucketCache.persistToFile(); 376 // evicts first block 377 bucketCache.evictBlock(blocks[0].getBlockName()); 378 379 // now adds a fourth block to bucket cache 380 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 381 // Creates new bucket cache instance without persisting to file after evicting first block 382 // and caching fourth block. So the bucket cache file has only the last three blocks, 383 // but backing map (containing cache keys) was persisted when first three blocks 384 // were in the cache. So the state on this recovery is: 385 // - Backing map: [block0, block1, block2] 386 // - Cache: [block1, block2, block3] 387 // Therefore, this bucket cache would be able to recover only block1 and block2. 388 newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 389 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 390 DEFAULT_ERROR_TOLERATION_DURATION, conf); 391 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 392 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 393 assertNull(newBucketCache.getBlock(newKeys[0], false, false, false)); 394 assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false)); 395 assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false)); 396 assertNull(newBucketCache.getBlock(newKeys[3], false, false, false)); 397 assertEquals(2, newBucketCache.backingMap.size()); 398 } finally { 399 if (newBucketCache == null && bucketCache != null) { 400 bucketCache.shutdown(); 401 } 402 if (newBucketCache != null) { 403 newBucketCache.shutdown(); 404 } 405 TEST_UTIL.cleanupTestDir(); 406 } 407 } 408 409 @TestTemplate 410 public void testSingleChunk() throws Exception { 411 testChunkedBackingMapRecovery(5, 5); 412 } 413 414 @TestTemplate 415 public void testCompletelyFilledChunks() throws Exception { 416 // Test where the all the chunks are complete with chunkSize entries 417 testChunkedBackingMapRecovery(5, 10); 418 } 419 420 @TestTemplate 421 public void testPartiallyFilledChunks() throws Exception { 422 // Test where the last chunk is not completely filled. 423 testChunkedBackingMapRecovery(5, 13); 424 } 425 426 private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception { 427 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 428 Path testDir = TEST_UTIL.getDataTestDir(); 429 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 430 Configuration conf = HBaseConfiguration.create(); 431 conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize); 432 433 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 434 BucketCache bucketCache = null; 435 BucketCache newBucketCache = null; 436 try { 437 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 438 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 439 DEFAULT_ERROR_TOLERATION_DURATION, conf); 440 assertTrue(bucketCache.waitForCacheInitialization(10000)); 441 442 CacheTestUtils.HFileBlockPair[] blocks = 443 CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks); 444 String[] names = CacheTestUtils.getHFileNames(blocks); 445 446 for (int i = 0; i < numBlocks; i++) { 447 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), 448 blocks[i].getBlock()); 449 } 450 451 // saves the current state 452 bucketCache.persistToFile(); 453 454 // Create a new bucket which reads from persistence file. 455 newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 456 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 457 DEFAULT_ERROR_TOLERATION_DURATION, conf); 458 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 459 460 assertEquals(numBlocks, newBucketCache.backingMap.size()); 461 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 462 for (int i = 0; i < numBlocks; i++) { 463 assertEquals(blocks[i].getBlock(), 464 newBucketCache.getBlock(newKeys[i], false, false, false)); 465 } 466 } finally { 467 if (newBucketCache == null && bucketCache != null) { 468 bucketCache.shutdown(); 469 } 470 if (newBucketCache != null) { 471 newBucketCache.shutdown(); 472 } 473 TEST_UTIL.cleanupTestDir(); 474 } 475 } 476 477 private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 478 throws InterruptedException { 479 while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { 480 Thread.sleep(100); 481 } 482 } 483 484 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 485 // threads will flush it to the bucket and put reference entry in backingMap. 486 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 487 Cacheable block) throws InterruptedException { 488 cache.cacheBlock(cacheKey, block); 489 waitUntilFlushedToBucket(cache, cacheKey); 490 } 491 492 private void waitPersistentCacheValidation(Configuration config, final BucketCache bucketCache) { 493 Waiter.waitFor(config, 5000, 494 () -> bucketCache.getBackingMapValidated().get() && bucketCache.isCacheEnabled()); 495 } 496}