001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
022import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertNotEquals;
026import static org.junit.jupiter.api.Assertions.assertNotNull;
027import static org.junit.jupiter.api.Assertions.assertNull;
028import static org.junit.jupiter.api.Assertions.assertTrue;
029import static org.junit.jupiter.api.Assertions.fail;
030
031import java.io.File;
032import java.io.IOException;
033import java.util.Map;
034import java.util.Random;
035import java.util.concurrent.BlockingQueue;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.function.BiConsumer;
038import java.util.function.BiFunction;
039import org.apache.commons.lang3.mutable.MutableLong;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.Waiter;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.RegionInfoBuilder;
051import org.apache.hadoop.hbase.fs.HFileSystem;
052import org.apache.hadoop.hbase.io.ByteBuffAllocator;
053import org.apache.hadoop.hbase.io.HFileLink;
054import org.apache.hadoop.hbase.io.hfile.BlockCache;
055import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
056import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
057import org.apache.hadoop.hbase.io.hfile.BlockType;
058import org.apache.hadoop.hbase.io.hfile.CacheConfig;
059import org.apache.hadoop.hbase.io.hfile.Cacheable;
060import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
061import org.apache.hadoop.hbase.io.hfile.HFile;
062import org.apache.hadoop.hbase.io.hfile.HFileBlock;
063import org.apache.hadoop.hbase.io.hfile.HFileContext;
064import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
065import org.apache.hadoop.hbase.io.hfile.HFileScanner;
066import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
067import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
068import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
069import org.apache.hadoop.hbase.regionserver.BloomType;
070import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
071import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
072import org.apache.hadoop.hbase.regionserver.HStoreFile;
073import org.apache.hadoop.hbase.regionserver.StoreContext;
074import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
075import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
076import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
077import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
078import org.apache.hadoop.hbase.testclassification.IOTests;
079import org.apache.hadoop.hbase.testclassification.MediumTests;
080import org.apache.hadoop.hbase.util.Bytes;
081import org.apache.hadoop.hbase.util.CommonFSUtils;
082import org.junit.jupiter.api.AfterEach;
083import org.junit.jupiter.api.BeforeEach;
084import org.junit.jupiter.api.Tag;
085import org.junit.jupiter.api.Test;
086import org.junit.jupiter.api.TestInfo;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
091
092@Tag(IOTests.TAG)
093@Tag(MediumTests.TAG)
094public class TestPrefetchWithBucketCache {
095
  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  // Shared testing utility: supplies the Configuration used by setUp and the data test
  // directory under which the store files are written.
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Upper bound for the random index drawn in generateKeyType: the number of KeyValue.Type
  // values minus two. generateKeyType rejects Minimum and Maximum as invalid picks.
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  // Block size passed to HFileContextBuilder when the test store files are written.
  private static final int DATA_BLOCK_SIZE = 2048;
  // Set in setUp to whatever TEST_UTIL.getConfiguration() returns; tests add their own settings.
  private Configuration conf;
  // Built by each test from conf and blockCache (some tests also pass a column family).
  private CacheConfig cacheConf;
  // Obtained in setUp through HFileSystem.get(conf).
  private FileSystem fs;
  // Created by each test through BlockCacheFactory.createBlockCache(conf).
  private BlockCache blockCache;
106
107  @BeforeEach
108  public void setUp(TestInfo testInfo) throws IOException {
109    conf = TEST_UTIL.getConfiguration();
110    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
111    fs = HFileSystem.get(conf);
112    File testDir = new File(testInfo.getTestMethod().get().getName());
113    testDir.mkdir();
114    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
115  }
116
117  @AfterEach
118  public void tearDown(TestInfo testInfo) {
119    File cacheFile = new File(testInfo.getTestMethod().get().getName() + "/bucket.cache");
120    File dir = new File(testInfo.getTestMethod().get().getName());
121    cacheFile.delete();
122    dir.delete();
123  }
124
125  @Test
126  public void testPrefetchDoesntOverwork() throws Exception {
127    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
128    blockCache = BlockCacheFactory.createBlockCache(conf);
129    cacheConf = new CacheConfig(conf, blockCache);
130    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
131    // Prefetches the file blocks
132    LOG.debug("First read should prefetch the blocks.");
133    readStoreFile(storeFile);
134    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
135    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
136    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
137    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
138    LruBlockCache l1 = (LruBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache();
139    assertEquals(1, l1.getBlockCount());
140    // Removes the meta block from L1 cache
141    l1.clearCache();
142    // Reads file again. Checks we are not prefetching data blocks again,
143    // but fetch back the meta block
144    LOG.debug("Second read, prefetch should run, without altering bucket cache state,"
145      + " only the meta block should be fetched again.");
146    readStoreFile(storeFile);
147    // Makes sure the bucketcache entries have not changed
148    snapshot.entrySet().forEach(e -> {
149      BucketEntry entry = bc.getBackingMap().get(e.getKey());
150      assertNotNull(entry);
151      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
152    });
153    assertEquals(1, l1.getBlockCount());
154    // forcibly removes first block from the bc backing map, in order to cause it to be cached again
155    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
156    LOG.debug("removing block {}", key);
157    bc.getBackingMap().remove(key);
158    bc.getFullyCachedFiles().get().remove(storeFile.getName());
159    assertTrue(snapshot.size() > bc.getBackingMap().size());
160    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
161    readStoreFile(storeFile);
162    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
163    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
164  }
165
  /**
   * Prefetches a store file, splits it into a reference file and opens a reader on that
   * reference. The test expects the reference to be served from the blocks already cached for
   * the original file, so the bucket cache must still hold exactly 6 entries afterwards.
   */
  @Test
  public void testPrefetchRefsAfterSplit() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    // Lay out <data test dir>/testPrefetchRefsAfterSplit/<encoded region>/cf for the store file
    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit");
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
      StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir)
        .withCacheConfig(cacheConf).build());
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    // split the file and return references to the original file
    Random rand = ThreadLocalRandom.current();
    byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft);
    Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false,
      new ConstantSizeRegionSplitPolicy(), sft).getPath();
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft);
    // starts reader for the ref. The ref should resolve to the original file blocks
    // and not duplicate blocks in the cache.
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    // No upper bound on this wait: it polls once per second until the reader reports completion.
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // the ref file blocks keys should actually resolve to the referred file blocks,
    // so we should not see additional blocks in the cache.
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    // A lookup by the reference name at offset 0 must return the same block as a lookup by the
    // original file name, while the backing map holds no entry under the reference key itself.
    BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0);
    Cacheable result = bc.getBlock(refCacheKey, true, false, true);
    assertNotNull(result);
    BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0);
    assertEquals(result, bc.getBlock(fileCacheKey, true, false, true));
    assertNull(bc.getBackingMap().get(refCacheKey));
    assertNotNull(bc.getBlockForReference(refCacheKey));
  }
