001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
054
055/**
056 * Basic test for check file's integrity before start BucketCache in fileIOEngine
057 */
058@RunWith(Parameterized.class)
059@Category(SmallTests.class)
060public class TestVerifyBucketCacheFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class);

  /**
   * Two parameter sets: an 8KB block size with {@code null} bucket sizes, and a 16KB block size
   * with an explicit list of bucket sizes.
   */
  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null },
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  // Block size used when generating test HFile blocks.
  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  // Bucket sizes passed to the BucketCache constructor; null presumably lets BucketCache use its
  // default sizes — TODO confirm against the constructor.
  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  // 32MB total cache capacity, with the default writer thread count and queue length.
  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
083
084  /**
085   * Test cache file or persistence file does not exist whether BucketCache starts normally (1)
086   * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file.
087   * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after
088   * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file
089   * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence
090   * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the
091   * cache file and persistence file would be deleted before BucketCache start normally.
092   * @throws Exception the exception
093   */
094  @Test
095  public void testRetrieveFromFile() throws Exception {
096    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
097    Path testDir = TEST_UTIL.getDataTestDir();
098    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
099
100    Configuration conf = HBaseConfiguration.create();
101    // Disables the persister thread by setting its interval to MAX_VALUE
102    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
103    BucketCache bucketCache = null;
104    BucketCache recoveredBucketCache = null;
105    try {
106      bucketCache =
107        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
108          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
109      assertTrue(bucketCache.waitForCacheInitialization(10000));
110      long usedSize = bucketCache.getAllocator().getUsedSize();
111      assertEquals(0, usedSize);
112      CacheTestUtils.HFileBlockPair[] blocks =
113        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
114      String[] names = CacheTestUtils.getHFileNames(blocks);
115      // Add blocks
116      for (CacheTestUtils.HFileBlockPair block : blocks) {
117        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
118      }
119      usedSize = bucketCache.getAllocator().getUsedSize();
120      assertNotEquals(0, usedSize);
121      // 1.persist cache to file
122      bucketCache.shutdown();
123      // restore cache from file
124      bucketCache =
125        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
126          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
127      assertTrue(bucketCache.waitForCacheInitialization(10000));
128      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
129      // persist cache to file
130      bucketCache.shutdown();
131
132      // 2.delete bucket cache file
133      final java.nio.file.Path cacheFile =
134        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
135      assertTrue(Files.deleteIfExists(cacheFile));
136      // can't restore cache from file
137      recoveredBucketCache =
138        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
139          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
140      assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
141      waitPersistentCacheValidation(conf, recoveredBucketCache);
142      assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
143      assertEquals(0, recoveredBucketCache.backingMap.size());
144      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
145      // Add blocks
146      for (int i = 0; i < blocks.length; i++) {
147        cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, newKeys[i], blocks[i].getBlock());
148      }
149      usedSize = recoveredBucketCache.getAllocator().getUsedSize();
150      assertNotEquals(0, usedSize);
151      // persist cache to file
152      recoveredBucketCache.shutdown();
153
154      // 3.delete backingMap persistence file
155      final java.nio.file.Path mapFile =
156        FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
157      assertTrue(Files.deleteIfExists(mapFile));
158      // can't restore cache from file
159      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
160        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
161        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
162      assertTrue(bucketCache.waitForCacheInitialization(10000));
163      waitPersistentCacheValidation(conf, bucketCache);
164      assertEquals(0, bucketCache.getAllocator().getUsedSize());
165      assertEquals(0, bucketCache.backingMap.size());
166    } finally {
167      if (recoveredBucketCache == null && bucketCache != null) {
168        bucketCache.shutdown();
169      }
170      if (recoveredBucketCache != null) {
171        recoveredBucketCache.shutdown();
172      }
173    }
174    TEST_UTIL.cleanupTestDir();
175  }
176
  /**
   * Verifies recovery behavior when the backing map persistence file is deleted between restarts
   * while the background persister thread is enabled (300ms interval): after the delete, a
   * restarted BucketCache must not restore anything and should come up empty.
   * @throws Exception the exception
   */
  @Test
  public void testRetrieveFromFileAfterDelete() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = TEST_UTIL.getConfiguration();
    // Enable the background persister thread with a short (300ms) interval.
    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
    // Unique persistence file name per run so leftovers from earlier runs cannot interfere.
    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // Shutdown BucketCache
      bucketCache.shutdown();
      // Delete the persistence file
      File mapFile = new File(mapFileName);
      assertTrue(mapFile.delete());
      // Sleep longer than one persister interval (300ms) before restarting — presumably to let
      // any in-flight persister activity settle; TODO confirm the exact reason.
      Thread.sleep(350);
      // Create BucketCache; with the persistence file gone it must start empty.
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
  }
