/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

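/**
 * Tests for prefetch-on-open behaviour when a bucket cache is configured: prefetch should not
 * re-cache blocks that are already cached, reference files and HFileLinks should resolve to the
 * original file's cached blocks instead of duplicating them, prefetch should be interrupted once
 * the cache reaches capacity (except for in-memory column families), and the per-region
 * cached-size metrics should progress as blocks get cached.
 */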
@Tag(IOTests.TAG)
@Tag(MediumTests.TAG)
public class TestPrefetchWithBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @BeforeEach
  public void setUp(TestInfo testInfo) throws IOException {
    conf = TEST_UTIL.getConfiguration();
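    // Enable prefetch-on-open: every reader created in these tests will schedule a background
    // prefetch of its file's blocks as soon as the file is opened.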
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    File testDir = new File(testInfo.getTestMethod().get().getName());
    testDir.mkdir();
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
  }

  @AfterEach
  public void tearDown(TestInfo testInfo) {
    File cacheFile = new File(testInfo.getTestMethod().get().getName() + "/bucket.cache");
    File dir = new File(testInfo.getTestMethod().get().getName());
    cacheFile.delete();
    dir.delete();
  }

  @Test
  public void testPrefetchDoesntOverwork() throws Exception {
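    // 200MB bucket cache, large enough to fit all blocks of the test file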
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
    LruBlockCache l1 = (LruBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache();
    assertEquals(1, l1.getBlockCount());
    // Removes the meta block from L1 cache
    l1.clearCache();
    // Reads the file again. Checks that we are not prefetching the data blocks again,
    // but that the meta block is fetched back
    LOG.debug("Second read, prefetch should run, without altering bucket cache state,"
      + " only the meta block should be fetched again.");
    readStoreFile(storeFile);
    // Makes sure the bucket cache entries have not changed
    snapshot.entrySet().forEach(e -> {
      BucketEntry entry = bc.getBackingMap().get(e.getKey());
      assertNotNull(entry);
      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
    });
    assertEquals(1, l1.getBlockCount());
    // Forcibly removes the first block from the bucket cache backing map, to cause it to be
    // cached again
    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
    LOG.debug("removing block {}", key);
    bc.getBackingMap().remove(key);
    bc.getFullyCachedFiles().get().remove(storeFile.getName());
    assertTrue(snapshot.size() > bc.getBackingMap().size());
    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
    readStoreFile(storeFile);
    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
  }

  @Test
  public void testPrefetchRefsAfterSplit() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit");
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
      StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir)
        .withCacheConfig(cacheConf).build());
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    // split the file and return references to the original file
    Random rand = ThreadLocalRandom.current();
    byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft);
    Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false,
      new ConstantSizeRegionSplitPolicy(), sft).getPath();
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft);
    // Starts a reader for the ref. The ref should resolve to the original file blocks
    // and not duplicate blocks in the cache.
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // The ref file block keys should actually resolve to the referred file's blocks,
    // so we should not see additional blocks in the cache.
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

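    // The ref key itself never enters the backing map; getBlockForReference resolves it to the
    // original file's entry, so reads through the ref still hit the cache.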
    BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0);
    Cacheable result = bc.getBlock(refCacheKey, true, false, true);
    assertNotNull(result);
    BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0);
    assertEquals(result, bc.getBlock(fileCacheKey, true, false, true));
    assertNull(bc.getBackingMap().get(refCacheKey));
    assertNotNull(bc.getBlockForReference(refCacheKey));
  }

  @Test
  public void testPrefetchInterruptOnCapacity() throws Exception {
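    // 1MB cache tuned so that eviction frees almost nothing (accept/min factors at 0.98, no
    // extra free factor): the 10,000 KV file overflows it and prefetch should get interrupted.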
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    long evictedFirstPrefetch = bc.getStats().getEvictedCount();
    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
    assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount());
    HFileScanner scanner = reader.getScanner(conf, true, true);
    scanner.seekTo();
    while (scanner.next()) {
      // do a full scan to force some evictions
      LOG.trace("Iterating the full scan to evict some blocks");
    }
    scanner.close();
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    // The scanner should have triggered at least 3x the evictions from the prefetch,
    // as we try to cache each block without interruption.
    assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch);
  }

  @Test
  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    blockCache = BlockCacheFactory.createBlockCache(conf);
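    // Uses an in-memory column family: blocks of in-memory families keep being cached even at
    // capacity, so prefetch should not get interrupted here.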
    ColumnFamilyDescriptor family =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile));
    long evictions = bc.getStats().getEvictedCount();
    LOG.debug("Total evicted at this point: {}", evictions);
    // Creates another reader. Now that the cache is full, no block will fit, and prefetch
    // should not trigger any new evictions.
    createReaderAndWaitForPrefetchInterruption(storeFile);
    assertEquals(evictions, bc.getStats().getEvictedCount());
  }

  @Test
  public void testPrefetchRunNoEvictions() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
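    // A positive wait time for RAM queue additions lets prefetch back off once the cache is
    // full, instead of evicting blocks to make room.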
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000);
    // Prefetches the file blocks
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Wait until all cache writer queues are empty
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    // With the wait time configuration, prefetch should trigger no evictions once it reaches
    // cache capacity
    assertEquals(0, bc.getStats().getEvictedCount());
  }

  @Test
  public void testPrefetchRunTriggersEvictions() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
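    // With a zero wait time, prefetch keeps pushing blocks once the cache is full, which
    // should force evictions (asserted below).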
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Wait until all cache writer queues are empty
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    if (bc.getStats().getFailedInserts() == 0) {
      // With no wait time configuration, prefetch should trigger evictions once it reaches
      // cache capacity
      assertNotEquals(0, bc.getStats().getEvictedCount());
    } else {
      LOG.info("We had {} cache insert failures, which may cause cache usage "
        + "to never reach capacity.", bc.getStats().getFailedInserts());
    }
  }

  @Test
  public void testPrefetchMetricProgress() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
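    // The per-region cache metrics are keyed by the store file's grandparent directory name,
    // which is the region dir in a real store file layout.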
    String regionName = storeFile.getParent().getParent().getName();
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    MutableLong regionCachedSize = new MutableLong(0);
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> {
      if (bc.getBackingMap().size() > 0) {
        long currentSize = bc.getRegionCachedInfo().get().get(regionName);
        assertTrue(regionCachedSize.getValue() <= currentSize);
        LOG.debug("Logging progress of region caching: {}", currentSize);
        regionCachedSize.setValue(currentSize);
      }
      return bc.getBackingMap().size() == 6;
    });
  }

  @Test
  public void testPrefetchMetricProgressForLinks(TestInfo testInfo) throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    final RegionInfo hri = RegionInfoBuilder
      .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build();
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    Path testDir = TEST_UTIL.getDataTestDir(testInfo.getTestMethod().get().getName());
    CommonFSUtils.setRootDir(testConf, testDir);
    Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(testConf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
    long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

    final RegionInfo dstHri = RegionInfoBuilder
      .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build();
    HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri);

    Path dstPath = new Path(regionFS.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf"));

    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(region, storeFile.getName()));

    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
        .withRegionFileSystem(dstRegionFs).build());
    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
    StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

    HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
    assertTrue(sfi.isLink());
    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // HFileLink uses the path of the target file to create a reader, so it should resolve to
    // the already cached blocks and not insert new blocks into the cache.
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);

    assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
  }

  @Test
  public void testPrefetchMetricProgressForLinksToArchived(TestInfo testInfo) throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    Path testDir = TEST_UTIL.getDataTestDir(testInfo.getTestMethod().get().getName());
    CommonFSUtils.setRootDir(testConf, testDir);

    final RegionInfo hri = RegionInfoBuilder
      .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build();
    Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");

    Path storeFile = writeStoreFile(100, cfDir);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
    long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

    // Creates another file, but in the archive dir, so it won't be cached
    Path archiveRoot = new Path(testDir, "archive");
    Path archiveTableDir = CommonFSUtils.getTableDir(archiveRoot, hri.getTable());
    Path archiveRegionDir = new Path(archiveTableDir, region.getEncodedName());
    Path archiveCfDir = new Path(archiveRegionDir, "cf");
    Path archivedFile = writeStoreFile(100, archiveCfDir);

    final RegionInfo testRegion =
      RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    final HRegionFileSystem testRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, testRegion.getTable()), testRegion);
    // Just create a link to the archived file
    Path dstPath = new Path(tableDir, new Path(testRegion.getEncodedName(), "cf"));

    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(region, archivedFile.getName()));

    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
        .withRegionFileSystem(testRegionFs).build());
    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
    StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

    HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
    assertTrue(sfi.isLink());
    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // HFileLink uses the path of the target file to create a reader, but the target file is in
    // the archive, so it wasn't cached previously and should be cached when we open the link.
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 12);
    // cached size for the region of the target file shouldn't change
    assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
    // cached size for the region with the link pointing to the archive dir shouldn't be updated
    assertNull(bc.getRegionCachedInfo().get().get(testRegion.getEncodedName()));
  }

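  /**
   * Opens a reader (triggering prefetch), then walks all blocks up to the load-on-open section
   * and asserts that DATA and index blocks can be found in the cache.
   */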
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }
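  /**
   * Opens a reader and waits for prefetch to finish, then asserts the file was never marked as
   * fully cached, i.e. the prefetch gave up before caching all blocks.
   */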
  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
      .get().size());

    return reader;
  }
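  // The writeStoreFile overloads below create a store file with numKVs random KeyValues, using
  // a small (2KB) block size so that even a 100 KV file spans multiple DATA blocks.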
  private Path writeStoreFile(String fname, int numKVs) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta, numKVs);
  }

  private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(meta, numKVs, regionCFDir);
  }

  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
    return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname));
  }

  private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(regionCFDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < numKVs; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }
}