/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for block prefetching on HFile open: verifies that prefetch caches the expected block
 * types, honors the block-cache-disabled and heap-usage-threshold settings, emits tracing spans,
 * respects the configured prefetch delay, and works for reference files and HFile links.
 */
@Tag(IOTests.TAG)
@Tag(MediumTests.TAG)
public class TestPrefetch {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Minimum and Maximum are excluded from randomly generated key types; see generateKeyType().
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 1000;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  // JUnit 5 requires @RegisterExtension fields to be non-private (extension registration fails
  // otherwise), so keep this package-private; static + final registers it once for the class.
  @RegisterExtension
  static final OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create();

  /**
   * Enables prefetch-on-open and builds a fresh block cache and cache config for each test.
   */
  @BeforeEach
  public void setUp() throws IOException, InterruptedException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
  }

  /**
   * Prefetch enabled on the column family descriptor should win even when the configuration-level
   * flag is off.
   */
  @Test
  public void testPrefetchSetInHCDWorks() {
    ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build();
    Configuration c = HBaseConfiguration.create();
    assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
    CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    assertTrue(cc.shouldPrefetchOnOpen());
  }

  /**
   * With block cache disabled on the family, no prefetch task should be scheduled and no blocks
   * should land in the cache.
   */
  @Test
  public void testPrefetchBlockCacheDisabled() throws Exception {
    ScheduledThreadPoolExecutor poolExecutor =
      (ScheduledThreadPoolExecutor) PrefetchExecutor.getExecutorPool();
    long totalCompletedBefore = poolExecutor.getCompletedTaskCount();
    long queueBefore = poolExecutor.getQueue().size();
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
        .setBlockCacheEnabled(false).build();
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    CacheConfig cacheConfig =
      new CacheConfig(conf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchBlockCacheDisabled", meta, cacheConfig);
    readStoreFile(storeFile, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertFalse(isCached);
      }
    }, cacheConfig);
    // No new prefetch tasks: completed + queued must be unchanged from before the file open.
    assertEquals(totalCompletedBefore + queueBefore,
      poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
  }

  /**
   * With a very low prefetch heap usage threshold, most blocks should be skipped (uncached) by
   * prefetch rather than cached.
   */
  @Test
  public void testPrefetchHeapUsageAboveThreshold() throws Exception {
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
        .setBlockCacheEnabled(true).build();
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Configuration newConf = new Configuration(conf);
    newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1);
    CacheConfig cacheConfig =
      new CacheConfig(newConf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold", meta, cacheConfig);
    MutableInt cachedCount = new MutableInt(0);
    MutableInt unCachedCount = new MutableInt(0);
    readStoreFile(storeFile, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        if (isCached) {
          cachedCount.increment();
        } else {
          unCachedCount.increment();
        }
      }
    }, cacheConfig);
    assertTrue(unCachedCount.compareTo(cachedCount) > 0);
  }

  /**
   * Basic prefetch sanity check, plus verification that the prefetch span is emitted on its own
   * thread (i.e. not parented to the test span).
   */
  @Test
  public void testPrefetch() throws Exception {
    TraceUtil.trace(() -> {
      Path storeFile = writeStoreFile("TestPrefetch");
      readStoreFile(storeFile);
    }, "testPrefetch");

    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(OTEL_EXT::getSpans,
      hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
    final List<SpanData> spans = OTEL_EXT.getSpans();
    if (LOG.isDebugEnabled()) {
      StringTraceRenderer renderer = new StringTraceRenderer(spans);
      renderer.render(LOG::debug);
    }

    final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
      .orElseThrow(AssertionError::new);
    assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
      hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
  }

  /**
   * Repeatedly races scanner-style reads against an in-flight prefetch.
   */
  @Test
  public void testPrefetchRace() throws Exception {
    for (int i = 0; i < 10; i++) {
      Path storeFile = writeStoreFile("TestPrefetchRace-" + i);
      readStoreFileLikeScanner(storeFile);
    }
  }

  /**
   * Read a storefile in the same manner as a scanner -- using non-positional reads and without
   * waiting for prefetch to complete.
   */
  private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
    // Open the file
    // NOTE(review): the reader is never closed here; acceptable for a test, but worth confirming
    // it does not pin resources across the 10 iterations of testPrefetchRace.
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    do {
      long offset = 0;
      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
        HFileBlock block =
          reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null);
        offset += block.getOnDiskSizeWithHeader();
      }
    } while (!reader.prefetchComplete());
  }

  /**
   * Reads all blocks after prefetch completes and asserts data/index blocks are cached.
   */
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

  /**
   * Cache-only read path: data blocks are expected to remain packed (compressed) in the cache
   * while index blocks are unpacked; everything read must be cached.
   */
  private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null, true);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (block.getBlockType() == BlockType.DATA) {
        assertFalse(block.isUnpacked());
      } else if (
        block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(block.isUnpacked());
      }
      assertTrue(isCached);
    });
  }

  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    readStoreFile(storeFilePath, readFunction, validationFunction, cacheConf);
  }

  /**
   * Opens the file, waits for prefetch to finish, then walks every block before the
   * load-on-open section, applying {@code readFunction} and {@code validationFunction}.
   */
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction, CacheConfig cacheConfig)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  /**
   * When caching of compressed data blocks is enabled, prefetched data blocks should be cached
   * in their packed form.
   */
  @Test
  public void testPrefetchCompressed() throws Exception {
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
      .withBlockSize(DATA_BLOCK_SIZE).build();
    Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
    readStoreFileCacheOnly(storeFile);
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
  }

  /**
   * Reference files (post-split) must still be prefetched when compaction is disabled.
   */
  @Test
  public void testPrefetchDoesntSkipRefs() throws Exception {
    testPrefetchWhenRefs(false, c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

  /**
   * The prefetch delay must be hot-reloadable through the configuration-change notifier.
   */
  @Test
  public void testOnConfigurationChange() {
    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
    conf.setInt(PREFETCH_DELAY, 40000);
    prefetchExecutorNotifier.onConfigurationChange(conf);
    // JUnit assertEquals takes (expected, actual); the original had them swapped, which makes
    // failure messages report the values backwards.
    assertEquals(40000, prefetchExecutorNotifier.getPrefetchDelay());

    // restore
    conf.setInt(PREFETCH_DELAY, 30000);
    prefetchExecutorNotifier.onConfigurationChange(conf);
    assertEquals(30000, prefetchExecutorNotifier.getPrefetchDelay());

    conf.setInt(PREFETCH_DELAY, 1000);
    prefetchExecutorNotifier.onConfigurationChange(conf);
  }

  /**
   * With a 25s fixed delay (no variation), prefetch must not start within the first 20s and must
   * start only after the configured delay has elapsed.
   */
  @Test
  public void testPrefetchWithDelay() throws Exception {
    // Configure custom delay
    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
    conf.setInt(PREFETCH_DELAY, 25000);
    conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
    prefetchExecutorNotifier.onConfigurationChange(conf);

    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
      .withBlockSize(DATA_BLOCK_SIZE).build();
    Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
    long startTime = System.currentTimeMillis();
    HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);

    // Wait for 20 seconds, no thread should start prefetch
    Thread.sleep(20000);
    assertFalse(reader.prefetchStarted(), "Prefetch threads should not be running at this point");
    Waiter.waitFor(conf, 10000, () -> (reader.prefetchStarted() || reader.prefetchComplete()));

    assertTrue(reader.prefetchStarted() || reader.prefetchComplete());

    assertTrue(getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay(),
      "Prefetch should start post configured delay");

    conf.setInt(PREFETCH_DELAY, 1000);
    conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
    prefetchExecutorNotifier.onConfigurationChange(conf);
  }

  /**
   * A CacheConfig built without a block cache must never schedule a prefetch task.
   */
  @Test
  public void testPrefetchWhenNoBlockCache() throws Exception {
    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
    try {
      // Set a delay to max, as we don't need to have the thread running, but rather
      // assert that it never gets scheduled
      conf.setInt(PREFETCH_DELAY, Integer.MAX_VALUE);
      conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
      prefetchExecutorNotifier.onConfigurationChange(conf);

      HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
        .withBlockSize(DATA_BLOCK_SIZE).build();
      Path storeFile = writeStoreFile("testPrefetchWhenNoBlockCache", context);
      HFile.createReader(fs, storeFile, new CacheConfig(conf), true, conf);
      assertEquals(0, PrefetchExecutor.getPrefetchFutures().size());
    } finally {
      conf.setInt(PREFETCH_DELAY, 1000);
      conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
      prefetchExecutorNotifier.onConfigurationChange(conf);
    }
  }

  /**
   * HFile links must still be prefetched.
   */
  @Test
  public void testPrefetchDoesntSkipHFileLink() throws Exception {
    testPrefetchWhenHFileLink(c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

  /**
   * Splits a store file into a reference file and asserts the given predicate against the cache
   * lookup of every DATA block read from the reference.
   */
  private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
    throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs");
    RegionInfo region =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Pair<Path, byte[]> fileWithSplitPoint =
      writeStoreFileForSplit(new Path(regionDir, "cf"), context);
    Path storeFile = fileWithSplitPoint.getFirst();
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
      StoreContext.getBuilder().withFamilyStoreDirectoryPath(new Path(regionDir, "cf"))
        .withRegionFileSystem(regionFS).build());
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft);
    Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false,
      new ConstantSizeRegionSplitPolicy(), sft).getPath();
    conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft);
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  /**
   * Creates an HFile link to a freshly written store file and asserts the given predicate against
   * the cache lookup of every DATA block read through the link.
   */
  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    CommonFSUtils.setRootDir(testConf, testDir);
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);

    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
      Bytes.toBytes("testPrefetchWhenHFileLink"));

    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
    final RegionInfo dstHri =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
    HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri);
    Path dstPath = new Path(regionFs.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf"));
    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
      StoreContext.getBuilder()
        .withFamilyStoreDirectoryPath(new Path(dstRegionFs.getRegionDir(), "cf"))
        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
        .withRegionFileSystem(dstRegionFs).build());
    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFilePath.getName(), true);
    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

    // Try to open store file from link
    StoreFileInfo storeFileInfo = sft.getStoreFileInfo(linkFilePath, true);
    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
    assertTrue(storeFileInfo.isLink());

    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  private Path writeStoreFile(String fname) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta);
  }

  private Path writeStoreFile(String fname, HFileContext context) throws IOException {
    return writeStoreFile(fname, context, cacheConf);
  }

  /**
   * Writes NUM_KV random KeyValues to a new store file under the test data dir and returns its
   * path.
   */
  private Path writeStoreFile(String fname, HFileContext context, CacheConfig cacheConfig)
    throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConfig, fs)
      .withOutputDir(storeFileParentDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  /**
   * Like {@link #writeStoreFile(String, HFileContext, CacheConfig)} but also captures the key at
   * the midpoint so the file can be split; returns (path, splitPoint).
   */
  private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir)
      .withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    byte[] splitPoint = null;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
      if (i == NUM_KV / 2) {
        splitPoint = k;
      }
    }
    sfw.close();
    return new Pair<>(sfw.getPath(), splitPoint);
  }

  /**
   * Returns a random valid KeyValue type: Put half the time, otherwise a uniformly chosen type
   * excluding the Minimum/Maximum sentinels.
   */
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

  /** Milliseconds elapsed since {@code startTime}. */
  private long getElapsedTime(long startTime) {
    return System.currentTimeMillis() - startTime;
  }
}