/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Basic tests that verify a cache file's integrity before starting a BucketCache backed by a file
 * IOEngine.
 */
@RunWith(Parameterized.class)
@Category(SmallTests.class)
public class TestVerifyBucketCacheFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class);

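  // Runs every test twice: once with the default bucket sizes (constructedBlockSizes == null) and
  // once with an explicit list of bucket sizes.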
  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null },
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;

  /**
   * Tests that BucketCache starts normally when the cache file or the persistence file does not
   * exist. (1) Start BucketCache and add some blocks, then shut down BucketCache and persist the
   * cache to file. Restart BucketCache and verify it restores the cache from file. (2) Delete the
   * bucket cache file after shutting down BucketCache. On restart, BucketCache cannot restore the
   * cache from file; the cache file and persistence file are deleted and BucketCache starts
   * normally with an empty cache. (3) Delete the persistence file after shutting down BucketCache.
   * On restart, BucketCache again cannot restore the cache from file and starts normally with an
   * empty cache.
   * @throws Exception the exception
   */
  @Test
  public void testRetrieveFromFile() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);

    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    BucketCache bucketCache = null;
    BucketCache recoveredBucketCache = null;
    try {
      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      String[] names = CacheTestUtils.getHFileNames(blocks);
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // 1. persist cache to file
      bucketCache.shutdown();
      // restore cache from file
      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
      // persist cache to file
      bucketCache.shutdown();

      // 2. delete bucket cache file
      final java.nio.file.Path cacheFile =
        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
      assertTrue(Files.deleteIfExists(cacheFile));
      // can't restore cache from file
      recoveredBucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
      assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
      assertEquals(0, recoveredBucketCache.backingMap.size());
      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
      // Add blocks
      for (int i = 0; i < blocks.length; i++) {
        cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, newKeys[i], blocks[i].getBlock());
      }
      usedSize = recoveredBucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // persist cache to file
      recoveredBucketCache.shutdown();

      // 3. delete backingMap persistence file
      final java.nio.file.Path mapFile =
        FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
      assertTrue(Files.deleteIfExists(mapFile));
      // can't restore cache from file
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (recoveredBucketCache == null && bucketCache != null) {
        bucketCache.shutdown();
      }
      if (recoveredBucketCache != null) {
        recoveredBucketCache.shutdown();
      }
    }
    TEST_UTIL.cleanupTestDir();
  }

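  /**
   * Tests that BucketCache starts with an empty cache when the persistence file is deleted between
   * shutdown and restart, this time with the periodic persister thread enabled (300ms interval).
   */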
  @Test
  public void testRetrieveFromFileAfterDelete() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // Shutdown BucketCache
      bucketCache.shutdown();
      // Delete the persistence file
      File mapFile = new File(mapFileName);
      assertTrue(mapFile.delete());
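      // Sleep past the 300ms persister interval so any in-flight persister run has completed
      // before the cache is recreated.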
      Thread.sleep(350);
      // Create BucketCache
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
  }

  /**
   * Tests that BucketCache starts normally after the cache file's data has been modified. Start
   * BucketCache and add some blocks, then shut down BucketCache and persist the cache to file.
   * Restart BucketCache after modifying the cache file's data; it cannot restore the cache from
   * file, so the cache file and persistence file are deleted and BucketCache starts normally with
   * an empty cache.
   * @throws Exception the exception
   */
  @Test
  public void testModifiedBucketCacheFileData() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);

    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // persist cache to file
      bucketCache.shutdown();

      // overwrite the bucket cache file with unrelated data to corrupt it
      String file = testDir + "/bucket.cache";
      try (BufferedWriter out =
        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
        out.write("test bucket cache");
      }
      // can't restore cache from file
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    TEST_UTIL.cleanupTestDir();
  }

  /**
   * Tests that BucketCache starts normally after the cache file's last-modified time has changed.
   * First start BucketCache and add some blocks, then shut down BucketCache and persist the cache
   * to file. Then restart BucketCache after modifying the cache file's last-modified time.
   * HBASE-XXXX changed cache persistence so that an extra 8 bytes are now stored at the end of
   * each block in the cache file, representing the nanosecond time at which the block was cached.
   * If the cache file fails checksum verification at load time, we go through all the cached
   * blocks in the cache map and compare the cached-time long recorded in the map against the one
   * in the cache file; when that check fails, the cache key entry is removed from the map. Since
   * this test only modifies the file's timestamp to induce a checksum error, the cache file
   * content is still valid and the extra verification should confirm that all cache keys in the
   * map are still recoverable from the cache.
   * @throws Exception the exception
   */
  @Test
  public void testModifiedBucketCacheFileTime() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      long blockCount = bucketCache.backingMap.size();
      assertNotEquals(0, blockCount);
      // persist cache to file
      bucketCache.shutdown();

      // modify the bucket cache file's last-modified time
      final java.nio.file.Path file =
        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
      Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
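      // Bumping the file's last-modified time invalidates the persisted checksum, even though the
      // file content is unchanged.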
      // The cache is still fully restored thanks to the per-block cached-time validation
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
      assertEquals(blockCount, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    TEST_UTIL.cleanupTestDir();
  }

  /**
   * When using a persistent bucket cache, a crash may occur between persisting the backing map and
   * syncing new blocks to the cache file itself, leaving the cache keys inconsistent with the
   * cached data. This test makes sure the cache keys are updated accordingly, and that keys which
   * are still valid succeed in retrieving the related block data from the cache without any
   * corruption.
   * @throws Exception the exception
   */
  @Test
  public void testBucketCacheRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    BucketCache newBucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
      String[] names = CacheTestUtils.getHFileNames(blocks);
      // Add three blocks
      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
      // saves the current state
      bucketCache.persistToFile();
      // evicts first block
      bucketCache.evictBlock(blocks[0].getBlockName());

      // now adds a fourth block to bucket cache
      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
      // Creates new bucket cache instance without persisting to file after evicting first block
      // and caching fourth block. So the bucket cache file has only the last three blocks,
      // but backing map (containing cache keys) was persisted when first three blocks
      // were in the cache. So the state on this recovery is:
      // - Backing map: [block0, block1, block2]
      // - Cache: [block1, block2, block3]
      // Therefore, this bucket cache would be able to recover only block1 and block2.
      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(newBucketCache.waitForCacheInitialization(10000));
      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
      assertNull(newBucketCache.getBlock(newKeys[0], false, false, false));
      assertEquals(blocks[1].getBlock(), newBucketCache.getBlock(newKeys[1], false, false, false));
      assertEquals(blocks[2].getBlock(), newBucketCache.getBlock(newKeys[2], false, false, false));
      assertNull(newBucketCache.getBlock(newKeys[3], false, false, false));
      assertEquals(2, newBucketCache.backingMap.size());
    } finally {
      if (newBucketCache == null && bucketCache != null) {
        bucketCache.shutdown();
      }
      if (newBucketCache != null) {
        newBucketCache.shutdown();
      }
      TEST_UTIL.cleanupTestDir();
    }
  }

  @Test
  public void testSingleChunk() throws Exception {
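    // Test where all the entries fit in a single chunk.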
    testChunkedBackingMapRecovery(5, 5);
  }

  @Test
  public void testCompletelyFilledChunks() throws Exception {
    // Test where all the chunks are completely filled with chunkSize entries.
    testChunkedBackingMapRecovery(5, 10);
  }

  @Test
  public void testPartiallyFilledChunks() throws Exception {
    // Test where the last chunk is not completely filled.
    testChunkedBackingMapRecovery(5, 13);
  }

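  /**
   * Persists the backing map in chunks of {@code chunkSize} entries (via
   * BACKING_MAP_PERSISTENCE_CHUNK_SIZE), restarts the cache from the persistence file and verifies
   * that all {@code numBlocks} cached blocks are recoverable.
   */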
  private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);

    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    BucketCache newBucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
      String[] names = CacheTestUtils.getHFileNames(blocks);

      for (int i = 0; i < numBlocks; i++) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
          blocks[i].getBlock());
      }

      // saves the current state
      bucketCache.persistToFile();

      // Create a new bucket cache which reads from the persistence file.
      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(newBucketCache.waitForCacheInitialization(10000));

      assertEquals(numBlocks, newBucketCache.backingMap.size());
      BlockCacheKey[] newKeys = CacheTestUtils.regenerateKeys(blocks, names);
      for (int i = 0; i < numBlocks; i++) {
        assertEquals(blocks[i].getBlock(),
          newBucketCache.getBlock(newKeys[i], false, false, false));
      }
    } finally {
      if (newBucketCache == null && bucketCache != null) {
        bucketCache.shutdown();
      }
      if (newBucketCache != null) {
        newBucketCache.shutdown();
      }
      TEST_UTIL.cleanupTestDir();
    }
  }

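  // Polls until the writer threads have drained the block from the ramCache and added its entry
  // to the backingMap.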
  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
      Thread.sleep(100);
    }
  }

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and the writeQueue,
  // then writer threads flush it to the bucket and put a reference entry in the backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

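  // Waits for the background validation of the persisted backing map to complete.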
  private void waitPersistentCacheValidation(Configuration config, final BucketCache bucketCache) {
    Waiter.waitFor(config, 5000, () -> bucketCache.getBackingMapValidated().get());
  }
}