001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; 022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY; 023import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY; 024import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION; 025import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE; 026import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION; 027import static org.hamcrest.MatcherAssert.assertThat; 028import static org.hamcrest.Matchers.allOf; 029import static org.hamcrest.Matchers.hasItem; 030import static org.hamcrest.Matchers.hasItems; 031import static org.hamcrest.Matchers.not; 032import static org.junit.Assert.assertEquals; 033import static org.junit.Assert.assertFalse; 034import static org.junit.Assert.assertTrue; 035import static org.junit.Assert.fail; 036 037import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; 038import io.opentelemetry.sdk.trace.data.SpanData; 039import java.io.IOException; 040import java.util.List; 041import java.util.Random; 042import java.util.concurrent.ScheduledThreadPoolExecutor; 043import java.util.concurrent.ThreadLocalRandom; 044import java.util.concurrent.TimeUnit; 045import java.util.function.BiConsumer; 046import java.util.function.BiFunction; 047import java.util.function.Consumer; 048import org.apache.commons.lang3.mutable.MutableInt; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.fs.FileSystem; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.hbase.HBaseClassTestRule; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtil; 055import org.apache.hadoop.hbase.KeyValue; 056import org.apache.hadoop.hbase.MatcherPredicate; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 060import org.apache.hadoop.hbase.client.RegionInfo; 061import org.apache.hadoop.hbase.client.RegionInfoBuilder; 062import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; 063import org.apache.hadoop.hbase.fs.HFileSystem; 064import org.apache.hadoop.hbase.io.ByteBuffAllocator; 065import org.apache.hadoop.hbase.io.HFileLink; 066import org.apache.hadoop.hbase.io.compress.Compression; 067import org.apache.hadoop.hbase.regionserver.BloomType; 068import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; 069import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 070import org.apache.hadoop.hbase.regionserver.HStoreFile; 071import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier; 072import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 073import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 074import org.apache.hadoop.hbase.regionserver.TestHStoreFile; 075import org.apache.hadoop.hbase.testclassification.IOTests; 076import org.apache.hadoop.hbase.testclassification.MediumTests; 077import org.apache.hadoop.hbase.trace.TraceUtil; 078import org.apache.hadoop.hbase.util.Bytes; 079import org.apache.hadoop.hbase.util.CommonFSUtils; 080import org.apache.hadoop.hbase.util.Pair; 081import org.junit.Before; 082import org.junit.ClassRule; 083import org.junit.Rule; 084import org.junit.Test; 085import org.junit.experimental.categories.Category; 086import org.slf4j.Logger; 087import org.slf4j.LoggerFactory; 088 089@Category({ IOTests.class, MediumTests.class }) 090public class TestPrefetch { 091 private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class); 092 093 @ClassRule 094 public static final HBaseClassTestRule CLASS_RULE = 095 HBaseClassTestRule.forClass(TestPrefetch.class); 096 097 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 098 099 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 100 private static final int DATA_BLOCK_SIZE = 2048; 101 private static final int NUM_KV = 1000; 102 private Configuration conf; 103 private CacheConfig cacheConf; 104 private FileSystem fs; 105 private BlockCache blockCache; 106 107 @Rule 108 public OpenTelemetryRule otelRule = OpenTelemetryRule.create(); 109 110 @Before 111 public void setUp() throws IOException { 112 conf = TEST_UTIL.getConfiguration(); 113 conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); 114 fs = HFileSystem.get(conf); 115 blockCache = BlockCacheFactory.createBlockCache(conf); 116 cacheConf = new CacheConfig(conf, blockCache); 117 } 118 119 @Test 120 public void testPrefetchSetInHCDWorks() { 121 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 122 .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build(); 123 Configuration c = HBaseConfiguration.create(); 124 assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false)); 125 CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP); 126 assertTrue(cc.shouldPrefetchOnOpen()); 127 } 128 129 @Test 130 public void testPrefetchBlockCacheDisabled() throws Exception { 131 ScheduledThreadPoolExecutor poolExecutor = 132 (ScheduledThreadPoolExecutor) PrefetchExecutor.getExecutorPool(); 133 long totalCompletedBefore = poolExecutor.getCompletedTaskCount(); 134 long queueBefore = poolExecutor.getQueue().size(); 135 ColumnFamilyDescriptor columnFamilyDescriptor = 136 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true) 137 .setBlockCacheEnabled(false).build(); 138 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 139 CacheConfig cacheConfig = 140 new CacheConfig(conf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP); 141 Path storeFile = writeStoreFile("testPrefetchBlockCacheDisabled", meta, cacheConfig); 142 readStoreFile(storeFile, (r, o) -> { 143 HFileBlock block = null; 144 try { 145 block = r.readBlock(o, -1, false, true, false, true, null, null); 146 } catch (IOException e) { 147 fail(e.getMessage()); 148 } 149 return block; 150 }, (key, block) -> { 151 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 152 if ( 153 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 154 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 155 ) { 156 assertFalse(isCached); 157 } 158 }, cacheConfig); 159 assertEquals(totalCompletedBefore + queueBefore, 160 poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size()); 161 } 162 163 @Test 164 public void testPrefetchHeapUsageAboveThreshold() throws Exception { 165 ColumnFamilyDescriptor columnFamilyDescriptor = 166 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true) 167 .setBlockCacheEnabled(true).build(); 168 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 169 Configuration newConf = new Configuration(conf); 170 newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1); 171 CacheConfig cacheConfig = 172 new CacheConfig(newConf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP); 173 Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold", meta, cacheConfig); 174 MutableInt cachedCount = new MutableInt(0); 175 MutableInt unCachedCount = new MutableInt(0); 176 readStoreFile(storeFile, (r, o) -> { 177 HFileBlock block = null; 178 try { 179 block = r.readBlock(o, -1, false, true, false, true, null, null); 180 } catch (IOException e) { 181 fail(e.getMessage()); 182 } 183 return block; 184 }, (key, block) -> { 185 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 186 if ( 187 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 188 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 189 ) { 190 if (isCached) { 191 cachedCount.increment(); 192 } else { 193 unCachedCount.increment(); 194 } 195 } 196 }, cacheConfig); 197 assertTrue(unCachedCount.compareTo(cachedCount) > 0); 198 } 199 200 @Test 201 public void testPrefetch() throws Exception { 202 TraceUtil.trace(() -> { 203 Path storeFile = writeStoreFile("TestPrefetch"); 204 readStoreFile(storeFile); 205 }, "testPrefetch"); 206 207 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans, 208 hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request")))); 209 final List<SpanData> spans = otelRule.getSpans(); 210 if (LOG.isDebugEnabled()) { 211 StringTraceRenderer renderer = new StringTraceRenderer(spans); 212 renderer.render(LOG::debug); 213 } 214 215 final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst() 216 .orElseThrow(AssertionError::new); 217 assertThat("prefetch spans happen on their own threads, detached from file open.", spans, 218 hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan))))); 219 } 220 221 @Test 222 public void testPrefetchRace() throws Exception { 223 for (int i = 0; i < 10; i++) { 224 Path storeFile = writeStoreFile("TestPrefetchRace-" + i); 225 readStoreFileLikeScanner(storeFile); 226 } 227 } 228 229 /** 230 * Read a storefile in the same manner as a scanner -- using non-positional reads and without 231 * waiting for prefetch to complete. 232 */ 233 private void readStoreFileLikeScanner(Path storeFilePath) throws Exception { 234 // Open the file 235 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 236 do { 237 long offset = 0; 238 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 239 HFileBlock block = 240 reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null); 241 offset += block.getOnDiskSizeWithHeader(); 242 } 243 } while (!reader.prefetchComplete()); 244 } 245 246 private void readStoreFile(Path storeFilePath) throws Exception { 247 readStoreFile(storeFilePath, (r, o) -> { 248 HFileBlock block = null; 249 try { 250 block = r.readBlock(o, -1, false, true, false, true, null, null); 251 } catch (IOException e) { 252 fail(e.getMessage()); 253 } 254 return block; 255 }, (key, block) -> { 256 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 257 if ( 258 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 259 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 260 ) { 261 assertTrue(isCached); 262 } 263 }); 264 } 265 266 private void readStoreFileCacheOnly(Path storeFilePath) throws Exception { 267 readStoreFile(storeFilePath, (r, o) -> { 268 HFileBlock block = null; 269 try { 270 block = r.readBlock(o, -1, false, true, false, true, null, null, true); 271 } catch (IOException e) { 272 fail(e.getMessage()); 273 } 274 return block; 275 }, (key, block) -> { 276 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 277 if (block.getBlockType() == BlockType.DATA) { 278 assertFalse(block.isUnpacked()); 279 } else if ( 280 block.getBlockType() == BlockType.ROOT_INDEX 281 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 282 ) { 283 assertTrue(block.isUnpacked()); 284 } 285 assertTrue(isCached); 286 }); 287 } 288 289 private void readStoreFile(Path storeFilePath, 290 BiFunction<HFile.Reader, Long, HFileBlock> readFunction, 291 BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception { 292 readStoreFile(storeFilePath, readFunction, validationFunction, cacheConf); 293 } 294 295 private void readStoreFile(Path storeFilePath, 296 BiFunction<HFile.Reader, Long, HFileBlock> readFunction, 297 BiConsumer<BlockCacheKey, HFileBlock> validationFunction, CacheConfig cacheConfig) 298 throws Exception { 299 // Open the file 300 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf); 301 302 while (!reader.prefetchComplete()) { 303 // Sleep for a bit 304 Thread.sleep(1000); 305 } 306 long offset = 0; 307 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 308 HFileBlock block = readFunction.apply(reader, offset); 309 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 310 validationFunction.accept(blockCacheKey, block); 311 offset += block.getOnDiskSizeWithHeader(); 312 } 313 } 314 315 @Test 316 public void testPrefetchCompressed() throws Exception { 317 conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); 318 cacheConf = new CacheConfig(conf, blockCache); 319 HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ) 320 .withBlockSize(DATA_BLOCK_SIZE).build(); 321 Path storeFile = writeStoreFile("TestPrefetchCompressed", context); 322 readStoreFileCacheOnly(storeFile); 323 conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); 324 } 325 326 @Test 327 public void testPrefetchSkipsRefs() throws Exception { 328 testPrefetchWhenRefs(true, c -> { 329 boolean isCached = c != null; 330 assertFalse(isCached); 331 }); 332 } 333 334 @Test 335 public void testPrefetchDoesntSkipRefs() throws Exception { 336 testPrefetchWhenRefs(false, c -> { 337 boolean isCached = c != null; 338 assertTrue(isCached); 339 }); 340 } 341 342 @Test 343 public void testOnConfigurationChange() { 344 PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf); 345 conf.setInt(PREFETCH_DELAY, 40000); 346 prefetchExecutorNotifier.onConfigurationChange(conf); 347 assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 40000); 348 349 // restore 350 conf.setInt(PREFETCH_DELAY, 30000); 351 prefetchExecutorNotifier.onConfigurationChange(conf); 352 assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 30000); 353 354 conf.setInt(PREFETCH_DELAY, 1000); 355 prefetchExecutorNotifier.onConfigurationChange(conf); 356 } 357 358 @Test 359 public void testPrefetchWithDelay() throws Exception { 360 // Configure custom delay 361 PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf); 362 conf.setInt(PREFETCH_DELAY, 25000); 363 conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f); 364 prefetchExecutorNotifier.onConfigurationChange(conf); 365 366 HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ) 367 .withBlockSize(DATA_BLOCK_SIZE).build(); 368 Path storeFile = writeStoreFile("TestPrefetchWithDelay", context); 369 HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf); 370 long startTime = System.currentTimeMillis(); 371 372 // Wait for 20 seconds, no thread should start prefetch 373 Thread.sleep(20000); 374 assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted()); 375 while (!reader.prefetchStarted()) { 376 assertTrue("Prefetch delay has not been expired yet", 377 getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay()); 378 } 379 if (reader.prefetchStarted()) { 380 // Added some delay as we have started the timer a bit late. 381 Thread.sleep(500); 382 assertTrue("Prefetch should start post configured delay", 383 getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay()); 384 } 385 conf.setInt(PREFETCH_DELAY, 1000); 386 conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE); 387 prefetchExecutorNotifier.onConfigurationChange(conf); 388 } 389 390 @Test 391 public void testPrefetchDoesntSkipHFileLink() throws Exception { 392 testPrefetchWhenHFileLink(c -> { 393 boolean isCached = c != null; 394 assertTrue(isCached); 395 }); 396 } 397 398 private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test) 399 throws Exception { 400 cacheConf = new CacheConfig(conf, blockCache); 401 HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 402 Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs"); 403 RegionInfo region = 404 RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build(); 405 Path regionDir = new Path(tableDir, region.getEncodedName()); 406 Pair<Path, byte[]> fileWithSplitPoint = 407 writeStoreFileForSplit(new Path(regionDir, "cf"), context); 408 Path storeFile = fileWithSplitPoint.getFirst(); 409 HRegionFileSystem regionFS = 410 HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region); 411 HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true); 412 Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false, 413 new ConstantSizeRegionSplitPolicy()); 414 conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled); 415 HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true); 416 refHsf.initReader(); 417 HFile.Reader reader = refHsf.getReader().getHFileReader(); 418 while (!reader.prefetchComplete()) { 419 // Sleep for a bit 420 Thread.sleep(1000); 421 } 422 long offset = 0; 423 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 424 HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true); 425 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 426 if (block.getBlockType() == BlockType.DATA) { 427 test.accept(blockCache.getBlock(blockCacheKey, true, false, true)); 428 } 429 offset += block.getOnDiskSizeWithHeader(); 430 } 431 } 432 433 private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception { 434 cacheConf = new CacheConfig(conf, blockCache); 435 HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 436 Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink"); 437 final RegionInfo hri = 438 RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build(); 439 // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/ 440 Configuration testConf = new Configuration(this.conf); 441 CommonFSUtils.setRootDir(testConf, testDir); 442 HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, 443 CommonFSUtils.getTableDir(testDir, hri.getTable()), hri); 444 445 // Make a store file and write data to it. 446 StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs) 447 .withFilePath(regionFs.createTempName()).withFileContext(context).build(); 448 TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"), 449 Bytes.toBytes("testPrefetchWhenHFileLink")); 450 451 Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath()); 452 Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf")); 453 HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName()); 454 Path linkFilePath = 455 new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName())); 456 457 // Try to open store file from link 458 StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true); 459 HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf); 460 assertTrue(storeFileInfo.isLink()); 461 462 hsf.initReader(); 463 HFile.Reader reader = hsf.getReader().getHFileReader(); 464 while (!reader.prefetchComplete()) { 465 // Sleep for a bit 466 Thread.sleep(1000); 467 } 468 long offset = 0; 469 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 470 HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true); 471 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 472 if (block.getBlockType() == BlockType.DATA) { 473 test.accept(blockCache.getBlock(blockCacheKey, true, false, true)); 474 } 475 offset += block.getOnDiskSizeWithHeader(); 476 } 477 } 478 479 private Path writeStoreFile(String fname) throws IOException { 480 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 481 return writeStoreFile(fname, meta); 482 } 483 484 private Path writeStoreFile(String fname, HFileContext context) throws IOException { 485 return writeStoreFile(fname, context, cacheConf); 486 } 487 488 private Path writeStoreFile(String fname, HFileContext context, CacheConfig cacheConfig) 489 throws IOException { 490 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); 491 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConfig, fs) 492 .withOutputDir(storeFileParentDir).withFileContext(context).build(); 493 Random rand = ThreadLocalRandom.current(); 494 final int rowLen = 32; 495 for (int i = 0; i < NUM_KV; ++i) { 496 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 497 byte[] v = RandomKeyValueUtil.randomValue(rand); 498 int cfLen = rand.nextInt(k.length - rowLen + 1); 499 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 500 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 501 sfw.append(kv); 502 } 503 504 sfw.close(); 505 return sfw.getPath(); 506 } 507 508 private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context) 509 throws IOException { 510 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir) 511 .withFileContext(context).build(); 512 Random rand = ThreadLocalRandom.current(); 513 final int rowLen = 32; 514 byte[] splitPoint = null; 515 for (int i = 0; i < NUM_KV; ++i) { 516 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 517 byte[] v = RandomKeyValueUtil.randomValue(rand); 518 int cfLen = rand.nextInt(k.length - rowLen + 1); 519 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 520 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 521 sfw.append(kv); 522 if (i == NUM_KV / 2) { 523 splitPoint = k; 524 } 525 } 526 sfw.close(); 527 return new Pair(sfw.getPath(), splitPoint); 528 } 529 530 public static KeyValue.Type generateKeyType(Random rand) { 531 if (rand.nextBoolean()) { 532 // Let's make half of KVs puts. 533 return KeyValue.Type.Put; 534 } else { 535 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 536 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 537 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 538 + "Probably the layout of KeyValue.Type has changed."); 539 } 540 return keyType; 541 } 542 } 543 544 private long getElapsedTime(long startTime) { 545 return System.currentTimeMillis() - startTime; 546 } 547}