221
222  /**
223   * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache
224   * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache
225   * after modify cache file's data, and it can't restore cache from file, the cache file and
226   * persistence file would be deleted before BucketCache start normally.
227   * @throws Exception the exception
228   */
229  @Test
230  public void testModifiedBucketCacheFileData() throws Exception {
231    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
232    Path testDir = TEST_UTIL.getDataTestDir();
233    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
234
235    Configuration conf = HBaseConfiguration.create();
236    // Disables the persister thread by setting its interval to MAX_VALUE
237    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
238    BucketCache bucketCache = null;
239    try {
240      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
241        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
242        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
243      assertTrue(bucketCache.waitForCacheInitialization(10000));
244      long usedSize = bucketCache.getAllocator().getUsedSize();
245      assertEquals(0, usedSize);
246
247      CacheTestUtils.HFileBlockPair[] blocks =
248        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
249      // Add blocks
250      for (CacheTestUtils.HFileBlockPair block : blocks) {
251        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
252      }
253      usedSize = bucketCache.getAllocator().getUsedSize();
254      assertNotEquals(0, usedSize);
255      // persist cache to file
256      bucketCache.shutdown();
257
258      // modified bucket cache file
259      String file = testDir + "/bucket.cache";
260      try (BufferedWriter out =
261        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
262        out.write("test bucket cache");
263      }
264      // can't restore cache from file
265      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
266        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
267        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
268      assertTrue(bucketCache.waitForCacheInitialization(10000));
269      waitPersistentCacheValidation(conf, bucketCache);
270      assertEquals(0, bucketCache.getAllocator().getUsedSize());
271      assertEquals(0, bucketCache.backingMap.size());
272    } finally {
273      if (bucketCache != null) {
274        bucketCache.shutdown();
275      }
276    }
277    TEST_UTIL.cleanupTestDir();
278  }
279
280  /**
281   * Test whether BucketCache is started normally after modifying the cache file's last modified
282   * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
283   * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has
284   * modified persistence cache such that now we store extra 8 bytes at the end of each block in the
285   * cache, representing the nanosecond time the block has been cached. So in the event the cache
286   * file has failed checksum verification during loading time, we go through all the cached blocks
287   * in the cache map and validate the cached time long between what is in the map and the cache
288   * file. If that check fails, we pull the cache key entry out of the map. Since in this test we
289   * are only modifying the access time to induce a checksum error, the cache file content is still
290   * valid and the extra verification should validate that all cache keys in the map are still
291   * recoverable from the cache.
292   * @throws Exception the exception
293   */
294  @Test
295  public void testModifiedBucketCacheFileTime() throws Exception {
296    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
297    Path testDir = TEST_UTIL.getDataTestDir();
298    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
299    Configuration conf = HBaseConfiguration.create();
300    // Disables the persister thread by setting its interval to MAX_VALUE
301    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
302    BucketCache bucketCache = null;
303    try {
304      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
305        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
306        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
307      assertTrue(bucketCache.waitForCacheInitialization(10000));
308      long usedSize = bucketCache.getAllocator().getUsedSize();
309      assertEquals(0, usedSize);
310
311      CacheTestUtils.HFileBlockPair[] blocks =
312        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
313      // Add blocks
314      for (CacheTestUtils.HFileBlockPair block : blocks) {
315        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
316      }
317      usedSize = bucketCache.getAllocator().getUsedSize();
318      assertNotEquals(0, usedSize);
319      long blockCount = bucketCache.backingMap.size();
320      assertNotEquals(0, blockCount);
321      // persist cache to file
322      bucketCache.shutdown();
323
324      // modified bucket cache file LastModifiedTime
325      final java.nio.file.Path file =
326        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
327      Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
328      // can't restore cache from file
329      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
330        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
331        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
332      assertTrue(bucketCache.waitForCacheInitialization(10000));
333      waitPersistentCacheValidation(conf, bucketCache);
334      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
335      assertEquals(blockCount, bucketCache.backingMap.size());
336    } finally {
337      if (bucketCache != null) {
338        bucketCache.shutdown();
339      }
340    }
341    TEST_UTIL.cleanupTestDir();
342  }
343
344  /**
345   * When using persistent bucket cache, there may be crashes between persisting the backing map and
346   * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache
347   * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the
348   * keys that are still valid do succeed in retrieve related block data from the cache without any
349   * corruption.
350   * @throws Exception the exception
351   */
352  @Test
353  public void testBucketCacheRecovery() throws Exception {
354    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
355    Path testDir = TEST_UTIL.getDataTestDir();
356    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
357    Configuration conf = HBaseConfiguration.create();
358    // Disables the persister thread by setting its interval to MAX_VALUE
359    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
360    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
361    BucketCache bucketCache = null;
362    BucketCache newBucketCache = null;
363    try {
364      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
365        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
366        DEFAULT_ERROR_TOLERATION_DURATION, conf);
367      assertTrue(bucketCache.waitForCacheInitialization(10000));
368
369      CacheTestUtils.HFileBlockPair[] blocks =
370        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
371      String[] names = CacheTestUtils.getHFileNames(blocks);
372      // Add three blocks
373      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
374      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
375      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
376      // saves the current state
377      bucketCache.persistToFile();
378      // evicts first block
379      bucketCache.evictBlock(blocks[0].getBlockName());
380
381      // now adds a fourth block to bucket cache
382      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
383      // Creates new bucket cache instance without persisting to file after evicting first block
384      // and caching fourth block. So the bucket cache file has only the last three blocks,
385      // but backing map (containing cache keys) was persisted when first three blocks
386      // were in the cache. So the state on this recovery is:
387      // - Backing map: [block0, block1, block2]
388      // - Cache: [block1, block2, block3]
389      // Therefore, this bucket cache would be able to recover only block1 and block2.
390      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
391        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
392        DEFAULT_ERROR_TOLERATION_DURATION, conf);
393      assertTrue(newBucketCache.waitForCacheInitialization(10000));
394      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
395      assertNull(newBucketCache.getBlock(newKeys[0], false, false, false));
396      assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
397      assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
398      assertNull(newBucketCache.getBlock(newKeys[3], false, false, false));
399      assertEquals(2, newBucketCache.backingMap.size());
400    } finally {
401      if (newBucketCache == null && bucketCache != null) {
402        bucketCache.shutdown();
403      }
404      if (newBucketCache != null) {
405        newBucketCache.shutdown();
406      }
407      TEST_UTIL.cleanupTestDir();
408    }
409  }
410
  @Test
  public void testSingleChunk() throws Exception {
    // All entries fit in exactly one persistence chunk (5 entries, chunk size 5).
    testChunkedBackingMapRecovery(5, 5);
  }
