001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; 021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; 022import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY; 023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertNotEquals; 026import static org.junit.jupiter.api.Assertions.assertNotNull; 027import static org.junit.jupiter.api.Assertions.assertNull; 028import static org.junit.jupiter.api.Assertions.assertTrue; 029import static org.junit.jupiter.api.Assertions.fail; 030 031import java.io.File; 032import java.io.IOException; 033import java.util.Map; 034import java.util.Random; 035import java.util.concurrent.BlockingQueue; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.function.BiConsumer; 038import java.util.function.BiFunction; 039import org.apache.commons.lang3.mutable.MutableLong; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.HBaseTestingUtil; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.Waiter; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.RegionInfoBuilder; 051import org.apache.hadoop.hbase.fs.HFileSystem; 052import org.apache.hadoop.hbase.io.ByteBuffAllocator; 053import org.apache.hadoop.hbase.io.HFileLink; 054import org.apache.hadoop.hbase.io.hfile.BlockCache; 055import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 056import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 057import org.apache.hadoop.hbase.io.hfile.BlockType; 058import org.apache.hadoop.hbase.io.hfile.CacheConfig; 059import org.apache.hadoop.hbase.io.hfile.Cacheable; 060import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 061import org.apache.hadoop.hbase.io.hfile.HFile; 062import org.apache.hadoop.hbase.io.hfile.HFileBlock; 063import org.apache.hadoop.hbase.io.hfile.HFileContext; 064import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 065import org.apache.hadoop.hbase.io.hfile.HFileScanner; 066import org.apache.hadoop.hbase.io.hfile.LruBlockCache; 067import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; 068import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; 069import org.apache.hadoop.hbase.regionserver.BloomType; 070import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; 071import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 072import org.apache.hadoop.hbase.regionserver.HStoreFile; 073import org.apache.hadoop.hbase.regionserver.StoreContext; 074import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 075import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 076import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 077import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 078import org.apache.hadoop.hbase.testclassification.IOTests; 079import org.apache.hadoop.hbase.testclassification.MediumTests; 080import org.apache.hadoop.hbase.util.Bytes; 081import org.apache.hadoop.hbase.util.CommonFSUtils; 082import org.junit.jupiter.api.AfterEach; 083import org.junit.jupiter.api.BeforeEach; 084import org.junit.jupiter.api.Tag; 085import org.junit.jupiter.api.Test; 086import org.junit.jupiter.api.TestInfo; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 091 092@Tag(IOTests.TAG) 093@Tag(MediumTests.TAG) 094public class TestPrefetchWithBucketCache { 095 096 private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class); 097 098 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 099 100 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 101 private static final int DATA_BLOCK_SIZE = 2048; 102 private Configuration conf; 103 private CacheConfig cacheConf; 104 private FileSystem fs; 105 private BlockCache blockCache; 106 107 @BeforeEach 108 public void setUp(TestInfo testInfo) throws IOException { 109 conf = TEST_UTIL.getConfiguration(); 110 conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); 111 fs = HFileSystem.get(conf); 112 File testDir = new File(testInfo.getTestMethod().get().getName()); 113 testDir.mkdir(); 114 conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache"); 115 } 116 117 @AfterEach 118 public void tearDown(TestInfo testInfo) { 119 File cacheFile = new File(testInfo.getTestMethod().get().getName() + "/bucket.cache"); 120 File dir = new File(testInfo.getTestMethod().get().getName()); 121 cacheFile.delete(); 122 dir.delete(); 123 } 124 125 @Test 126 public void testPrefetchDoesntOverwork() throws Exception { 127 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 128 blockCache = BlockCacheFactory.createBlockCache(conf); 129 cacheConf = new CacheConfig(conf, blockCache); 130 Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100); 131 // Prefetches the file blocks 132 LOG.debug("First read should prefetch the blocks."); 133 readStoreFile(storeFile); 134 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 135 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 136 Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); 137 Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap()); 138 LruBlockCache l1 = (LruBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache(); 139 assertEquals(1, l1.getBlockCount()); 140 // Removes the meta block from L1 cache 141 l1.clearCache(); 142 // Reads file again. Checks we are not prefetching data blocks again, 143 // but fetch back the meta block 144 LOG.debug("Second read, prefetch should run, without altering bucket cache state," 145 + " only the meta block should be fetched again."); 146 readStoreFile(storeFile); 147 // Makes sure the bucketcache entries have not changed 148 snapshot.entrySet().forEach(e -> { 149 BucketEntry entry = bc.getBackingMap().get(e.getKey()); 150 assertNotNull(entry); 151 assertEquals(e.getValue().getCachedTime(), entry.getCachedTime()); 152 }); 153 assertEquals(1, l1.getBlockCount()); 154 // forcibly removes first block from the bc backing map, in order to cause it to be cached again 155 BlockCacheKey key = snapshot.keySet().stream().findFirst().get(); 156 LOG.debug("removing block {}", key); 157 bc.getBackingMap().remove(key); 158 bc.getFullyCachedFiles().get().remove(storeFile.getName()); 159 assertTrue(snapshot.size() > bc.getBackingMap().size()); 160 LOG.debug("Third read should prefetch again, as we removed one block for the file."); 161 readStoreFile(storeFile); 162 Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size()); 163 assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime()); 164 } 165 166 @Test 167 public void testPrefetchRefsAfterSplit() throws Exception { 168 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 169 blockCache = BlockCacheFactory.createBlockCache(conf); 170 cacheConf = new CacheConfig(conf, blockCache); 171 172 Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit"); 173 RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); 174 Path regionDir = new Path(tableDir, region.getEncodedName()); 175 Path cfDir = new Path(regionDir, "cf"); 176 HRegionFileSystem regionFS = 177 HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region); 178 Path storeFile = writeStoreFile(100, cfDir); 179 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true, 180 StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir) 181 .withCacheConfig(cacheConf).build()); 182 // Prefetches the file blocks 183 LOG.debug("First read should prefetch the blocks."); 184 readStoreFile(storeFile); 185 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 186 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 187 Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); 188 189 // split the file and return references to the original file 190 Random rand = ThreadLocalRandom.current(); 191 byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50); 192 HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft); 193 Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false, 194 new ConstantSizeRegionSplitPolicy(), sft).getPath(); 195 HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft); 196 // starts reader for the ref. The ref should resolve to the original file blocks 197 // and not duplicate blocks in the cache. 198 refHsf.initReader(); 199 HFile.Reader reader = refHsf.getReader().getHFileReader(); 200 while (!reader.prefetchComplete()) { 201 // Sleep for a bit 202 Thread.sleep(1000); 203 } 204 // the ref file blocks keys should actually resolve to the referred file blocks, 205 // so we should not see additional blocks in the cache. 206 Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); 207 208 BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0); 209 Cacheable result = bc.getBlock(refCacheKey, true, false, true); 210 assertNotNull(result); 211 BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0); 212 assertEquals(result, bc.getBlock(fileCacheKey, true, false, true)); 213 assertNull(bc.getBackingMap().get(refCacheKey)); 214 assertNotNull(bc.getBlockForReference(refCacheKey)); 215 } 216 217 @Test 218 public void testPrefetchInterruptOnCapacity() throws Exception { 219 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 220 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 221 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 222 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 223 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 224 conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100); 225 blockCache = BlockCacheFactory.createBlockCache(conf); 226 cacheConf = new CacheConfig(conf, blockCache); 227 Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000); 228 // Prefetches the file blocks 229 LOG.debug("First read should prefetch the blocks."); 230 createReaderAndWaitForPrefetchInterruption(storeFile); 231 Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000), 232 () -> PrefetchExecutor.isCompleted(storeFile)); 233 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 234 long evictedFirstPrefetch = bc.getStats().getEvictedCount(); 235 HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile); 236 assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount()); 237 HFileScanner scanner = reader.getScanner(conf, true, true); 238 scanner.seekTo(); 239 while (scanner.next()) { 240 // do a full scan to force some evictions 241 LOG.trace("Iterating the full scan to evict some blocks"); 242 } 243 scanner.close(); 244 Waiter.waitFor(conf, 5000, () -> { 245 for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) { 246 if (!queue.isEmpty()) { 247 return false; 248 } 249 } 250 return true; 251 }); 252 // The scanner should had triggered at least 3x evictions from the prefetch, 253 // as we try cache each block without interruption. 254 assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch); 255 } 256 257 @Test 258 public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception { 259 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 260 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 261 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 262 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 263 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 264 blockCache = BlockCacheFactory.createBlockCache(conf); 265 ColumnFamilyDescriptor family = 266 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build(); 267 cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP); 268 Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000); 269 // Prefetches the file blocks 270 LOG.debug("First read should prefetch the blocks."); 271 createReaderAndWaitForPrefetchInterruption(storeFile); 272 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 273 Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile)); 274 long evictions = bc.getStats().getEvictedCount(); 275 LOG.debug("Total evicted at this point: {}", evictions); 276 // creates another reader, now that cache is full, no block would fit and prefetch should not 277 // trigger any new evictions 278 createReaderAndWaitForPrefetchInterruption(storeFile); 279 assertEquals(evictions, bc.getStats().getEvictedCount()); 280 } 281 282 @Test 283 public void testPrefetchRunNoEvictions() throws Exception { 284 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 285 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 286 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 287 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 288 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 289 conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100); 290 blockCache = BlockCacheFactory.createBlockCache(conf); 291 cacheConf = new CacheConfig(conf, blockCache); 292 Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000); 293 // Prefetches the file blocks 294 createReaderAndWaitForPrefetchInterruption(storeFile); 295 Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000), 296 () -> PrefetchExecutor.isCompleted(storeFile)); 297 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 298 // Wait until all cache writer queues are empty 299 Waiter.waitFor(conf, 5000, () -> { 300 for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) { 301 if (!queue.isEmpty()) { 302 return false; 303 } 304 } 305 return true; 306 }); 307 // With the wait time configuration, prefetch should trigger no evictions once it reaches 308 // cache capacity 309 assertEquals(0, bc.getStats().getEvictedCount()); 310 } 311 312 @Test 313 public void testPrefetchRunTriggersEvictions() throws Exception { 314 conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); 315 conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072"); 316 conf.setDouble("hbase.bucketcache.acceptfactor", 0.98); 317 conf.setDouble("hbase.bucketcache.minfactor", 0.98); 318 conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0); 319 conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0); 320 blockCache = BlockCacheFactory.createBlockCache(conf); 321 cacheConf = new CacheConfig(conf, blockCache); 322 Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000); 323 // Prefetches the file blocks 324 createReaderAndWaitForPrefetchInterruption(storeFile); 325 Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000), 326 () -> PrefetchExecutor.isCompleted(storeFile)); 327 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 328 // Wait until all cache writer queues are empty 329 Waiter.waitFor(conf, 5000, () -> { 330 for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) { 331 if (!queue.isEmpty()) { 332 return false; 333 } 334 } 335 return true; 336 }); 337 if (bc.getStats().getFailedInserts() == 0) { 338 // With no wait time configuration, prefetch should trigger evictions once it reaches 339 // cache capacity 340 assertNotEquals(0, bc.getStats().getEvictedCount()); 341 } else { 342 LOG.info("We had {} cache insert failures, which may cause cache usage " 343 + "to never reach capacity.", bc.getStats().getFailedInserts()); 344 } 345 } 346 347 @Test 348 public void testPrefetchMetricProgress() throws Exception { 349 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 350 blockCache = BlockCacheFactory.createBlockCache(conf); 351 cacheConf = new CacheConfig(conf, blockCache); 352 Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100); 353 // Prefetches the file blocks 354 LOG.debug("First read should prefetch the blocks."); 355 readStoreFile(storeFile); 356 String regionName = storeFile.getParent().getParent().getName(); 357 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 358 MutableLong regionCachedSize = new MutableLong(0); 359 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 360 Waiter.waitFor(conf, 300, () -> { 361 if (bc.getBackingMap().size() > 0) { 362 long currentSize = bc.getRegionCachedInfo().get().get(regionName); 363 assertTrue(regionCachedSize.getValue() <= currentSize); 364 LOG.debug("Logging progress of region caching: {}", currentSize); 365 regionCachedSize.setValue(currentSize); 366 } 367 return bc.getBackingMap().size() == 6; 368 }); 369 } 370 371 @Test 372 public void testPrefetchMetricProgressForLinks(TestInfo testInfo) throws Exception { 373 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 374 blockCache = BlockCacheFactory.createBlockCache(conf); 375 cacheConf = new CacheConfig(conf, blockCache); 376 final RegionInfo hri = RegionInfoBuilder 377 .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build(); 378 // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/ 379 Configuration testConf = new Configuration(this.conf); 380 Path testDir = TEST_UTIL.getDataTestDir(testInfo.getTestMethod().get().getName()); 381 CommonFSUtils.setRootDir(testConf, testDir); 382 Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable()); 383 RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); 384 Path regionDir = new Path(tableDir, region.getEncodedName()); 385 Path cfDir = new Path(regionDir, "cf"); 386 HRegionFileSystem regionFS = 387 HRegionFileSystem.createRegionOnFileSystem(testConf, fs, tableDir, region); 388 Path storeFile = writeStoreFile(100, cfDir); 389 // Prefetches the file blocks 390 LOG.debug("First read should prefetch the blocks."); 391 readStoreFile(storeFile); 392 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 393 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 394 Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6); 395 long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName()); 396 397 final RegionInfo dstHri = RegionInfoBuilder 398 .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build(); 399 HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, 400 CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri); 401 402 Path dstPath = new Path(regionFS.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf")); 403 404 Path linkFilePath = 405 new Path(dstPath, HFileLink.createHFileLinkName(region, storeFile.getName())); 406 407 StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false, 408 StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath) 409 .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf")) 410 .withRegionFileSystem(dstRegionFs).build()); 411 sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true); 412 StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true); 413 414 HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf); 415 assertTrue(sfi.isLink()); 416 hsf.initReader(); 417 HFile.Reader reader = hsf.getReader().getHFileReader(); 418 while (!reader.prefetchComplete()) { 419 // Sleep for a bit 420 Thread.sleep(1000); 421 } 422 // HFileLink use the path of the target file to create a reader, so it should resolve to the 423 // already cached blocks and not insert new blocks in the cache. 424 Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6); 425 426 assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName())); 427 } 428 429 @Test 430 public void testPrefetchMetricProgressForLinksToArchived(TestInfo testInfo) throws Exception { 431 conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); 432 blockCache = BlockCacheFactory.createBlockCache(conf); 433 cacheConf = new CacheConfig(conf, blockCache); 434 435 // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/ 436 Configuration testConf = new Configuration(this.conf); 437 Path testDir = TEST_UTIL.getDataTestDir(testInfo.getTestMethod().get().getName()); 438 CommonFSUtils.setRootDir(testConf, testDir); 439 440 final RegionInfo hri = RegionInfoBuilder 441 .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName())).build(); 442 Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable()); 443 RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); 444 Path regionDir = new Path(tableDir, region.getEncodedName()); 445 Path cfDir = new Path(regionDir, "cf"); 446 447 Path storeFile = writeStoreFile(100, cfDir); 448 // Prefetches the file blocks 449 LOG.debug("First read should prefetch the blocks."); 450 readStoreFile(storeFile); 451 BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); 452 // Our file should have 6 DATA blocks. We should wait for all of them to be cached 453 Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6); 454 long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName()); 455 456 // create another file, but in the archive dir, hence it won't be cached 457 Path archiveRoot = new Path(testDir, "archive"); 458 Path archiveTableDir = CommonFSUtils.getTableDir(archiveRoot, hri.getTable()); 459 Path archiveRegionDir = new Path(archiveTableDir, region.getEncodedName()); 460 Path archiveCfDir = new Path(archiveRegionDir, "cf"); 461 Path archivedFile = writeStoreFile(100, archiveCfDir); 462 463 final RegionInfo testRegion = 464 RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); 465 final HRegionFileSystem testRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, 466 CommonFSUtils.getTableDir(testDir, testRegion.getTable()), testRegion); 467 // Just create a link to the archived file 468 Path dstPath = new Path(tableDir, new Path(testRegion.getEncodedName(), "cf")); 469 470 Path linkFilePath = 471 new Path(dstPath, HFileLink.createHFileLinkName(region, archivedFile.getName())); 472 473 StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false, 474 StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath) 475 .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf")) 476 .withRegionFileSystem(testRegionFs).build()); 477 sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true); 478 StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true); 479 480 HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf); 481 assertTrue(sfi.isLink()); 482 hsf.initReader(); 483 HFile.Reader reader = hsf.getReader().getHFileReader(); 484 while (!reader.prefetchComplete()) { 485 // Sleep for a bit 486 Thread.sleep(1000); 487 } 488 // HFileLink use the path of the target file to create a reader, but the target file is in the 489 // archive, so it wasn't cached previously and should be cached when we open the link. 490 Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 12); 491 // cached size for the region of target file shouldn't change 492 assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName())); 493 // cached size for the region with link pointing to archive dir shouldn't be updated 494 assertNull(bc.getRegionCachedInfo().get().get(testRegion.getEncodedName())); 495 } 496 497 private void readStoreFile(Path storeFilePath) throws Exception { 498 readStoreFile(storeFilePath, (r, o) -> { 499 HFileBlock block = null; 500 try { 501 block = r.readBlock(o, -1, false, true, false, true, null, null); 502 } catch (IOException e) { 503 fail(e.getMessage()); 504 } 505 return block; 506 }, (key, block) -> { 507 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 508 if ( 509 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 510 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 511 ) { 512 assertTrue(isCached); 513 } 514 }); 515 } 516 517 private void readStoreFile(Path storeFilePath, 518 BiFunction<HFile.Reader, Long, HFileBlock> readFunction, 519 BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception { 520 // Open the file 521 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 522 523 while (!reader.prefetchComplete()) { 524 // Sleep for a bit 525 Thread.sleep(1000); 526 } 527 long offset = 0; 528 long sizeForDataBlocks = 0; 529 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 530 HFileBlock block = readFunction.apply(reader, offset); 531 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 532 validationFunction.accept(blockCacheKey, block); 533 offset += block.getOnDiskSizeWithHeader(); 534 } 535 } 536 537 private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath) 538 throws Exception { 539 // Open the file 540 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 541 542 while (!reader.prefetchComplete()) { 543 // Sleep for a bit 544 Thread.sleep(1000); 545 } 546 assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles() 547 .get().size()); 548 549 return reader; 550 } 551 552 private Path writeStoreFile(String fname, int numKVs) throws IOException { 553 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 554 return writeStoreFile(fname, meta, numKVs); 555 } 556 557 private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException { 558 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 559 return writeStoreFile(meta, numKVs, regionCFDir); 560 } 561 562 private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException { 563 return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname)); 564 } 565 566 private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir) 567 throws IOException { 568 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) 569 .withOutputDir(regionCFDir).withFileContext(context).build(); 570 Random rand = ThreadLocalRandom.current(); 571 final int rowLen = 32; 572 for (int i = 0; i < numKVs; ++i) { 573 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 574 byte[] v = RandomKeyValueUtil.randomValue(rand); 575 int cfLen = rand.nextInt(k.length - rowLen + 1); 576 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 577 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 578 sfw.append(kv); 579 } 580 581 sfw.close(); 582 return sfw.getPath(); 583 } 584 585 public static KeyValue.Type generateKeyType(Random rand) { 586 if (rand.nextBoolean()) { 587 // Let's make half of KVs puts. 588 return KeyValue.Type.Put; 589 } else { 590 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 591 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 592 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 593 + "Probably the layout of KeyValue.Type has changed."); 594 } 595 return keyType; 596 } 597 } 598}