001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertNotEquals; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertTrue; 027 028import java.io.BufferedWriter; 029import java.io.File; 030import java.io.FileOutputStream; 031import java.io.OutputStreamWriter; 032import java.nio.file.FileSystems; 033import java.nio.file.Files; 034import java.nio.file.attribute.FileTime; 035import java.time.Instant; 036import java.util.Arrays; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.HBaseTestingUtil; 042import org.apache.hadoop.hbase.Waiter; 043import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 044import org.apache.hadoop.hbase.io.hfile.CacheConfig; 045import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 046import org.apache.hadoop.hbase.io.hfile.Cacheable; 047import org.apache.hadoop.hbase.testclassification.SmallTests; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.util.Pair; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053import org.junit.runner.RunWith; 054import org.junit.runners.Parameterized; 055 056/** 057 * Basic test for check file's integrity before start BucketCache in fileIOEngine 058 */ 059@RunWith(Parameterized.class) 060@Category(SmallTests.class) 061public class TestVerifyBucketCacheFile { 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class); 065 066 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 067 public static Iterable<Object[]> data() { 068 return Arrays.asList(new Object[][] { { 8192, null }, 069 { 16 * 1024, 070 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 071 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 072 128 * 1024 + 1024 } } }); 073 } 074 075 @Parameterized.Parameter(0) 076 public int constructedBlockSize; 077 078 @Parameterized.Parameter(1) 079 public int[] constructedBlockSizes; 080 081 final long capacitySize = 32 * 1024 * 1024; 082 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 083 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 084 085 /** 086 * Test cache file or persistence file does not exist whether BucketCache starts normally (1) 087 * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. 088 * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after 089 * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file 090 * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence 091 * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the 092 * cache file and persistence file would be deleted before BucketCache start normally. 093 * @throws Exception the exception 094 */ 095 @Test 096 public void testRetrieveFromFile() throws Exception { 097 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 098 Path testDir = TEST_UTIL.getDataTestDir(); 099 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 100 101 Configuration conf = HBaseConfiguration.create(); 102 // Disables the persister thread by setting its interval to MAX_VALUE 103 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 104 BucketCache bucketCache = null; 105 BucketCache recoveredBucketCache = null; 106 try { 107 bucketCache = 108 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 109 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 110 assertTrue(bucketCache.waitForCacheInitialization(10000)); 111 long usedSize = bucketCache.getAllocator().getUsedSize(); 112 assertEquals(0, usedSize); 113 CacheTestUtils.HFileBlockPair[] blocks = 114 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 115 String[] names = CacheTestUtils.getHFileNames(blocks); 116 // Add blocks 117 for (CacheTestUtils.HFileBlockPair block : blocks) { 118 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 119 } 120 usedSize = bucketCache.getAllocator().getUsedSize(); 121 assertNotEquals(0, usedSize); 122 // 1.persist cache to file 123 bucketCache.shutdown(); 124 // restore cache from file 125 bucketCache = 126 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 127 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 128 assertTrue(bucketCache.waitForCacheInitialization(10000)); 129 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 130 // persist cache to file 131 bucketCache.shutdown(); 132 133 // 2.delete bucket cache file 134 final java.nio.file.Path cacheFile = 135 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 136 assertTrue(Files.deleteIfExists(cacheFile)); 137 // can't restore cache from file 138 recoveredBucketCache = 139 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 140 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 141 assertTrue(recoveredBucketCache.waitForCacheInitialization(10000)); 142 assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize()); 143 assertEquals(0, recoveredBucketCache.backingMap.size()); 144 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 145 // Add blocks 146 for (int i = 0; i < blocks.length; i++) { 147 cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, newKeys[i], blocks[i].getBlock()); 148 } 149 usedSize = recoveredBucketCache.getAllocator().getUsedSize(); 150 assertNotEquals(0, usedSize); 151 // persist cache to file 152 recoveredBucketCache.shutdown(); 153 154 // 3.delete backingMap persistence file 155 final java.nio.file.Path mapFile = 156 FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence"); 157 assertTrue(Files.deleteIfExists(mapFile)); 158 // can't restore cache from file 159 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 160 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 161 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 162 assertTrue(bucketCache.waitForCacheInitialization(10000)); 163 waitPersistentCacheValidation(conf, bucketCache); 164 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 165 assertEquals(0, bucketCache.backingMap.size()); 166 } finally { 167 if (recoveredBucketCache == null && bucketCache != null) { 168 bucketCache.shutdown(); 169 } 170 if (recoveredBucketCache != null) { 171 recoveredBucketCache.shutdown(); 172 } 173 } 174 TEST_UTIL.cleanupTestDir(); 175 } 176 177 @Test 178 public void testRetrieveFromFileAfterDelete() throws Exception { 179 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 180 Path testDir = TEST_UTIL.getDataTestDir(); 181 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 182 Configuration conf = TEST_UTIL.getConfiguration(); 183 conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300); 184 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 185 BucketCache bucketCache = null; 186 try { 187 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 188 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 189 DEFAULT_ERROR_TOLERATION_DURATION, conf); 190 assertTrue(bucketCache.waitForCacheInitialization(10000)); 191 long usedSize = bucketCache.getAllocator().getUsedSize(); 192 assertEquals(0, usedSize); 193 CacheTestUtils.HFileBlockPair[] blocks = 194 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 195 // Add blocks 196 for (CacheTestUtils.HFileBlockPair block : blocks) { 197 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 198 } 199 usedSize = bucketCache.getAllocator().getUsedSize(); 200 assertNotEquals(0, usedSize); 201 // Shutdown BucketCache 202 bucketCache.shutdown(); 203 // Delete the persistence file 204 File mapFile = new File(mapFileName); 205 assertTrue(mapFile.delete()); 206 Thread.sleep(350); 207 // Create BucketCache 208 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 209 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 210 DEFAULT_ERROR_TOLERATION_DURATION, conf); 211 assertTrue(bucketCache.waitForCacheInitialization(10000)); 212 waitPersistentCacheValidation(conf, bucketCache); 213 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 214 assertEquals(0, bucketCache.backingMap.size()); 215 } finally { 216 if (bucketCache != null) { 217 bucketCache.shutdown(); 218 } 219 } 220 } 221 222 /** 223 * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache 224 * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache 225 * after modify cache file's data, and it can't restore cache from file, the cache file and 226 * persistence file would be deleted before BucketCache start normally. 227 * @throws Exception the exception 228 */ 229 @Test 230 public void testModifiedBucketCacheFileData() throws Exception { 231 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 232 Path testDir = TEST_UTIL.getDataTestDir(); 233 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 234 235 Configuration conf = HBaseConfiguration.create(); 236 // Disables the persister thread by setting its interval to MAX_VALUE 237 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 238 BucketCache bucketCache = null; 239 try { 240 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 241 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 242 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 243 assertTrue(bucketCache.waitForCacheInitialization(10000)); 244 long usedSize = bucketCache.getAllocator().getUsedSize(); 245 assertEquals(0, usedSize); 246 247 CacheTestUtils.HFileBlockPair[] blocks = 248 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 249 // Add blocks 250 for (CacheTestUtils.HFileBlockPair block : blocks) { 251 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 252 } 253 usedSize = bucketCache.getAllocator().getUsedSize(); 254 assertNotEquals(0, usedSize); 255 // persist cache to file 256 bucketCache.shutdown(); 257 258 // modified bucket cache file 259 String file = testDir + "/bucket.cache"; 260 try (BufferedWriter out = 261 new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) { 262 out.write("test bucket cache"); 263 } 264 // can't restore cache from file 265 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 266 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 267 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 268 assertTrue(bucketCache.waitForCacheInitialization(10000)); 269 waitPersistentCacheValidation(conf, bucketCache); 270 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 271 assertEquals(0, bucketCache.backingMap.size()); 272 } finally { 273 if (bucketCache != null) { 274 bucketCache.shutdown(); 275 } 276 } 277 TEST_UTIL.cleanupTestDir(); 278 } 279 280 /** 281 * Test whether BucketCache is started normally after modifying the cache file's last modified 282 * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache 283 * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has 284 * modified persistence cache such that now we store extra 8 bytes at the end of each block in the 285 * cache, representing the nanosecond time the block has been cached. So in the event the cache 286 * file has failed checksum verification during loading time, we go through all the cached blocks 287 * in the cache map and validate the cached time long between what is in the map and the cache 288 * file. If that check fails, we pull the cache key entry out of the map. Since in this test we 289 * are only modifying the access time to induce a checksum error, the cache file content is still 290 * valid and the extra verification should validate that all cache keys in the map are still 291 * recoverable from the cache. 292 * @throws Exception the exception 293 */ 294 @Test 295 public void testModifiedBucketCacheFileTime() throws Exception { 296 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 297 Path testDir = TEST_UTIL.getDataTestDir(); 298 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 299 Configuration conf = HBaseConfiguration.create(); 300 // Disables the persister thread by setting its interval to MAX_VALUE 301 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 302 BucketCache bucketCache = null; 303 try { 304 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 305 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 306 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 307 assertTrue(bucketCache.waitForCacheInitialization(10000)); 308 long usedSize = bucketCache.getAllocator().getUsedSize(); 309 assertEquals(0, usedSize); 310 311 Pair<String, Long> myPair = new Pair<>(); 312 313 CacheTestUtils.HFileBlockPair[] blocks = 314 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 315 // Add blocks 316 for (CacheTestUtils.HFileBlockPair block : blocks) { 317 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 318 } 319 usedSize = bucketCache.getAllocator().getUsedSize(); 320 assertNotEquals(0, usedSize); 321 long blockCount = bucketCache.backingMap.size(); 322 assertNotEquals(0, blockCount); 323 // persist cache to file 324 bucketCache.shutdown(); 325 326 // modified bucket cache file LastModifiedTime 327 final java.nio.file.Path file = 328 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 329 Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000))); 330 // can't restore cache from file 331 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 332 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 333 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 334 assertTrue(bucketCache.waitForCacheInitialization(10000)); 335 waitPersistentCacheValidation(conf, bucketCache); 336 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 337 assertEquals(blockCount, bucketCache.backingMap.size()); 338 } finally { 339 if (bucketCache != null) { 340 bucketCache.shutdown(); 341 } 342 } 343 TEST_UTIL.cleanupTestDir(); 344 } 345 346 /** 347 * When using persistent bucket cache, there may be crashes between persisting the backing map and 348 * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache 349 * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the 350 * keys that are still valid do succeed in retrieve related block data from the cache without any 351 * corruption. 352 * @throws Exception the exception 353 */ 354 @Test 355 public void testBucketCacheRecovery() throws Exception { 356 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 357 Path testDir = TEST_UTIL.getDataTestDir(); 358 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 359 Configuration conf = HBaseConfiguration.create(); 360 // Disables the persister thread by setting its interval to MAX_VALUE 361 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 362 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 363 BucketCache bucketCache = null; 364 BucketCache newBucketCache = null; 365 try { 366 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 367 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 368 DEFAULT_ERROR_TOLERATION_DURATION, conf); 369 assertTrue(bucketCache.waitForCacheInitialization(10000)); 370 371 CacheTestUtils.HFileBlockPair[] blocks = 372 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); 373 String[] names = CacheTestUtils.getHFileNames(blocks); 374 // Add three blocks 375 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 376 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 377 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 378 // saves the current state 379 bucketCache.persistToFile(); 380 // evicts first block 381 bucketCache.evictBlock(blocks[0].getBlockName()); 382 383 // now adds a fourth block to bucket cache 384 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 385 // Creates new bucket cache instance without persisting to file after evicting first block 386 // and caching fourth block. So the bucket cache file has only the last three blocks, 387 // but backing map (containing cache keys) was persisted when first three blocks 388 // were in the cache. So the state on this recovery is: 389 // - Backing map: [block0, block1, block2] 390 // - Cache: [block1, block2, block3] 391 // Therefore, this bucket cache would be able to recover only block1 and block2. 392 newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 393 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 394 DEFAULT_ERROR_TOLERATION_DURATION, conf); 395 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 396 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 397 assertNull(newBucketCache.getBlock(newKeys[0], false, false, false)); 398 assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false)); 399 assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false)); 400 assertNull(newBucketCache.getBlock(newKeys[3], false, false, false)); 401 assertEquals(2, newBucketCache.backingMap.size()); 402 } finally { 403 if (newBucketCache == null && bucketCache != null) { 404 bucketCache.shutdown(); 405 } 406 if (newBucketCache != null) { 407 newBucketCache.shutdown(); 408 } 409 TEST_UTIL.cleanupTestDir(); 410 } 411 } 412 413 @Test 414 public void testSingleChunk() throws Exception { 415 testChunkedBackingMapRecovery(5, 5); 416 } 417 418 @Test 419 public void testCompletelyFilledChunks() throws Exception { 420 // Test where the all the chunks are complete with chunkSize entries 421 testChunkedBackingMapRecovery(5, 10); 422 } 423 424 @Test 425 public void testPartiallyFilledChunks() throws Exception { 426 // Test where the last chunk is not completely filled. 427 testChunkedBackingMapRecovery(5, 13); 428 } 429 430 private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception { 431 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 432 Path testDir = TEST_UTIL.getDataTestDir(); 433 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 434 Configuration conf = HBaseConfiguration.create(); 435 conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize); 436 437 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 438 BucketCache bucketCache = null; 439 BucketCache newBucketCache = null; 440 try { 441 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 442 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 443 DEFAULT_ERROR_TOLERATION_DURATION, conf); 444 assertTrue(bucketCache.waitForCacheInitialization(10000)); 445 446 CacheTestUtils.HFileBlockPair[] blocks = 447 CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks); 448 String[] names = CacheTestUtils.getHFileNames(blocks); 449 450 for (int i = 0; i < numBlocks; i++) { 451 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), 452 blocks[i].getBlock()); 453 } 454 455 // saves the current state 456 bucketCache.persistToFile(); 457 458 // Create a new bucket which reads from persistence file. 459 newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 460 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 461 DEFAULT_ERROR_TOLERATION_DURATION, conf); 462 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 463 464 assertEquals(numBlocks, newBucketCache.backingMap.size()); 465 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 466 for (int i = 0; i < numBlocks; i++) { 467 assertEquals(blocks[i].getBlock(), 468 newBucketCache.getBlock(newKeys[i], false, false, false)); 469 } 470 } finally { 471 if (newBucketCache == null && bucketCache != null) { 472 bucketCache.shutdown(); 473 } 474 if (newBucketCache != null) { 475 newBucketCache.shutdown(); 476 } 477 TEST_UTIL.cleanupTestDir(); 478 } 479 } 480 481 private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 482 throws InterruptedException { 483 while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { 484 Thread.sleep(100); 485 } 486 } 487 488 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 489 // threads will flush it to the bucket and put reference entry in backingMap. 490 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 491 Cacheable block) throws InterruptedException { 492 cache.cacheBlock(cacheKey, block); 493 waitUntilFlushedToBucket(cache, cacheKey); 494 } 495 496 private void waitPersistentCacheValidation(Configuration config, final BucketCache bucketCache) { 497 Waiter.waitFor(config, 5000, () -> bucketCache.getBackingMapValidated().get()); 498 } 499}