415
  @Test
  public void testCompletelyFilledChunks() throws Exception {
    // Test where the all the chunks are complete with chunkSize entries (10 entries, chunks of 5).
    testChunkedBackingMapRecovery(5, 10);
  }
421
  @Test
  public void testPartiallyFilledChunks() throws Exception {
    // Test where the last chunk is not completely filled (13 entries, chunks of 5 -> 5+5+3).
    testChunkedBackingMapRecovery(5, 13);
  }
427
  /**
   * Caches {@code numBlocks} blocks with the backing map persisted in chunks of {@code chunkSize}
   * entries, then starts a second BucketCache from the persistence file and verifies every block
   * is recovered intact.
   * @param chunkSize number of backing map entries per persisted chunk
   * @param numBlocks number of blocks to cache before persisting
   * @throws Exception the exception
   */
  private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);

    // Unique persistence file name per run so leftovers from earlier runs cannot interfere.
    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    BucketCache newBucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
      String[] names = CacheTestUtils.getHFileNames(blocks);

      for (int i = 0; i < numBlocks; i++) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
          blocks[i].getBlock());
      }

      // saves the current state
      bucketCache.persistToFile();

      // Create a new bucket which reads from persistence file.
      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(newBucketCache.waitForCacheInitialization(10000));

      // Every persisted key must be recovered and resolve to the original block data.
      assertEquals(numBlocks, newBucketCache.backingMap.size());
      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
      for (int i = 0; i < numBlocks; i++) {
        assertEquals(blocks[i].getBlock(),
          newBucketCache.getBlock(newKeys[i], false, false, false));
      }
    } finally {
      if (newBucketCache == null && bucketCache != null) {
        bucketCache.shutdown();
      }
      if (newBucketCache != null) {
        newBucketCache.shutdown();
      }
      TEST_UTIL.cleanupTestDir();
    }
  }
478
479  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
480    throws InterruptedException {
481    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
482      Thread.sleep(100);
483    }
484  }
485
  /**
   * Caches the block and blocks until it has been flushed. BucketCache.cacheBlock is async: it
   * first adds the block to ramCache and the write queue, then writer threads flush it to the
   * bucket and put a reference entry in backingMap.
   */
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }
493
494  private void waitPersistentCacheValidation(Configuration config, final BucketCache bucketCache) {
495    Waiter.waitFor(config, 5000,
496      () -> bucketCache.getBackingMapValidated().get() && bucketCache.isCacheEnabled());
497  }
498}