/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Tests for prefetching blocks into a file based {@link BucketCache} when store files are opened.
 */
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetchWithBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);

  @Rule
  public TestName name = new TestName();

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    File testDir = new File(name.getMethodName());
    testDir.mkdir();
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
  }

  @After
  public void tearDown() {
    File cacheFile = new File(name.getMethodName() + "/bucket.cache");
    File dir = new File(name.getMethodName());
    cacheFile.delete();
    dir.delete();
  }

  @Test
  public void testPrefetchDoesntOverwork() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
    LruBlockCache l1 = (LruBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache();
    assertEquals(1, l1.getBlockCount());
    // Removes the meta block from L1 cache
    l1.clearCache();
    // Reads file again. Checks we are not prefetching data blocks again,
    // but fetches back the meta block
    LOG.debug("Second read, prefetch should run, without altering bucket cache state,"
      + " only the meta block should be fetched again.");
    readStoreFile(storeFile);
    // Makes sure the bucketcache entries have not changed
    snapshot.entrySet().forEach(e -> {
      BucketEntry entry = bc.getBackingMap().get(e.getKey());
      assertNotNull(entry);
      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
    });
    assertEquals(1, l1.getBlockCount());
    // forcibly removes first block from the bc backing map, in order to cause it to be cached again
    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
    LOG.debug("removing block {}", key);
    bc.getBackingMap().remove(key);
    bc.getFullyCachedFiles().get().remove(storeFile.getName());
    assertTrue(snapshot.size() > bc.getBackingMap().size());
    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
    readStoreFile(storeFile);
    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
  }

  @Test
  public void testPrefetchRefsAfterSplit() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit");
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
      StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir)
        .withCacheConfig(cacheConf).build());
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    // split the file and return references to the original file
    Random rand = ThreadLocalRandom.current();
    byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft);
    Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false,
      new ConstantSizeRegionSplitPolicy(), sft);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft);
    // starts reader for the ref. The ref should resolve to the original file blocks
    // and not duplicate blocks in the cache.
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // the ref file block keys should actually resolve to the referred file blocks,
    // so we should not see additional blocks in the cache.
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0);
    Cacheable result = bc.getBlock(refCacheKey, true, false, true);
    assertNotNull(result);
    BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0);
    assertEquals(result, bc.getBlock(fileCacheKey, true, false, true));
    assertNull(bc.getBackingMap().get(refCacheKey));
    assertNotNull(bc.getBlockForReference(refCacheKey));
  }

  @Test
  public void testPrefetchInterruptOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    long evictedFirstPrefetch = bc.getStats().getEvictedCount();
    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
    assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount());
    HFileScanner scanner = reader.getScanner(conf, true, true);
    scanner.seekTo();
    while (scanner.next()) {
      // do a full scan to force some evictions
      LOG.trace("Iterating the full scan to evict some blocks");
    }
    scanner.close();
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    // The scanner should have triggered at least 3x evictions from the prefetch,
    // as we try to cache each block without interruption.
    assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch);
  }

  @Test
  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    ColumnFamilyDescriptor family =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile));
    long evictions = bc.getStats().getEvictedCount();
    LOG.debug("Total evicted at this point: {}", evictions);
    // creates another reader; now that the cache is full, no block would fit and prefetch should
    // not trigger any new evictions
    createReaderAndWaitForPrefetchInterruption(storeFile);
    assertEquals(evictions, bc.getStats().getEvictedCount());
  }

  @Test
  public void testPrefetchRunNoEvictions() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000);
    // Prefetches the file blocks
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Wait until all cache writer queues are empty
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    // With the wait time configuration, prefetch should trigger no evictions once it reaches
    // cache capacity
    assertEquals(0, bc.getStats().getEvictedCount());
  }

  @Test
  public void testPrefetchRunTriggersEvictions() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    createReaderAndWaitForPrefetchInterruption(storeFile);
    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
      () -> PrefetchExecutor.isCompleted(storeFile));
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Wait until all cache writer queues are empty
    Waiter.waitFor(conf, 5000, () -> {
      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
        if (!queue.isEmpty()) {
          return false;
        }
      }
      return true;
    });
    if (bc.getStats().getFailedInserts() == 0) {
      // With no wait time configuration, prefetch should trigger evictions once it reaches
      // cache capacity
      assertNotEquals(0, bc.getStats().getEvictedCount());
    } else {
      LOG.info("We had {} cache insert failures, which may cause cache usage "
        + "to never reach capacity.", bc.getStats().getFailedInserts());
    }
  }

  @Test
  public void testPrefetchMetricProgress() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    String regionName = storeFile.getParent().getParent().getName();
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    MutableLong regionCachedSize = new MutableLong(0);
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> {
      if (bc.getBackingMap().size() > 0) {
        long currentSize = bc.getRegionCachedInfo().get().get(regionName);
        assertTrue(regionCachedSize.getValue() <= currentSize);
        LOG.debug("Logging progress of region caching: {}", currentSize);
        regionCachedSize.setValue(currentSize);
      }
      return bc.getBackingMap().size() == 6;
    });
  }

  @Test
  public void testPrefetchMetricProgressForLinks() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    Path testDir = TEST_UTIL.getDataTestDir(name.getMethodName());
    CommonFSUtils.setRootDir(testConf, testDir);
    Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(testConf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
    long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

    final RegionInfo dstHri =
      RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
    HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri);

    Path dstPath = new Path(regionFS.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf"));

    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(region, storeFile.getName()));

    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
        .withRegionFileSystem(dstRegionFs).build());
    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
    StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

    HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
    assertTrue(sfi.isLink());
    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // HFileLink uses the path of the target file to create a reader, so it should resolve to the
    // already cached blocks and not insert new blocks in the cache.
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);

    assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
  }

  @Test
  public void testPrefetchMetricProgressForLinksToArchived() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    Path testDir = TEST_UTIL.getDataTestDir(name.getMethodName());
    CommonFSUtils.setRootDir(testConf, testDir);

    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
    Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");

    Path storeFile = writeStoreFile(100, cfDir);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
    long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

    // create another file, but in the archive dir, hence it won't be cached
    Path archiveRoot = new Path(testDir, "archive");
    Path archiveTableDir = CommonFSUtils.getTableDir(archiveRoot, hri.getTable());
    Path archiveRegionDir = new Path(archiveTableDir, region.getEncodedName());
    Path archiveCfDir = new Path(archiveRegionDir, "cf");
    Path archivedFile = writeStoreFile(100, archiveCfDir);

    final RegionInfo testRegion =
      RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    final HRegionFileSystem testRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, testRegion.getTable()), testRegion);
    // Just create a link to the archived file
    Path dstPath = new Path(tableDir, new Path(testRegion.getEncodedName(), "cf"));

    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(region, archivedFile.getName()));

    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
        .withRegionFileSystem(testRegionFs).build());
    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
    StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

    HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
    assertTrue(sfi.isLink());
    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // HFileLink uses the path of the target file to create a reader, but the target file is in the
    // archive, so it wasn't cached previously and should be cached when we open the link.
    Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 12);
    // cached size for the region of the target file shouldn't change
    assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
    // cached size for the region with the link pointing to the archive dir shouldn't be updated
    assertNull(bc.getRegionCachedInfo().get().get(testRegion.getEncodedName()));
  }

  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    long sizeForDataBlocks = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
      .get().size());

    return reader;
  }

  private Path writeStoreFile(String fname, int numKVs) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta, numKVs);
  }

  private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(meta, numKVs, regionCFDir);
  }

  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
    return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname));
  }

  private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(regionCFDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < numKVs; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }
}