216
217  @Test
218  public void testPrefetchInterruptOnCapacity() throws Exception {
219    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
220    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
221    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
222    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
223    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
224    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
225    blockCache = BlockCacheFactory.createBlockCache(conf);
226    cacheConf = new CacheConfig(conf, blockCache);
227    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
228    // Prefetches the file blocks
229    LOG.debug("First read should prefetch the blocks.");
230    createReaderAndWaitForPrefetchInterruption(storeFile);
231    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
232      () -> PrefetchExecutor.isCompleted(storeFile));
233    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
234    long evictedFirstPrefetch = bc.getStats().getEvictedCount();
235    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
236    assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount());
237    HFileScanner scanner = reader.getScanner(conf, true, true);
238    scanner.seekTo();
239    while (scanner.next()) {
240      // do a full scan to force some evictions
241      LOG.trace("Iterating the full scan to evict some blocks");
242    }
243    scanner.close();
244    Waiter.waitFor(conf, 5000, () -> {
245      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
246        if (!queue.isEmpty()) {
247          return false;
248        }
249      }
250      return true;
251    });
252    // The scanner should had triggered at least 3x evictions from the prefetch,
253    // as we try cache each block without interruption.
254    assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch);
255  }
256
257  @Test
258  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
259    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
260    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
261    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
262    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
263    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
264    blockCache = BlockCacheFactory.createBlockCache(conf);
265    ColumnFamilyDescriptor family =
266      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
267    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
268    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
269    // Prefetches the file blocks
270    LOG.debug("First read should prefetch the blocks.");
271    createReaderAndWaitForPrefetchInterruption(storeFile);
272    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
273    Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile));
274    long evictions = bc.getStats().getEvictedCount();
275    LOG.debug("Total evicted at this point: {}", evictions);
276    // creates another reader, now that cache is full, no block would fit and prefetch should not
277    // trigger any new evictions
278    createReaderAndWaitForPrefetchInterruption(storeFile);
279    assertEquals(evictions, bc.getStats().getEvictedCount());
280  }
281
282  @Test
283  public void testPrefetchRunNoEvictions() throws Exception {
284    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
285    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
286    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
287    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
288    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
289    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
290    blockCache = BlockCacheFactory.createBlockCache(conf);
291    cacheConf = new CacheConfig(conf, blockCache);
292    Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000);
293    // Prefetches the file blocks
294    createReaderAndWaitForPrefetchInterruption(storeFile);
295    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
296      () -> PrefetchExecutor.isCompleted(storeFile));
297    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
298    // Wait until all cache writer queues are empty
299    Waiter.waitFor(conf, 5000, () -> {
300      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
301        if (!queue.isEmpty()) {
302          return false;
303        }
304      }
305      return true;
306    });
307    // With the wait time configuration, prefetch should trigger no evictions once it reaches
308    // cache capacity
309    assertEquals(0, bc.getStats().getEvictedCount());
310  }
311
312  @Test
313  public void testPrefetchRunTriggersEvictions() throws Exception {
314    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
315    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
316    // Use full capacity as the "acceptable" size so prefetch does not stop at ~98% total usage
317    // (via blockFitsIntoTheCache) before the cache writer path must run freeSpace/evictions.
318    conf.setDouble("hbase.bucketcache.acceptfactor", 1.0);
319    conf.setDouble("hbase.bucketcache.minfactor", 1.0);
320    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
321    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
322    // Ensures no prefetch interruption due to heap usage in the event of freeMemory() returning 0.
323    conf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, Double.MAX_VALUE);
324    blockCache = BlockCacheFactory.createBlockCache(conf);
325    cacheConf = new CacheConfig(conf, blockCache);
326    Path storeFile = writeStoreFile("testPrefetchRunTriggersEvictions", 10000);
327    // Prefetches the file blocks
328    createReaderAndWaitForPrefetchInterruption(storeFile);
329    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
330      () -> PrefetchExecutor.isCompleted(storeFile));
331    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
332    // Wait until all cache writer queues are empty
333    Waiter.waitFor(conf, 5000, () -> {
334      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
335        if (!queue.isEmpty()) {
336          return false;
337        }
338      }
339      return true;
340    });
341    if (bc.getStats().getFailedInserts() == 0) {
342      // With no wait time configuration, prefetch should trigger evictions once it reaches
343      // cache capacity. Writer threads can finish draining slightly after queue.offer returns.
344      Waiter.waitFor(conf, 15000, () -> bc.getStats().getEvictedCount() > 0);
345      assertNotEquals(0, bc.getStats().getEvictedCount());
346    } else {
347      LOG.info("We had {} cache insert failures, which may cause cache usage "
348        + "to never reach capacity.", bc.getStats().getFailedInserts());
349    }
350  }
351
352  @Test
353  public void testPrefetchMetricProgress() throws Exception {
354    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
355    blockCache = BlockCacheFactory.createBlockCache(conf);
356    cacheConf = new CacheConfig(conf, blockCache);
357    Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100);
358    // Prefetches the file blocks
359    LOG.debug("First read should prefetch the blocks.");
360    readStoreFile(storeFile);
361    String regionName = storeFile.getParent().getParent().getName();
362    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
363    MutableLong regionCachedSize = new MutableLong(0);
364    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
365    Waiter.waitFor(conf, 300, () -> {
366      if (bc.getBackingMap().size() > 0) {
367        long currentSize = bc.getRegionCachedInfo().get().get(regionName);
368        assertTrue(regionCachedSize.getValue() <= currentSize);
369        LOG.debug("Logging progress of region caching: {}", currentSize);
370        regionCachedSize.setValue(currentSize);
371      }
372      return bc.getBackingMap().size() == 6;
373    });
374  }
375
  /**
   * Prefetches a store file, then opens it again through an HFileLink placed in another region
   * of the same table. The test expects the link to reuse the blocks that are already cached:
   * the backing map must stay at 6 entries and the cached size recorded for the region of the
   * original file must not change.
   */
  @Test
  public void testPrefetchMetricProgressForLinks(TestInfo testInfo) throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    final RegionInfo hri = RegionInfoBuilder
      .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build();
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    Path testDir = TEST_UTIL.getDataTestDir(testInfo.getTestMethod().get().getName());
    CommonFSUtils.setRootDir(testConf, testDir);
    Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
    // NOTE(review): 'region' is built separately from 'hri'; both are used below (region for the
    // link name and the cached-size lookup, hri for createHFileLink) - confirm they are meant to
    // identify the same region.
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(testConf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
    // Baseline that the final assertion compares against
    long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

    // Destination region that will hold the link
    final RegionInfo dstHri = RegionInfoBuilder
      .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build();
    HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri);

    // NOTE(review): this path is built from getRegionNameAsString(), whereas the sibling
    // "links to archived" test uses getEncodedName() - confirm which one is intended.
    Path dstPath = new Path(regionFS.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf"));

    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(region, storeFile.getName()));

    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
        .withRegionFileSystem(dstRegionFs).build());
    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
    StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

    HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
    assertTrue(sfi.isLink());
    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    // No upper bound on this wait: it polls once per second until the reader reports completion.
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // HFileLink use the path of the target file to create a reader, so it should resolve to the
    // already cached blocks and not insert new blocks in the cache.
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);

    assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
  }
