001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertNotEquals; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertTrue; 027 028import java.io.BufferedWriter; 029import java.io.File; 030import java.io.FileOutputStream; 031import java.io.OutputStreamWriter; 032import java.nio.file.FileSystems; 033import java.nio.file.Files; 034import java.nio.file.attribute.FileTime; 035import java.time.Instant; 036import java.util.Arrays; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.HBaseTestingUtil; 042import org.apache.hadoop.hbase.Waiter; 043import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 044import org.apache.hadoop.hbase.io.hfile.CacheConfig; 045import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 046import org.apache.hadoop.hbase.io.hfile.Cacheable; 047import org.apache.hadoop.hbase.testclassification.SmallTests; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.junit.runner.RunWith; 053import org.junit.runners.Parameterized; 054 055/** 056 * Basic test for check file's integrity before start BucketCache in fileIOEngine 057 */ 058@RunWith(Parameterized.class) 059@Category(SmallTests.class) 060public class TestVerifyBucketCacheFile { 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class); 064 065 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 066 public static Iterable<Object[]> data() { 067 return Arrays.asList(new Object[][] { { 8192, null }, 068 { 16 * 1024, 069 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 070 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 071 128 * 1024 + 1024 } } }); 072 } 073 074 @Parameterized.Parameter(0) 075 public int constructedBlockSize; 076 077 @Parameterized.Parameter(1) 078 public int[] constructedBlockSizes; 079 080 final long capacitySize = 32 * 1024 * 1024; 081 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 082 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 083 084 /** 085 * Test cache file or persistence file does not exist whether BucketCache starts normally (1) 086 * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. 087 * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after 088 * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file 089 * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence 090 * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the 091 * cache file and persistence file would be deleted before BucketCache start normally. 092 * @throws Exception the exception 093 */ 094 @Test 095 public void testRetrieveFromFile() throws Exception { 096 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 097 Path testDir = TEST_UTIL.getDataTestDir(); 098 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 099 100 Configuration conf = HBaseConfiguration.create(); 101 // Disables the persister thread by setting its interval to MAX_VALUE 102 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 103 BucketCache bucketCache = null; 104 BucketCache recoveredBucketCache = null; 105 try { 106 bucketCache = 107 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 108 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 109 assertTrue(bucketCache.waitForCacheInitialization(10000)); 110 long usedSize = bucketCache.getAllocator().getUsedSize(); 111 assertEquals(0, usedSize); 112 CacheTestUtils.HFileBlockPair[] blocks = 113 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 114 String[] names = CacheTestUtils.getHFileNames(blocks); 115 // Add blocks 116 for (CacheTestUtils.HFileBlockPair block : blocks) { 117 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 118 } 119 usedSize = bucketCache.getAllocator().getUsedSize(); 120 assertNotEquals(0, usedSize); 121 // 1.persist cache to file 122 bucketCache.shutdown(); 123 // restore cache from file 124 bucketCache = 125 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 126 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 127 assertTrue(bucketCache.waitForCacheInitialization(10000)); 128 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 129 // persist cache to file 130 bucketCache.shutdown(); 131 132 // 2.delete bucket cache file 133 final java.nio.file.Path cacheFile = 134 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 135 assertTrue(Files.deleteIfExists(cacheFile)); 136 // can't restore cache from file 137 recoveredBucketCache = 138 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 139 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 140 assertTrue(recoveredBucketCache.waitForCacheInitialization(10000)); 141 waitPersistentCacheValidation(conf, recoveredBucketCache); 142 assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize()); 143 assertEquals(0, recoveredBucketCache.backingMap.size()); 144 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 145 // Add blocks 146 for (int i = 0; i < blocks.length; i++) { 147 cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, newKeys[i], blocks[i].getBlock()); 148 } 149 usedSize = recoveredBucketCache.getAllocator().getUsedSize(); 150 assertNotEquals(0, usedSize); 151 // persist cache to file 152 recoveredBucketCache.shutdown(); 153 154 // 3.delete backingMap persistence file 155 final java.nio.file.Path mapFile = 156 FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence"); 157 assertTrue(Files.deleteIfExists(mapFile)); 158 // can't restore cache from file 159 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 160 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 161 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 162 assertTrue(bucketCache.waitForCacheInitialization(10000)); 163 waitPersistentCacheValidation(conf, bucketCache); 164 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 165 assertEquals(0, bucketCache.backingMap.size()); 166 } finally { 167 if (recoveredBucketCache == null && bucketCache != null) { 168 bucketCache.shutdown(); 169 } 170 if (recoveredBucketCache != null) { 171 recoveredBucketCache.shutdown(); 172 } 173 } 174 TEST_UTIL.cleanupTestDir(); 175 } 176 177 @Test 178 public void testRetrieveFromFileAfterDelete() throws Exception { 179 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 180 Path testDir = TEST_UTIL.getDataTestDir(); 181 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 182 Configuration conf = TEST_UTIL.getConfiguration(); 183 conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300); 184 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 185 BucketCache bucketCache = null; 186 try { 187 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 188 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 189 DEFAULT_ERROR_TOLERATION_DURATION, conf); 190 assertTrue(bucketCache.waitForCacheInitialization(10000)); 191 long usedSize = bucketCache.getAllocator().getUsedSize(); 192 assertEquals(0, usedSize); 193 CacheTestUtils.HFileBlockPair[] blocks = 194 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 195 // Add blocks 196 for (CacheTestUtils.HFileBlockPair block : blocks) { 197 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 198 } 199 usedSize = bucketCache.getAllocator().getUsedSize(); 200 assertNotEquals(0, usedSize); 201 // Shutdown BucketCache 202 bucketCache.shutdown(); 203 // Delete the persistence file 204 File mapFile = new File(mapFileName); 205 assertTrue(mapFile.delete()); 206 Thread.sleep(350); 207 // Create BucketCache 208 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 209 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 210 DEFAULT_ERROR_TOLERATION_DURATION, conf); 211 assertTrue(bucketCache.waitForCacheInitialization(10000)); 212 waitPersistentCacheValidation(conf, bucketCache); 213 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 214 assertEquals(0, bucketCache.backingMap.size()); 215 } finally { 216 if (bucketCache != null) { 217 bucketCache.shutdown(); 218 } 219 } 220 } 221 222 /** 223 * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache 224 * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache 225 * after modify cache file's data, and it can't restore cache from file, the cache file and 226 * persistence file would be deleted before BucketCache start normally. 227 * @throws Exception the exception 228 */ 229 @Test 230 public void testModifiedBucketCacheFileData() throws Exception { 231 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 232 Path testDir = TEST_UTIL.getDataTestDir(); 233 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 234 235 Configuration conf = HBaseConfiguration.create(); 236 // Disables the persister thread by setting its interval to MAX_VALUE 237 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 238 BucketCache bucketCache = null; 239 try { 240 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 241 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 242 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 243 assertTrue(bucketCache.waitForCacheInitialization(10000)); 244 long usedSize = bucketCache.getAllocator().getUsedSize(); 245 assertEquals(0, usedSize); 246 247 CacheTestUtils.HFileBlockPair[] blocks = 248 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 249 // Add blocks 250 for (CacheTestUtils.HFileBlockPair block : blocks) { 251 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 252 } 253 usedSize = bucketCache.getAllocator().getUsedSize(); 254 assertNotEquals(0, usedSize); 255 // persist cache to file 256 bucketCache.shutdown(); 257 258 // modified bucket cache file 259 String file = testDir + "/bucket.cache"; 260 try (BufferedWriter out = 261 new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) { 262 out.write("test bucket cache"); 263 } 264 // can't restore cache from file 265 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 266 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 267 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 268 assertTrue(bucketCache.waitForCacheInitialization(10000)); 269 waitPersistentCacheValidation(conf, bucketCache); 270 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 271 assertEquals(0, bucketCache.backingMap.size()); 272 } finally { 273 if (bucketCache != null) { 274 bucketCache.shutdown(); 275 } 276 } 277 TEST_UTIL.cleanupTestDir(); 278 } 279 280 /** 281 * Test whether BucketCache is started normally after modifying the cache file's last modified 282 * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache 283 * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has 284 * modified persistence cache such that now we store extra 8 bytes at the end of each block in the 285 * cache, representing the nanosecond time the block has been cached. So in the event the cache 286 * file has failed checksum verification during loading time, we go through all the cached blocks 287 * in the cache map and validate the cached time long between what is in the map and the cache 288 * file. If that check fails, we pull the cache key entry out of the map. Since in this test we 289 * are only modifying the access time to induce a checksum error, the cache file content is still 290 * valid and the extra verification should validate that all cache keys in the map are still 291 * recoverable from the cache. 292 * @throws Exception the exception 293 */ 294 @Test 295 public void testModifiedBucketCacheFileTime() throws Exception { 296 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 297 Path testDir = TEST_UTIL.getDataTestDir(); 298 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 299 Configuration conf = HBaseConfiguration.create(); 300 // Disables the persister thread by setting its interval to MAX_VALUE 301 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 302 BucketCache bucketCache = null; 303 try { 304 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 305 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 306 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 307 assertTrue(bucketCache.waitForCacheInitialization(10000)); 308 long usedSize = bucketCache.getAllocator().getUsedSize(); 309 assertEquals(0, usedSize); 310 311 CacheTestUtils.HFileBlockPair[] blocks = 312 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 313 // Add blocks 314 for (CacheTestUtils.HFileBlockPair block : blocks) { 315 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 316 } 317 usedSize = bucketCache.getAllocator().getUsedSize(); 318 assertNotEquals(0, usedSize); 319 long blockCount = bucketCache.backingMap.size(); 320 assertNotEquals(0, blockCount); 321 // persist cache to file 322 bucketCache.shutdown(); 323 324 // modified bucket cache file LastModifiedTime 325 final java.nio.file.Path file = 326 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 327 Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000))); 328 // can't restore cache from file 329 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 330 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 331 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 332 assertTrue(bucketCache.waitForCacheInitialization(10000)); 333 waitPersistentCacheValidation(conf, bucketCache); 334 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 335 assertEquals(blockCount, bucketCache.backingMap.size()); 336 } finally { 337 if (bucketCache != null) { 338 bucketCache.shutdown(); 339 } 340 } 341 TEST_UTIL.cleanupTestDir(); 342 } 343 344 /** 345 * When using persistent bucket cache, there may be crashes between persisting the backing map and 346 * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache 347 * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the 348 * keys that are still valid do succeed in retrieve related block data from the cache without any 349 * corruption. 350 * @throws Exception the exception 351 */ 352 @Test 353 public void testBucketCacheRecovery() throws Exception { 354 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 355 Path testDir = TEST_UTIL.getDataTestDir(); 356 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 357 Configuration conf = HBaseConfiguration.create(); 358 // Disables the persister thread by setting its interval to MAX_VALUE 359 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 360 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 361 BucketCache bucketCache = null; 362 BucketCache newBucketCache = null; 363 try { 364 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 365 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 366 DEFAULT_ERROR_TOLERATION_DURATION, conf); 367 assertTrue(bucketCache.waitForCacheInitialization(10000)); 368 369 CacheTestUtils.HFileBlockPair[] blocks = 370 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); 371 String[] names = CacheTestUtils.getHFileNames(blocks); 372 // Add three blocks 373 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 374 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 375 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 376 // saves the current state 377 bucketCache.persistToFile(); 378 // evicts first block 379 bucketCache.evictBlock(blocks[0].getBlockName()); 380 381 // now adds a fourth block to bucket cache 382 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 383 // Creates new bucket cache instance without persisting to file after evicting first block 384 // and caching fourth block. So the bucket cache file has only the last three blocks, 385 // but backing map (containing cache keys) was persisted when first three blocks 386 // were in the cache. So the state on this recovery is: 387 // - Backing map: [block0, block1, block2] 388 // - Cache: [block1, block2, block3] 389 // Therefore, this bucket cache would be able to recover only block1 and block2. 390 newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 391 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 392 DEFAULT_ERROR_TOLERATION_DURATION, conf); 393 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 394 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 395 assertNull(newBucketCache.getBlock(newKeys[0], false, false, false)); 396 assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false)); 397 assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false)); 398 assertNull(newBucketCache.getBlock(newKeys[3], false, false, false)); 399 assertEquals(2, newBucketCache.backingMap.size()); 400 } finally { 401 if (newBucketCache == null && bucketCache != null) { 402 bucketCache.shutdown(); 403 } 404 if (newBucketCache != null) { 405 newBucketCache.shutdown(); 406 } 407 TEST_UTIL.cleanupTestDir(); 408 } 409 } 410 411 @Test 412 public void testSingleChunk() throws Exception { 413 testChunkedBackingMapRecovery(5, 5); 414 } 415 416 @Test 417 public void testCompletelyFilledChunks() throws Exception { 418 // Test where the all the chunks are complete with chunkSize entries 419 testChunkedBackingMapRecovery(5, 10); 420 } 421 422 @Test 423 public void testPartiallyFilledChunks() throws Exception { 424 // Test where the last chunk is not completely filled. 425 testChunkedBackingMapRecovery(5, 13); 426 } 427 428 private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception { 429 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 430 Path testDir = TEST_UTIL.getDataTestDir(); 431 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 432 Configuration conf = HBaseConfiguration.create(); 433 conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize); 434 435 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 436 BucketCache bucketCache = null; 437 BucketCache newBucketCache = null; 438 try { 439 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 440 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 441 DEFAULT_ERROR_TOLERATION_DURATION, conf); 442 assertTrue(bucketCache.waitForCacheInitialization(10000)); 443 444 CacheTestUtils.HFileBlockPair[] blocks = 445 CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks); 446 String[] names = CacheTestUtils.getHFileNames(blocks); 447 448 for (int i = 0; i < numBlocks; i++) { 449 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), 450 blocks[i].getBlock()); 451 } 452 453 // saves the current state 454 bucketCache.persistToFile(); 455 456 // Create a new bucket which reads from persistence file. 457 newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 458 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 459 DEFAULT_ERROR_TOLERATION_DURATION, conf); 460 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 461 462 assertEquals(numBlocks, newBucketCache.backingMap.size()); 463 BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names); 464 for (int i = 0; i < numBlocks; i++) { 465 assertEquals(blocks[i].getBlock(), 466 newBucketCache.getBlock(newKeys[i], false, false, false)); 467 } 468 } finally { 469 if (newBucketCache == null && bucketCache != null) { 470 bucketCache.shutdown(); 471 } 472 if (newBucketCache != null) { 473 newBucketCache.shutdown(); 474 } 475 TEST_UTIL.cleanupTestDir(); 476 } 477 } 478 479 private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 480 throws InterruptedException { 481 while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { 482 Thread.sleep(100); 483 } 484 } 485 486 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 487 // threads will flush it to the bucket and put reference entry in backingMap. 488 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 489 Cacheable block) throws InterruptedException { 490 cache.cacheBlock(cacheKey, block); 491 waitUntilFlushedToBucket(cache, cacheKey); 492 } 493 494 private void waitPersistentCacheValidation(Configuration config, final BucketCache bucketCache) { 495 Waiter.waitFor(config, 5000, 496 () -> bucketCache.getBackingMapValidated().get() && bucketCache.isCacheEnabled()); 497 } 498}