001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertNotEquals;
025import static org.junit.jupiter.api.Assertions.assertNull;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027
028import java.io.BufferedWriter;
029import java.io.File;
030import java.io.FileOutputStream;
031import java.io.OutputStreamWriter;
032import java.nio.file.FileSystems;
033import java.nio.file.Files;
034import java.nio.file.attribute.FileTime;
035import java.time.Instant;
036import java.util.stream.Stream;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
041import org.apache.hadoop.hbase.HBaseTestingUtil;
042import org.apache.hadoop.hbase.Waiter;
043import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
044import org.apache.hadoop.hbase.io.hfile.CacheConfig;
045import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
046import org.apache.hadoop.hbase.io.hfile.Cacheable;
047import org.apache.hadoop.hbase.testclassification.RegionServerTests;
048import org.apache.hadoop.hbase.testclassification.SmallTests;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.TestTemplate;
052import org.junit.jupiter.params.provider.Arguments;
053
054/**
055 * Basic test for check file's integrity before start BucketCache in fileIOEngine
056 */
057@Tag(SmallTests.TAG)
058@Tag(RegionServerTests.TAG)
059@HBaseParameterizedTestTemplate(name = "{index}: blockSize={0}, bucketSizes={1}")
060public class TestVerifyBucketCacheFile {
061
062  public static Stream<Arguments> parameters() {
063    return Stream.of(Arguments.of(8192, null),
064      Arguments.of(16 * 1024,
065        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
066          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
067          128 * 1024 + 1024 }));
068  }
069
070  private final int constructedBlockSize;
071  private final int[] constructedBlockSizes;
072
073  public TestVerifyBucketCacheFile(int constructedBlockSize, int[] constructedBlockSizes) {
074    this.constructedBlockSize = constructedBlockSize;
075    this.constructedBlockSizes = constructedBlockSizes;
076  }
077
078  final long capacitySize = 32 * 1024 * 1024;
079  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
080  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
081
082  /**
083   * Test cache file or persistence file does not exist whether BucketCache starts normally (1)
084   * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file.
085   * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after
086   * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file
087   * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence
088   * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the
089   * cache file and persistence file would be deleted before BucketCache start normally.
090   * @throws Exception the exception
091   */
092  @TestTemplate
093  public void testRetrieveFromFile() throws Exception {
094    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
095    Path testDir = TEST_UTIL.getDataTestDir();
096    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
097
098    Configuration conf = HBaseConfiguration.create();
099    // Disables the persister thread by setting its interval to MAX_VALUE
100    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
101    BucketCache bucketCache = null;
102    BucketCache recoveredBucketCache = null;
103    try {
104      bucketCache =
105        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
106          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
107      assertTrue(bucketCache.waitForCacheInitialization(10000));
108      long usedSize = bucketCache.getAllocator().getUsedSize();
109      assertEquals(0, usedSize);
110      CacheTestUtils.HFileBlockPair[] blocks =
111        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
112      String[] names = CacheTestUtils.getHFileNames(blocks);
113      // Add blocks
114      for (CacheTestUtils.HFileBlockPair block : blocks) {
115        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
116      }
117      usedSize = bucketCache.getAllocator().getUsedSize();
118      assertNotEquals(0, usedSize);
119      // 1.persist cache to file
120      bucketCache.shutdown();
121      // restore cache from file
122      bucketCache =
123        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
124          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
125      assertTrue(bucketCache.waitForCacheInitialization(10000));
126      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
127      // persist cache to file
128      bucketCache.shutdown();
129
130      // 2.delete bucket cache file
131      final java.nio.file.Path cacheFile =
132        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
133      assertTrue(Files.deleteIfExists(cacheFile));
134      // can't restore cache from file
135      recoveredBucketCache =
136        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
137          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
138      assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
139      waitPersistentCacheValidation(conf, recoveredBucketCache);
140      assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
141      assertEquals(0, recoveredBucketCache.backingMap.size());
142      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
143      // Add blocks
144      for (int i = 0; i < blocks.length; i++) {
145        cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, newKeys[i], blocks[i].getBlock());
146      }
147      usedSize = recoveredBucketCache.getAllocator().getUsedSize();
148      assertNotEquals(0, usedSize);
149      // persist cache to file
150      recoveredBucketCache.shutdown();
151
152      // 3.delete backingMap persistence file
153      final java.nio.file.Path mapFile =
154        FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
155      assertTrue(Files.deleteIfExists(mapFile));
156      // can't restore cache from file
157      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
158        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
159        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
160      assertTrue(bucketCache.waitForCacheInitialization(10000));
161      waitPersistentCacheValidation(conf, bucketCache);
162      assertEquals(0, bucketCache.getAllocator().getUsedSize());
163      assertEquals(0, bucketCache.backingMap.size());
164    } finally {
165      if (recoveredBucketCache == null && bucketCache != null) {
166        bucketCache.shutdown();
167      }
168      if (recoveredBucketCache != null) {
169        recoveredBucketCache.shutdown();
170      }
171    }
172    TEST_UTIL.cleanupTestDir();
173  }
174
175  @TestTemplate
176  public void testRetrieveFromFileAfterDelete() throws Exception {
177    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
178    Path testDir = TEST_UTIL.getDataTestDir();
179    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
180    Configuration conf = TEST_UTIL.getConfiguration();
181    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
182    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
183    BucketCache bucketCache = null;
184    try {
185      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
186        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
187        DEFAULT_ERROR_TOLERATION_DURATION, conf);
188      assertTrue(bucketCache.waitForCacheInitialization(10000));
189      long usedSize = bucketCache.getAllocator().getUsedSize();
190      assertEquals(0, usedSize);
191      CacheTestUtils.HFileBlockPair[] blocks =
192        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
193      // Add blocks
194      for (CacheTestUtils.HFileBlockPair block : blocks) {
195        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
196      }
197      usedSize = bucketCache.getAllocator().getUsedSize();
198      assertNotEquals(0, usedSize);
199      // Shutdown BucketCache
200      bucketCache.shutdown();
201      // Delete the persistence file
202      File mapFile = new File(mapFileName);
203      assertTrue(mapFile.delete());
204      Thread.sleep(350);
205      // Create BucketCache
206      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
207        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
208        DEFAULT_ERROR_TOLERATION_DURATION, conf);
209      assertTrue(bucketCache.waitForCacheInitialization(10000));
210      waitPersistentCacheValidation(conf, bucketCache);
211      assertEquals(0, bucketCache.getAllocator().getUsedSize());
212      assertEquals(0, bucketCache.backingMap.size());
213    } finally {
214      if (bucketCache != null) {
215        bucketCache.shutdown();
216      }
217    }
218  }
219
220  /**
221   * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache
222   * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache
223   * after modify cache file's data, and it can't restore cache from file, the cache file and
224   * persistence file would be deleted before BucketCache start normally.
225   * @throws Exception the exception
226   */
227  @TestTemplate
228  public void testModifiedBucketCacheFileData() throws Exception {
229    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
230    Path testDir = TEST_UTIL.getDataTestDir();
231    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
232
233    Configuration conf = HBaseConfiguration.create();
234    // Disables the persister thread by setting its interval to MAX_VALUE
235    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
236    BucketCache bucketCache = null;
237    try {
238      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
239        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
240        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
241      assertTrue(bucketCache.waitForCacheInitialization(10000));
242      long usedSize = bucketCache.getAllocator().getUsedSize();
243      assertEquals(0, usedSize);
244
245      CacheTestUtils.HFileBlockPair[] blocks =
246        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
247      // Add blocks
248      for (CacheTestUtils.HFileBlockPair block : blocks) {
249        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
250      }
251      usedSize = bucketCache.getAllocator().getUsedSize();
252      assertNotEquals(0, usedSize);
253      // persist cache to file
254      bucketCache.shutdown();
255
256      // modified bucket cache file
257      String file = testDir + "/bucket.cache";
258      try (BufferedWriter out =
259        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
260        out.write("test bucket cache");
261      }
262      // can't restore cache from file
263      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
264        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
265        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
266      assertTrue(bucketCache.waitForCacheInitialization(10000));
267      waitPersistentCacheValidation(conf, bucketCache);
268      assertEquals(0, bucketCache.getAllocator().getUsedSize());
269      assertEquals(0, bucketCache.backingMap.size());
270    } finally {
271      if (bucketCache != null) {
272        bucketCache.shutdown();
273      }
274    }
275    TEST_UTIL.cleanupTestDir();
276  }
277
278  /**
279   * Test whether BucketCache is started normally after modifying the cache file's last modified
280   * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
281   * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has
282   * modified persistence cache such that now we store extra 8 bytes at the end of each block in the
283   * cache, representing the nanosecond time the block has been cached. So in the event the cache
284   * file has failed checksum verification during loading time, we go through all the cached blocks
285   * in the cache map and validate the cached time long between what is in the map and the cache
286   * file. If that check fails, we pull the cache key entry out of the map. Since in this test we
287   * are only modifying the access time to induce a checksum error, the cache file content is still
288   * valid and the extra verification should validate that all cache keys in the map are still
289   * recoverable from the cache.
290   * @throws Exception the exception
291   */
292  @TestTemplate
293  public void testModifiedBucketCacheFileTime() throws Exception {
294    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
295    Path testDir = TEST_UTIL.getDataTestDir();
296    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
297    Configuration conf = HBaseConfiguration.create();
298    // Disables the persister thread by setting its interval to MAX_VALUE
299    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
300    BucketCache bucketCache = null;
301    try {
302      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
303        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
304        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
305      assertTrue(bucketCache.waitForCacheInitialization(10000));
306      long usedSize = bucketCache.getAllocator().getUsedSize();
307      assertEquals(0, usedSize);
308
309      CacheTestUtils.HFileBlockPair[] blocks =
310        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
311      // Add blocks
312      for (CacheTestUtils.HFileBlockPair block : blocks) {
313        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
314      }
315      usedSize = bucketCache.getAllocator().getUsedSize();
316      assertNotEquals(0, usedSize);
317      long blockCount = bucketCache.backingMap.size();
318      assertNotEquals(0, blockCount);
319      // persist cache to file
320      bucketCache.shutdown();
321
322      // modified bucket cache file LastModifiedTime
323      final java.nio.file.Path file =
324        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
325      Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
326      // can't restore cache from file
327      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
328        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
329        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
330      assertTrue(bucketCache.waitForCacheInitialization(10000));
331      waitPersistentCacheValidation(conf, bucketCache);
332      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
333      assertEquals(blockCount, bucketCache.backingMap.size());
334    } finally {
335      if (bucketCache != null) {
336        bucketCache.shutdown();
337      }
338    }
339    TEST_UTIL.cleanupTestDir();
340  }
341
342  /**
343   * When using persistent bucket cache, there may be crashes between persisting the backing map and
344   * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache
345   * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the
346   * keys that are still valid do succeed in retrieve related block data from the cache without any
347   * corruption.
348   * @throws Exception the exception
349   */
350  @TestTemplate
351  public void testBucketCacheRecovery() throws Exception {
352    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
353    Path testDir = TEST_UTIL.getDataTestDir();
354    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
355    Configuration conf = HBaseConfiguration.create();
356    // Disables the persister thread by setting its interval to MAX_VALUE
357    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
358    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
359    BucketCache bucketCache = null;
360    BucketCache newBucketCache = null;
361    try {
362      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
363        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
364        DEFAULT_ERROR_TOLERATION_DURATION, conf);
365      assertTrue(bucketCache.waitForCacheInitialization(10000));
366
367      CacheTestUtils.HFileBlockPair[] blocks =
368        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
369      String[] names = CacheTestUtils.getHFileNames(blocks);
370      // Add three blocks
371      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
372      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
373      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
374      // saves the current state
375      bucketCache.persistToFile();
376      // evicts first block
377      bucketCache.evictBlock(blocks[0].getBlockName());
378
379      // now adds a fourth block to bucket cache
380      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
381      // Creates new bucket cache instance without persisting to file after evicting first block
382      // and caching fourth block. So the bucket cache file has only the last three blocks,
383      // but backing map (containing cache keys) was persisted when first three blocks
384      // were in the cache. So the state on this recovery is:
385      // - Backing map: [block0, block1, block2]
386      // - Cache: [block1, block2, block3]
387      // Therefore, this bucket cache would be able to recover only block1 and block2.
388      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
389        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
390        DEFAULT_ERROR_TOLERATION_DURATION, conf);
391      assertTrue(newBucketCache.waitForCacheInitialization(10000));
392      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
393      assertNull(newBucketCache.getBlock(newKeys[0], false, false, false));
394      assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
395      assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
396      assertNull(newBucketCache.getBlock(newKeys[3], false, false, false));
397      assertEquals(2, newBucketCache.backingMap.size());
398    } finally {
399      if (newBucketCache == null && bucketCache != null) {
400        bucketCache.shutdown();
401      }
402      if (newBucketCache != null) {
403        newBucketCache.shutdown();
404      }
405      TEST_UTIL.cleanupTestDir();
406    }
407  }
408
409  @TestTemplate
410  public void testSingleChunk() throws Exception {
411    testChunkedBackingMapRecovery(5, 5);
412  }
413
414  @TestTemplate
415  public void testCompletelyFilledChunks() throws Exception {
416    // Test where the all the chunks are complete with chunkSize entries
417    testChunkedBackingMapRecovery(5, 10);
418  }
419
420  @TestTemplate
421  public void testPartiallyFilledChunks() throws Exception {
422    // Test where the last chunk is not completely filled.
423    testChunkedBackingMapRecovery(5, 13);
424  }
425
426  private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
427    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
428    Path testDir = TEST_UTIL.getDataTestDir();
429    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
430    Configuration conf = HBaseConfiguration.create();
431    conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);
432
433    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
434    BucketCache bucketCache = null;
435    BucketCache newBucketCache = null;
436    try {
437      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
438        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
439        DEFAULT_ERROR_TOLERATION_DURATION, conf);
440      assertTrue(bucketCache.waitForCacheInitialization(10000));
441
442      CacheTestUtils.HFileBlockPair[] blocks =
443        CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
444      String[] names = CacheTestUtils.getHFileNames(blocks);
445
446      for (int i = 0; i < numBlocks; i++) {
447        cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
448          blocks[i].getBlock());
449      }
450
451      // saves the current state
452      bucketCache.persistToFile();
453
454      // Create a new bucket which reads from persistence file.
455      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
456        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
457        DEFAULT_ERROR_TOLERATION_DURATION, conf);
458      assertTrue(newBucketCache.waitForCacheInitialization(10000));
459
460      assertEquals(numBlocks, newBucketCache.backingMap.size());
461      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
462      for (int i = 0; i < numBlocks; i++) {
463        assertEquals(blocks[i].getBlock(),
464          newBucketCache.getBlock(newKeys[i], false, false, false));
465      }
466    } finally {
467      if (newBucketCache == null && bucketCache != null) {
468        bucketCache.shutdown();
469      }
470      if (newBucketCache != null) {
471        newBucketCache.shutdown();
472      }
473      TEST_UTIL.cleanupTestDir();
474    }
475  }
476
477  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
478    throws InterruptedException {
479    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
480      Thread.sleep(100);
481    }
482  }
483
484  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
485  // threads will flush it to the bucket and put reference entry in backingMap.
486  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
487    Cacheable block) throws InterruptedException {
488    cache.cacheBlock(cacheKey, block);
489    waitUntilFlushedToBucket(cache, cacheKey);
490  }
491
492  private void waitPersistentCacheValidation(Configuration config, final BucketCache bucketCache) {
493    Waiter.waitFor(config, 5000,
494      () -> bucketCache.getBackingMapValidated().get() && bucketCache.isCacheEnabled());
495  }
496}