433
  /**
   * Prefetches one store file, writes a second one under an "archive" directory and then opens
   * a reader through an HFileLink. The test expects the blocks behind the link to be cached in
   * addition to the first file (12 backing map entries in total) without changing the cached
   * size of the first file's region and without recording a size for the region that holds the
   * link.
   */
  @Test
  public void testPrefetchMetricProgressForLinksToArchived(TestInfo testInfo) throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    Path testDir = TEST_UTIL.getDataTestDir(testInfo.getTestMethod().get().getName());
    CommonFSUtils.setRootDir(testConf, testDir);

    final RegionInfo hri = RegionInfoBuilder
      .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build();
    Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");

    Path storeFile = writeStoreFile(100, cfDir);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
    // Baseline that the assertion near the end compares against
    long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

    // create another file, but in the archive dir, hence it won't be cached
    Path archiveRoot = new Path(testDir, "archive");
    Path archiveTableDir = CommonFSUtils.getTableDir(archiveRoot, hri.getTable());
    Path archiveRegionDir = new Path(archiveTableDir, region.getEncodedName());
    Path archiveCfDir = new Path(archiveRegionDir, "cf");
    Path archivedFile = writeStoreFile(100, archiveCfDir);

    // Region that will hold the link
    final RegionInfo testRegion =
      RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    final HRegionFileSystem testRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, testRegion.getTable()), testRegion);
    // Just create a link to the archived file
    Path dstPath = new Path(tableDir, new Path(testRegion.getEncodedName(), "cf"));

    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(region, archivedFile.getName()));

    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
        .withRegionFileSystem(testRegionFs).build());
    // NOTE(review): the link is created for storeFile.getName(), while linkFilePath above is
    // derived from archivedFile.getName() - confirm that this mismatch is intended.
    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
    StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

    HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
    assertTrue(sfi.isLink());
    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    // No upper bound on this wait: it polls once per second until the reader reports completion.
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // HFileLink use the path of the target file to create a reader, but the target file is in the
    // archive, so it wasn't cached previously and should be cached when we open the link.
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 12);
    // cached size for the region of target file shouldn't change
    assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
    // cached size for the region with link pointing to archive dir shouldn't be updated
    assertNull(bc.getRegionCachedInfo().get().get(testRegion.getEncodedName()));
  }
