/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

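/**
 * Tests block prefetch behaviour for HFile readers when a file based BucketCache is configured,
 * covering prefetch completion, interruption on cache capacity, reference and HFileLink
 * resolution, and per-region caching metrics.
 */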
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetchWithBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);

  @Rule
  public TestName name = new TestName();

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    File testDir = new File(name.getMethodName());
    testDir.mkdir();
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
  }

  @After
  public void tearDown() {
    File cacheFile = new File(name.getMethodName() + "/bucket.cache");
    File dir = new File(name.getMethodName());
    cacheFile.delete();
    dir.delete();
  }

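  /**
   * Verifies that re-opening an already prefetched file does not re-cache its data blocks in the
   * bucket cache, and that prefetch only runs again once a block has been removed from the
   * backing map.
   */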
  @Test
  public void testPrefetchDoesntOverwork() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
    LruBlockCache l1 = (LruBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache();
    assertEquals(1, l1.getBlockCount());
    // Removes the meta block from the L1 cache
    l1.clearCache();
    // Reads the file again. Checks that we are not prefetching the data blocks again,
    // but that the meta block is fetched back
    LOG.debug("Second read, prefetch should run, without altering bucket cache state,"
      + " only the meta block should be fetched again.");
    readStoreFile(storeFile);
    // Makes sure the bucket cache entries have not changed
    snapshot.entrySet().forEach(e -> {
      BucketEntry entry = bc.getBackingMap().get(e.getKey());
      assertNotNull(entry);
      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
    });
    assertEquals(1, l1.getBlockCount());
    // Forcibly removes the first block from the bucket cache backing map,
    // in order to cause it to be cached again
    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
    LOG.debug("removing block {}", key);
    bc.getBackingMap().remove(key);
    bc.getFullyCachedFiles().get().remove(storeFile.getName());
    assertTrue(snapshot.size() > bc.getBackingMap().size());
    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
    readStoreFile(storeFile);
    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
  }

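  /**
   * Verifies that after splitting a prefetched store file, opening a reader on the resulting
   * reference file resolves to the already cached blocks of the original file instead of adding
   * duplicate blocks to the bucket cache.
   */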
  @Test
  public void testPrefetchRefsAfterSplit() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit");
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
      StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir)
        .withCacheConfig(cacheConf).build());
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    // Splits the file and returns a reference to the original file
    Random rand = ThreadLocalRandom.current();
    byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft);
    Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false,
      new ConstantSizeRegionSplitPolicy(), sft);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft);
    // Starts a reader for the ref. The ref should resolve to the original file blocks
    // and not duplicate blocks in the cache.
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // The ref file block keys should actually resolve to the referred file blocks,
    // so we should not see additional blocks in the cache.
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0);
    Cacheable result = bc.getBlock(refCacheKey, true, false, true);
    assertNotNull(result);
    BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0);
    assertEquals(result, bc.getBlock(fileCacheKey, true, false, true));
    assertNull(bc.getBackingMap().get(refCacheKey));
    assertNotNull(bc.getBlockForReference(refCacheKey));
  }

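  /**
   * Verifies that prefetch is interrupted once the bucket cache fills up, so a second prefetch of
   * the same file causes no further evictions, while a full scan afterwards still triggers
   * evictions because every block read is cached without interruption.
   */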
  @Test
  public void testPrefetchInterruptOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    long evictedFirstPrefetch = bc.getStats().getEvictedCount();
    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
    assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount());
    HFileScanner scanner = reader.getScanner(conf, true, true);
    scanner.seekTo();
    while (scanner.next()) {
      // Do a full scan to force some evictions
      LOG.trace("Iterating the full scan to evict some blocks");
    }
    scanner.close();
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    // The scan should have triggered more evictions than the interrupted prefetch did,
    // as it tries to cache each block without interruption.
    assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch);
  }

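  /**
   * Verifies that prefetch for an in-memory column family is not interrupted when the bucket
   * cache reaches capacity: once the cache is full, re-opening the same file does not trigger
   * further evictions.
   */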
  @Test
  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    ColumnFamilyDescriptor family =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile));
    long evictions = bc.getStats().getEvictedCount();
    LOG.debug("Total evicted at this point: {}", evictions);
    // Creates another reader. Now that the cache is full, no block would fit and prefetch should
    // not trigger any new evictions
    createReaderAndWaitForPrefetchInterruption(storeFile);
    assertEquals(evictions, bc.getStats().getEvictedCount());
  }

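  /**
   * Verifies that with a non-zero queue addition wait time configured, prefetch backs off once
   * the bucket cache reaches capacity and therefore causes no evictions.
   */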
  @Test
  public void testPrefetchRunNoEvictions() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000);
    // Prefetches the file blocks
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Wait until all cache writer queues are empty
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    // With the wait time configuration, prefetch should trigger no evictions once it reaches
    // cache capacity
    assertEquals(0, bc.getStats().getEvictedCount());
  }

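  /**
   * Verifies that with no queue addition wait time configured, prefetch keeps caching blocks once
   * the bucket cache reaches capacity and therefore triggers evictions, unless inserts failed and
   * the cache never actually filled up.
   */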
  @Test
  public void testPrefetchRunTriggersEvictions() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchRunTriggersEvictions", 10000);
    // Prefetches the file blocks
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Wait until all cache writer queues are empty
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    if (bc.getStats().getFailedInserts() == 0) {
      // With no wait time configuration, prefetch should trigger evictions once it reaches
      // cache capacity
      assertNotEquals(0, bc.getStats().getEvictedCount());
    } else {
      LOG.info("We had {} cache insert failures, which may cause cache usage "
        + "to never reach capacity.", bc.getStats().getFailedInserts());
    }
  }

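  /**
   * Verifies that the per-region cached size metric increases monotonically while the blocks of a
   * store file are being prefetched into the bucket cache.
   */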
  @Test
  public void testPrefetchMetricProgress() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchMetricProgress", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    String regionName = storeFile.getParent().getParent().getName();
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    MutableLong regionCachedSize = new MutableLong(0);
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> {
      if (bc.getBackingMap().size() > 0) {
        long currentSize = bc.getRegionCachedInfo().get().get(regionName);
        assertTrue(regionCachedSize.getValue() <= currentSize);
        LOG.debug("Logging progress of region caching: {}", currentSize);
        regionCachedSize.setValue(currentSize);
      }
      return bc.getBackingMap().size() == 6;
    });
  }

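  /**
   * Verifies that prefetching through an HFileLink pointing at an already cached file neither
   * adds new blocks to the bucket cache nor changes the cached size accounted for the region of
   * the target file.
   */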
  @Test
  public void testPrefetchMetricProgressForLinks() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
    // Force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    Path testDir = TEST_UTIL.getDataTestDir(name.getMethodName());
    CommonFSUtils.setRootDir(testConf, testDir);
    Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(testConf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
    long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

    final RegionInfo dstHri =
      RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
    HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri);

    Path dstPath = new Path(regionFS.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf"));

    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(region, storeFile.getName()));

    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
        .withRegionFileSystem(dstRegionFs).build());
    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
    StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

    HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
    assertTrue(sfi.isLink());
    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // HFileLink uses the path of the target file to create a reader, so it should resolve to the
    // already cached blocks and not insert new blocks in the cache.
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);

    assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
  }

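  /**
   * Verifies that prefetching through an HFileLink pointing at an archived (not yet cached) file
   * caches the target file blocks, without changing the cached size of the original region and
   * without accounting any cached size against the region holding the link.
   */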
  @Test
  public void testPrefetchMetricProgressForLinksToArchived() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    // Force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    Path testDir = TEST_UTIL.getDataTestDir(name.getMethodName());
    CommonFSUtils.setRootDir(testConf, testDir);

    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
    Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");

    Path storeFile = writeStoreFile(100, cfDir);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
    long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

    // Creates another file, but in the archive dir, hence it won't be cached
    Path archiveRoot = new Path(testDir, "archive");
    Path archiveTableDir = CommonFSUtils.getTableDir(archiveRoot, hri.getTable());
    Path archiveRegionDir = new Path(archiveTableDir, region.getEncodedName());
    Path archiveCfDir = new Path(archiveRegionDir, "cf");
    Path archivedFile = writeStoreFile(100, archiveCfDir);

    final RegionInfo testRegion =
      RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    final HRegionFileSystem testRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, testRegion.getTable()), testRegion);
    // Just creates a link to the archived file
    Path dstPath = new Path(tableDir, new Path(testRegion.getEncodedName(), "cf"));

    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(region, archivedFile.getName()));

    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
        .withRegionFileSystem(testRegionFs).build());
    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
    StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

    HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
    assertTrue(sfi.isLink());
    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // HFileLink uses the path of the target file to create a reader, but the target file is in
    // the archive, so it wasn't cached previously and should be cached when we open the link.
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 12);
    // Cached size for the region of the target file shouldn't change
    assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
    // Cached size for the region with the link pointing to the archive dir shouldn't be updated
    assertNull(bc.getRegionCachedInfo().get().get(testRegion.getEncodedName()));
  }

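  /**
   * Opens the given store file, waits for prefetch to complete and asserts that the DATA and
   * index blocks read from it are found in the block cache.
   */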
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

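  /**
   * Opens the given store file, waits for prefetch to complete, then walks all blocks up to the
   * load-on-open section, applying the given read and validation functions to each block.
   */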
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

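  /**
   * Opens the given store file, waits for the prefetch to finish (or be interrupted), then
   * asserts that no file has been marked as fully cached.
   */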
  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
      .get().size());

    return reader;
  }

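  /**
   * Writes a store file named {@code fname} under the test data dir, containing {@code numKVs}
   * random KeyValues and using DATA_BLOCK_SIZE as the data block size.
   */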
  private Path writeStoreFile(String fname, int numKVs) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta, numKVs);
  }

  private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(meta, numKVs, regionCFDir);
  }

  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
    return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname));
  }

  private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(regionCFDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < numKVs; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

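  /**
   * Randomly picks a KeyValue type: half of the generated KeyValues are Puts, the rest are drawn
   * from the remaining valid types, never Minimum or Maximum.
   */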
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }
}