001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotEquals;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026
027import java.io.BufferedWriter;
028import java.io.File;
029import java.io.FileOutputStream;
030import java.io.OutputStreamWriter;
031import java.nio.file.FileSystems;
032import java.nio.file.Files;
033import java.nio.file.attribute.FileTime;
034import java.time.Instant;
035import java.util.Arrays;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
042import org.apache.hadoop.hbase.io.hfile.CacheConfig;
043import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
044import org.apache.hadoop.hbase.io.hfile.Cacheable;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.util.Pair;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.junit.runner.RunWith;
052import org.junit.runners.Parameterized;
053
054/**
055 * Basic test for check file's integrity before start BucketCache in fileIOEngine
056 */
057@RunWith(Parameterized.class)
058@Category(SmallTests.class)
059public class TestVerifyBucketCacheFile {
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class);
063
064  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
065  public static Iterable<Object[]> data() {
066    return Arrays.asList(new Object[][] { { 8192, null },
067      { 16 * 1024,
068        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
069          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
070          128 * 1024 + 1024 } } });
071  }
072
073  @Parameterized.Parameter(0)
074  public int constructedBlockSize;
075
076  @Parameterized.Parameter(1)
077  public int[] constructedBlockSizes;
078
079  final long capacitySize = 32 * 1024 * 1024;
080  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
081  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
082
083  /**
084   * Test cache file or persistence file does not exist whether BucketCache starts normally (1)
085   * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file.
086   * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after
087   * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file
088   * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence
089   * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the
090   * cache file and persistence file would be deleted before BucketCache start normally.
091   * @throws Exception the exception
092   */
093  @Test
094  public void testRetrieveFromFile() throws Exception {
095    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
096    Path testDir = TEST_UTIL.getDataTestDir();
097    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
098
099    BucketCache bucketCache =
100      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
101        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
102    long usedSize = bucketCache.getAllocator().getUsedSize();
103    assertEquals(0, usedSize);
104    CacheTestUtils.HFileBlockPair[] blocks =
105      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
106    // Add blocks
107    for (CacheTestUtils.HFileBlockPair block : blocks) {
108      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
109    }
110    usedSize = bucketCache.getAllocator().getUsedSize();
111    assertNotEquals(0, usedSize);
112    // 1.persist cache to file
113    bucketCache.shutdown();
114    // restore cache from file
115    bucketCache =
116      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
117        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
118    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
119    // persist cache to file
120    bucketCache.shutdown();
121
122    // 2.delete bucket cache file
123    final java.nio.file.Path cacheFile =
124      FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
125    assertTrue(Files.deleteIfExists(cacheFile));
126    // can't restore cache from file
127    bucketCache =
128      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
129        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
130    Thread.sleep(100);
131    assertEquals(0, bucketCache.getAllocator().getUsedSize());
132    assertEquals(0, bucketCache.backingMap.size());
133    // Add blocks
134    for (CacheTestUtils.HFileBlockPair block : blocks) {
135      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
136    }
137    usedSize = bucketCache.getAllocator().getUsedSize();
138    assertNotEquals(0, usedSize);
139    // persist cache to file
140    bucketCache.shutdown();
141
142    // 3.delete backingMap persistence file
143    final java.nio.file.Path mapFile =
144      FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
145    assertTrue(Files.deleteIfExists(mapFile));
146    // can't restore cache from file
147    bucketCache =
148      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
149        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
150    Thread.sleep(100);
151    assertEquals(0, bucketCache.getAllocator().getUsedSize());
152    assertEquals(0, bucketCache.backingMap.size());
153
154    TEST_UTIL.cleanupTestDir();
155  }
156
157  @Test
158  public void testRetrieveFromFileAfterDelete() throws Exception {
159    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
160    Path testDir = TEST_UTIL.getDataTestDir();
161    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
162    Configuration conf = TEST_UTIL.getConfiguration();
163    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
164    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
165    BucketCache bucketCache =
166      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
167        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
168
169    long usedSize = bucketCache.getAllocator().getUsedSize();
170    assertEquals(0, usedSize);
171    CacheTestUtils.HFileBlockPair[] blocks =
172      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
173    // Add blocks
174    for (CacheTestUtils.HFileBlockPair block : blocks) {
175      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
176    }
177    usedSize = bucketCache.getAllocator().getUsedSize();
178    assertNotEquals(0, usedSize);
179    // Shutdown BucketCache
180    bucketCache.shutdown();
181    // Delete the persistence file
182    File mapFile = new File(mapFileName);
183    assertTrue(mapFile.delete());
184    Thread.sleep(350);
185    // Create BucketCache
186    bucketCache =
187      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
188        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
189    assertEquals(0, bucketCache.getAllocator().getUsedSize());
190    assertEquals(0, bucketCache.backingMap.size());
191  }
192
193  /**
194   * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache
195   * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache
196   * after modify cache file's data, and it can't restore cache from file, the cache file and
197   * persistence file would be deleted before BucketCache start normally.
198   * @throws Exception the exception
199   */
200  @Test
201  public void testModifiedBucketCacheFileData() throws Exception {
202    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
203    Path testDir = TEST_UTIL.getDataTestDir();
204    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
205
206    Configuration conf = HBaseConfiguration.create();
207    // Disables the persister thread by setting its interval to MAX_VALUE
208    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
209    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
210      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
211      testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
212    long usedSize = bucketCache.getAllocator().getUsedSize();
213    assertEquals(0, usedSize);
214
215    CacheTestUtils.HFileBlockPair[] blocks =
216      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
217    // Add blocks
218    for (CacheTestUtils.HFileBlockPair block : blocks) {
219      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
220    }
221    usedSize = bucketCache.getAllocator().getUsedSize();
222    assertNotEquals(0, usedSize);
223    // persist cache to file
224    bucketCache.shutdown();
225
226    // modified bucket cache file
227    String file = testDir + "/bucket.cache";
228    try (BufferedWriter out =
229      new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
230      out.write("test bucket cache");
231    }
232    // can't restore cache from file
233    bucketCache =
234      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
235        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
236    Thread.sleep(100);
237    assertEquals(0, bucketCache.getAllocator().getUsedSize());
238    assertEquals(0, bucketCache.backingMap.size());
239
240    TEST_UTIL.cleanupTestDir();
241  }
242
243  /**
244   * Test whether BucketCache is started normally after modifying the cache file's last modified
245   * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
246   * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has
247   * modified persistence cache such that now we store extra 8 bytes at the end of each block in the
248   * cache, representing the nanosecond time the block has been cached. So in the event the cache
249   * file has failed checksum verification during loading time, we go through all the cached blocks
250   * in the cache map and validate the cached time long between what is in the map and the cache
251   * file. If that check fails, we pull the cache key entry out of the map. Since in this test we
252   * are only modifying the access time to induce a checksum error, the cache file content is still
253   * valid and the extra verification should validate that all cache keys in the map are still
254   * recoverable from the cache.
255   * @throws Exception the exception
256   */
257  @Test
258  public void testModifiedBucketCacheFileTime() throws Exception {
259    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
260    Path testDir = TEST_UTIL.getDataTestDir();
261    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
262
263    BucketCache bucketCache =
264      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
265        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
266    long usedSize = bucketCache.getAllocator().getUsedSize();
267    assertEquals(0, usedSize);
268
269    Pair<String, Long> myPair = new Pair<>();
270
271    CacheTestUtils.HFileBlockPair[] blocks =
272      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
273    // Add blocks
274    for (CacheTestUtils.HFileBlockPair block : blocks) {
275      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
276    }
277    usedSize = bucketCache.getAllocator().getUsedSize();
278    assertNotEquals(0, usedSize);
279    long blockCount = bucketCache.backingMap.size();
280    assertNotEquals(0, blockCount);
281    // persist cache to file
282    bucketCache.shutdown();
283
284    // modified bucket cache file LastModifiedTime
285    final java.nio.file.Path file =
286      FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
287    Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
288    // can't restore cache from file
289    bucketCache =
290      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
291        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
292    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
293    assertEquals(blockCount, bucketCache.backingMap.size());
294
295    TEST_UTIL.cleanupTestDir();
296  }
297
298  /**
299   * When using persistent bucket cache, there may be crashes between persisting the backing map and
300   * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache
301   * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the
302   * keys that are still valid do succeed in retrieve related block data from the cache without any
303   * corruption.
304   * @throws Exception the exception
305   */
306  @Test
307  public void testBucketCacheRecovery() throws Exception {
308    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
309    Path testDir = TEST_UTIL.getDataTestDir();
310    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
311    Configuration conf = HBaseConfiguration.create();
312    // Disables the persister thread by setting its interval to MAX_VALUE
313    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
314    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
315    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
316      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
317      DEFAULT_ERROR_TOLERATION_DURATION, conf);
318
319    CacheTestUtils.HFileBlockPair[] blocks =
320      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
321    // Add three blocks
322    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
323    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
324    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
325    // saves the current state
326    bucketCache.persistToFile();
327    // evicts first block
328    bucketCache.evictBlock(blocks[0].getBlockName());
329
330    // now adds a fourth block to bucket cache
331    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
332    // Creates new bucket cache instance without persisting to file after evicting first block
333    // and caching fourth block. So the bucket cache file has only the last three blocks,
334    // but backing map (containing cache keys) was persisted when first three blocks
335    // were in the cache. So the state on this recovery is:
336    // - Backing map: [block0, block1, block2]
337    // - Cache: [block1, block2, block3]
338    // Therefore, this bucket cache would be able to recover only block1 and block2.
339    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
340      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
341      DEFAULT_ERROR_TOLERATION_DURATION, conf);
342
343    assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
344    assertEquals(blocks[1].getBlock(),
345      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
346    assertEquals(blocks[2].getBlock(),
347      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
348    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
349    assertEquals(2, newBucketCache.backingMap.size());
350    TEST_UTIL.cleanupTestDir();
351  }
352
353  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
354    throws InterruptedException {
355    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
356      Thread.sleep(100);
357    }
358  }
359
360  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
361  // threads will flush it to the bucket and put reference entry in backingMap.
362  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
363    Cacheable block) throws InterruptedException {
364    cache.cacheBlock(cacheKey, block);
365    waitUntilFlushedToBucket(cache, cacheKey);
366  }
367}