501
502  private void readStoreFile(Path storeFilePath) throws Exception {
503    readStoreFile(storeFilePath, (r, o) -> {
504      HFileBlock block = null;
505      try {
506        block = r.readBlock(o, -1, false, true, false, true, null, null);
507      } catch (IOException e) {
508        fail(e.getMessage());
509      }
510      return block;
511    }, (key, block) -> {
512      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
513      if (
514        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
515          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
516      ) {
517        assertTrue(isCached);
518      }
519    });
520  }
521
522  private void readStoreFile(Path storeFilePath,
523    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
524    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
525    // Open the file
526    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
527
528    while (!reader.prefetchComplete()) {
529      // Sleep for a bit
530      Thread.sleep(1000);
531    }
532    long offset = 0;
533    long sizeForDataBlocks = 0;
534    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
535      HFileBlock block = readFunction.apply(reader, offset);
536      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
537      validationFunction.accept(blockCacheKey, block);
538      offset += block.getOnDiskSizeWithHeader();
539    }
540  }
541
542  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
543    throws Exception {
544    // Open the file
545    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
546
547    while (!reader.prefetchComplete()) {
548      // Sleep for a bit
549      Thread.sleep(1000);
550    }
551    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
552      .get().size());
553
554    return reader;
555  }
556
557  private Path writeStoreFile(String fname, int numKVs) throws IOException {
558    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
559    return writeStoreFile(fname, meta, numKVs);
560  }
561
562  private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException {
563    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
564    return writeStoreFile(meta, numKVs, regionCFDir);
565  }
566
567  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
568    return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname));
569  }
570
571  private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir)
572    throws IOException {
573    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
574      .withOutputDir(regionCFDir).withFileContext(context).build();
575    Random rand = ThreadLocalRandom.current();
576    final int rowLen = 32;
577    for (int i = 0; i < numKVs; ++i) {
578      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
579      byte[] v = RandomKeyValueUtil.randomValue(rand);
580      int cfLen = rand.nextInt(k.length - rowLen + 1);
581      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
582        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
583      sfw.append(kv);
584    }
585
586    sfw.close();
587    return sfw.getPath();
588  }
589
590  public static KeyValue.Type generateKeyType(Random rand) {
591    if (rand.nextBoolean()) {
592      // Let's make half of KVs puts.
593      return KeyValue.Type.Put;
594    } else {
595      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
596      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
597        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
598          + "Probably the layout of KeyValue.Type has changed.");
599      }
600      return keyType;
601    }
602  }
603}