001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; 021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; 022import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY; 023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertNotEquals; 026import static org.junit.jupiter.api.Assertions.assertNotNull; 027import static org.junit.jupiter.api.Assertions.assertNull; 028import static org.junit.jupiter.api.Assertions.assertTrue; 029import static org.junit.jupiter.api.Assertions.fail; 030 031import java.io.File; 032import java.io.IOException; 033import java.util.Map; 034import java.util.Random; 035import java.util.concurrent.BlockingQueue; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.function.BiConsumer; 038import java.util.function.BiFunction; 039import org.apache.commons.lang3.mutable.MutableLong; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.HBaseTestingUtil; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.Waiter; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.RegionInfoBuilder; 051import org.apache.hadoop.hbase.fs.HFileSystem; 052import org.apache.hadoop.hbase.io.ByteBuffAllocator; 053import org.apache.hadoop.hbase.io.HFileLink; 054import org.apache.hadoop.hbase.io.hfile.BlockCache; 055import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 056import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 057import org.apache.hadoop.hbase.io.hfile.BlockType; 058import org.apache.hadoop.hbase.io.hfile.CacheConfig; 059import org.apache.hadoop.hbase.io.hfile.Cacheable; 060import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 061import org.apache.hadoop.hbase.io.hfile.HFile; 062import org.apache.hadoop.hbase.io.hfile.HFileBlock; 063import org.apache.hadoop.hbase.io.hfile.HFileContext; 064import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 065import org.apache.hadoop.hbase.io.hfile.HFileScanner; 066import org.apache.hadoop.hbase.io.hfile.LruBlockCache; 067import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; 068import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; 069import org.apache.hadoop.hbase.regionserver.BloomType; 070import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; 071import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 072import org.apache.hadoop.hbase.regionserver.HStoreFile; 073import org.apache.hadoop.hbase.regionserver.StoreContext; 074import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 075import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 076import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 077import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 078import org.apache.hadoop.hbase.testclassification.IOTests; 079import org.apache.hadoop.hbase.testclassification.MediumTests; 080import org.apache.hadoop.hbase.util.Bytes; 081import org.apache.hadoop.hbase.util.CommonFSUtils; 082import org.junit.jupiter.api.AfterEach; 083import org.junit.jupiter.api.BeforeEach; 084import org.junit.jupiter.api.Tag; 085import org.junit.jupiter.api.Test; 086import org.junit.jupiter.api.TestInfo; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 091 092@Tag(IOTests.TAG) 093@Tag(MediumTests.TAG) 094public class TestPrefetchWithBucketCache { 095 096 private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class); 097 098 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 099 100 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 101 private static final int DATA_BLOCK_SIZE = 2048; 102 private Configuration conf; 103 private CacheConfig cacheConf; 104 private FileSystem fs; 105 private BlockCache blockCache; 106 107 @BeforeEach 108 public void setUp(TestInfo testInfo) throws IOException { 109 conf = TEST_UTIL.getConfiguration(); 110 conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); 111 fs = HFileSystem.get(conf); 112 File testDir = new File(testInfo.getTestMethod().get().getName()); 113 testDir.mkdir(); 114 conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache"); 115 } 116 117 @AfterEach 118 public void tearDown(TestInfo testInfo) { 119 File cacheFile = new File(testInfo.getTestMethod().get().getName() + "/bucket.cache"); 120 File dir = new File(testInfo.getTestMethod().get().getName()); 121 cacheFile.delete(); 122 dir.delete(); 123 } 124 125 @Test 126 public void testPrefetchDoesntOverwork() throws Exception { 127 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 128 blockCache = BlockCacheFactory.createBlockCache(conf); 129 cacheConf = new CacheConfig(conf, blockCache); 130 Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100); 131 // Prefetches the file blocks 132 LOG.debug("First read should prefetch the blocks."); 133 readStoreFile(storeFile); 134 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 135 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 136 Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); 137 Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap()); 138 LruBlockCache l1 = (LruBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache(); 139 assertEquals(1, l1.getBlockCount()); 140 // Removes the meta block from L1 cache 141 l1.clearCache(); 142 // Reads file again. Checks we are not prefetching data blocks again, 143 // but fetch back the meta block 144 LOG.debug("Second read, prefetch should run, without altering bucket cache state," 145 + " only the meta block should be fetched again."); 146 readStoreFile(storeFile); 147 // Makes sure the bucketcache entries have not changed 148 snapshot.entrySet().forEach(e -> { 149 BucketEntry entry = bc.getBackingMap().get(e.getKey()); 150 assertNotNull(entry); 151 assertEquals(e.getValue().getCachedTime(), entry.getCachedTime()); 152 }); 153 assertEquals(1, l1.getBlockCount()); 154 // forcibly removes first block from the bc backing map, in order to cause it to be cached again 155 BlockCacheKey key = snapshot.keySet().stream().findFirst().get(); 156 LOG.debug("removing block {}", key); 157 bc.getBackingMap().remove(key); 158 bc.getFullyCachedFiles().get().remove(storeFile.getName()); 159 assertTrue(snapshot.size() > bc.getBackingMap().size()); 160 LOG.debug("Third read should prefetch again, as we removed one block for the file."); 161 readStoreFile(storeFile); 162 Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size()); 163 assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime()); 164 } 165 166 @Test 167 public void testPrefetchRefsAfterSplit() throws Exception { 168 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 169 blockCache = BlockCacheFactory.createBlockCache(conf); 170 cacheConf = new CacheConfig(conf, blockCache); 171 172 Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit"); 173 RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); 174 Path regionDir = new Path(tableDir, region.getEncodedName()); 175 Path cfDir = new Path(regionDir, "cf"); 176 HRegionFileSystem regionFS = 177 HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region); 178 Path storeFile = writeStoreFile(100, cfDir); 179 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true, 180 StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir) 181 .withCacheConfig(cacheConf).build()); 182 // Prefetches the file blocks 183 LOG.debug("First read should prefetch the blocks."); 184 readStoreFile(storeFile); 185 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 186 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 187 Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); 188 189 // split the file and return references to the original file 190 Random rand = ThreadLocalRandom.current(); 191 byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50); 192 HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft); 193 Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false, 194 new ConstantSizeRegionSplitPolicy(), sft).getPath(); 195 HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft); 196 // starts reader for the ref. The ref should resolve to the original file blocks 197 // and not duplicate blocks in the cache. 198 refHsf.initReader(); 199 HFile.Reader reader = refHsf.getReader().getHFileReader(); 200 while (!reader.prefetchComplete()) { 201 // Sleep for a bit 202 Thread.sleep(1000); 203 } 204 // the ref file blocks keys should actually resolve to the referred file blocks, 205 // so we should not see additional blocks in the cache. 206 Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); 207 208 BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0); 209 Cacheable result = bc.getBlock(refCacheKey, true, false, true); 210 assertNotNull(result); 211 BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0); 212 assertEquals(result, bc.getBlock(fileCacheKey, true, false, true)); 213 assertNull(bc.getBackingMap().get(refCacheKey)); 214 assertNotNull(bc.getBlockForReference(refCacheKey)); 215 } 216 217 @Test 218 public void testPrefetchInterruptOnCapacity() throws Exception { 219 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 220 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 221 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 222 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 223 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 224 conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100); 225 blockCache = BlockCacheFactory.createBlockCache(conf); 226 cacheConf = new CacheConfig(conf, blockCache); 227 Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000); 228 // Prefetches the file blocks 229 LOG.debug("First read should prefetch the blocks."); 230 createReaderAndWaitForPrefetchInterruption(storeFile); 231 Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000), 232 () -> PrefetchExecutor.isCompleted(storeFile)); 233 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 234 long evictedFirstPrefetch = bc.getStats().getEvictedCount(); 235 HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile); 236 assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount()); 237 HFileScanner scanner = reader.getScanner(conf, true, true); 238 scanner.seekTo(); 239 while (scanner.next()) { 240 // do a full scan to force some evictions 241 LOG.trace("Iterating the full scan to evict some blocks"); 242 } 243 scanner.close(); 244 Waiter.waitFor(conf, 5000, () -> { 245 for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) { 246 if (!queue.isEmpty()) { 247 return false; 248 } 249 } 250 return true; 251 }); 252 // The scanner should had triggered at least 3x evictions from the prefetch, 253 // as we try cache each block without interruption. 254 assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch); 255 } 256 257 @Test 258 public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception { 259 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 260 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 261 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 262 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 263 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 264 blockCache = BlockCacheFactory.createBlockCache(conf); 265 ColumnFamilyDescriptor family = 266 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build(); 267 cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP); 268 Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000); 269 // Prefetches the file blocks 270 LOG.debug("First read should prefetch the blocks."); 271 createReaderAndWaitForPrefetchInterruption(storeFile); 272 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 273 Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile)); 274 long evictions = bc.getStats().getEvictedCount(); 275 LOG.debug("Total evicted at this point: {}", evictions); 276 // creates another reader, now that cache is full, no block would fit and prefetch should not 277 // trigger any new evictions 278 createReaderAndWaitForPrefetchInterruption(storeFile); 279 assertEquals(evictions, bc.getStats().getEvictedCount()); 280 } 281 282 @Test 283 public void testPrefetchRunNoEvictions() throws Exception { 284 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 285 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 286 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 287 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 288 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 289 conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100); 290 blockCache = BlockCacheFactory.createBlockCache(conf); 291 cacheConf = new CacheConfig(conf, blockCache); 292 Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000); 293 // Prefetches the file blocks 294 createReaderAndWaitForPrefetchInterruption(storeFile); 295 Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000), 296 () -> PrefetchExecutor.isCompleted(storeFile)); 297 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 298 // Wait until all cache writer queues are empty 299 Waiter.waitFor(conf, 5000, () -> { 300 for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) { 301 if (!queue.isEmpty()) { 302 return false; 303 } 304 } 305 return true; 306 }); 307 // With the wait time configuration, prefetch should trigger no evictions once it reaches 308 // cache capacity 309 assertEquals(0, bc.getStats().getEvictedCount()); 310 } 311 312 @Test 313 public void testPrefetchRunTriggersEvictions() throws Exception { 314 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 315 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 316 // Use full capacity as the "acceptable" size so prefetch does not stop at ~98% total usage 317 // (via blockFitsIntoTheCache) before the cache writer path must run freeSpace/evictions. 318 conf.setDouble("hbase.bucketcache.acceptfactor", 1.0); 319 conf.setDouble("hbase.bucketcache.minfactor", 1.0); 320 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 321 conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0); 322 // Ensures no prefetch interruption due to heap usage in the event of freeMemory() returning 0. 323 conf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, Double.MAX_VALUE); 324 blockCache = BlockCacheFactory.createBlockCache(conf); 325 cacheConf = new CacheConfig(conf, blockCache); 326 Path storeFile = writeStoreFile("testPrefetchRunTriggersEvictions", 10000); 327 // Prefetches the file blocks 328 createReaderAndWaitForPrefetchInterruption(storeFile); 329 Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000), 330 () -> PrefetchExecutor.isCompleted(storeFile)); 331 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 332 // Wait until all cache writer queues are empty 333 Waiter.waitFor(conf, 5000, () -> { 334 for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) { 335 if (!queue.isEmpty()) { 336 return false; 337 } 338 } 339 return true; 340 }); 341 if (bc.getStats().getFailedInserts() == 0) { 342 // With no wait time configuration, prefetch should trigger evictions once it reaches 343 // cache capacity. Writer threads can finish draining slightly after queue.offer returns. 344 Waiter.waitFor(conf, 15000, () -> bc.getStats().getEvictedCount() > 0); 345 assertNotEquals(0, bc.getStats().getEvictedCount()); 346 } else { 347 LOG.info("We had {} cache insert failures, which may cause cache usage " 348 + "to never reach capacity.", bc.getStats().getFailedInserts()); 349 } 350 } 351 352 @Test 353 public void testPrefetchMetricProgress() throws Exception { 354 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 355 blockCache = BlockCacheFactory.createBlockCache(conf); 356 cacheConf = new CacheConfig(conf, blockCache); 357 Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100); 358 // Prefetches the file blocks 359 LOG.debug("First read should prefetch the blocks."); 360 readStoreFile(storeFile); 361 String regionName = storeFile.getParent().getParent().getName(); 362 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 363 MutableLong regionCachedSize = new MutableLong(0); 364 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 365 Waiter.waitFor(conf, 300, () -> { 366 if (bc.getBackingMap().size() > 0) { 367 long currentSize = bc.getRegionCachedInfo().get().get(regionName); 368 assertTrue(regionCachedSize.getValue() <= currentSize); 369 LOG.debug("Logging progress of region caching: {}", currentSize); 370 regionCachedSize.setValue(currentSize); 371 } 372 return bc.getBackingMap().size() == 6; 373 }); 374 } 375 376 @Test 377 public void testPrefetchMetricProgressForLinks(TestInfo testInfo) throws Exception { 378 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 379 blockCache = BlockCacheFactory.createBlockCache(conf); 380 cacheConf = new CacheConfig(conf, blockCache); 381 final RegionInfo hri = RegionInfoBuilder 382 .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build(); 383 // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/ 384 Configuration testConf = new Configuration(this.conf); 385 Path testDir = TEST_UTIL.getDataTestDir(testInfo.getTestMethod().get().getName()); 386 CommonFSUtils.setRootDir(testConf, testDir); 387 Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable()); 388 RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); 389 Path regionDir = new Path(tableDir, region.getEncodedName()); 390 Path cfDir = new Path(regionDir, "cf"); 391 HRegionFileSystem regionFS = 392 HRegionFileSystem.createRegionOnFileSystem(testConf, fs, tableDir, region); 393 Path storeFile = writeStoreFile(100, cfDir); 394 // Prefetches the file blocks 395 LOG.debug("First read should prefetch the blocks."); 396 readStoreFile(storeFile); 397 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 398 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 399 Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6); 400 long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName()); 401 402 final RegionInfo dstHri = RegionInfoBuilder 403 .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build(); 404 HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, 405 CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri); 406 407 Path dstPath = new Path(regionFS.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf")); 408 409 Path linkFilePath = 410 new Path(dstPath, HFileLink.createHFileLinkName(region, storeFile.getName())); 411 412 StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false, 413 StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath) 414 .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf")) 415 .withRegionFileSystem(dstRegionFs).build()); 416 sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true); 417 StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true); 418 419 HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf); 420 assertTrue(sfi.isLink()); 421 hsf.initReader(); 422 HFile.Reader reader = hsf.getReader().getHFileReader(); 423 while (!reader.prefetchComplete()) { 424 // Sleep for a bit 425 Thread.sleep(1000); 426 } 427 // HFileLink use the path of the target file to create a reader, so it should resolve to the 428 // already cached blocks and not insert new blocks in the cache. 429 Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6); 430 431 assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName())); 432 } 433 434 @Test 435 public void testPrefetchMetricProgressForLinksToArchived(TestInfo testInfo) throws Exception { 436 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 437 blockCache = BlockCacheFactory.createBlockCache(conf); 438 cacheConf = new CacheConfig(conf, blockCache); 439 440 // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/ 441 Configuration testConf = new Configuration(this.conf); 442 Path testDir = TEST_UTIL.getDataTestDir(testInfo.getTestMethod().get().getName()); 443 CommonFSUtils.setRootDir(testConf, testDir); 444 445 final RegionInfo hri = RegionInfoBuilder 446 .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build(); 447 Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable()); 448 RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); 449 Path regionDir = new Path(tableDir, region.getEncodedName()); 450 Path cfDir = new Path(regionDir, "cf"); 451 452 Path storeFile = writeStoreFile(100, cfDir); 453 // Prefetches the file blocks 454 LOG.debug("First read should prefetch the blocks."); 455 readStoreFile(storeFile); 456 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 457 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 458 Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6); 459 long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName()); 460 461 // create another file, but in the archive dir, hence it won't be cached 462 Path archiveRoot = new Path(testDir, "archive"); 463 Path archiveTableDir = CommonFSUtils.getTableDir(archiveRoot, hri.getTable()); 464 Path archiveRegionDir = new Path(archiveTableDir, region.getEncodedName()); 465 Path archiveCfDir = new Path(archiveRegionDir, "cf"); 466 Path archivedFile = writeStoreFile(100, archiveCfDir); 467 468 final RegionInfo testRegion = 469 RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); 470 final HRegionFileSystem testRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, 471 CommonFSUtils.getTableDir(testDir, testRegion.getTable()), testRegion); 472 // Just create a link to the archived file 473 Path dstPath = new Path(tableDir, new Path(testRegion.getEncodedName(), "cf")); 474 475 Path linkFilePath = 476 new Path(dstPath, HFileLink.createHFileLinkName(region, archivedFile.getName())); 477 478 StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false, 479 StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath) 480 .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf")) 481 .withRegionFileSystem(testRegionFs).build()); 482 sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true); 483 StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true); 484 485 HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf); 486 assertTrue(sfi.isLink()); 487 hsf.initReader(); 488 HFile.Reader reader = hsf.getReader().getHFileReader(); 489 while (!reader.prefetchComplete()) { 490 // Sleep for a bit 491 Thread.sleep(1000); 492 } 493 // HFileLink use the path of the target file to create a reader, but the target file is in the 494 // archive, so it wasn't cached previously and should be cached when we open the link. 495 Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 12); 496 // cached size for the region of target file shouldn't change 497 assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName())); 498 // cached size for the region with link pointing to archive dir shouldn't be updated 499 assertNull(bc.getRegionCachedInfo().get().get(testRegion.getEncodedName())); 500 } 501 502 private void readStoreFile(Path storeFilePath) throws Exception { 503 readStoreFile(storeFilePath, (r, o) -> { 504 HFileBlock block = null; 505 try { 506 block = r.readBlock(o, -1, false, true, false, true, null, null); 507 } catch (IOException e) { 508 fail(e.getMessage()); 509 } 510 return block; 511 }, (key, block) -> { 512 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 513 if ( 514 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 515 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 516 ) { 517 assertTrue(isCached); 518 } 519 }); 520 } 521 522 private void readStoreFile(Path storeFilePath, 523 BiFunction<HFile.Reader, Long, HFileBlock> readFunction, 524 BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception { 525 // Open the file 526 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 527 528 while (!reader.prefetchComplete()) { 529 // Sleep for a bit 530 Thread.sleep(1000); 531 } 532 long offset = 0; 533 long sizeForDataBlocks = 0; 534 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 535 HFileBlock block = readFunction.apply(reader, offset); 536 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 537 validationFunction.accept(blockCacheKey, block); 538 offset += block.getOnDiskSizeWithHeader(); 539 } 540 } 541 542 private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath) 543 throws Exception { 544 // Open the file 545 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 546 547 while (!reader.prefetchComplete()) { 548 // Sleep for a bit 549 Thread.sleep(1000); 550 } 551 assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles() 552 .get().size()); 553 554 return reader; 555 } 556 557 private Path writeStoreFile(String fname, int numKVs) throws IOException { 558 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 559 return writeStoreFile(fname, meta, numKVs); 560 } 561 562 private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException { 563 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 564 return writeStoreFile(meta, numKVs, regionCFDir); 565 } 566 567 private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException { 568 return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname)); 569 } 570 571 private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir) 572 throws IOException { 573 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) 574 .withOutputDir(regionCFDir).withFileContext(context).build(); 575 Random rand = ThreadLocalRandom.current(); 576 final int rowLen = 32; 577 for (int i = 0; i < numKVs; ++i) { 578 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 579 byte[] v = RandomKeyValueUtil.randomValue(rand); 580 int cfLen = rand.nextInt(k.length - rowLen + 1); 581 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 582 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 583 sfw.append(kv); 584 } 585 586 sfw.close(); 587 return sfw.getPath(); 588 } 589 590 public static KeyValue.Type generateKeyType(Random rand) { 591 if (rand.nextBoolean()) { 592 // Let's make half of KVs puts. 593 return KeyValue.Type.Put; 594 } else { 595 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 596 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 597 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 598 + "Probably the layout of KeyValue.Type has changed."); 599 } 600 return keyType; 601 } 602 } 603}