/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

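/**
 * Tests prefetching of HFile blocks into the block cache when store files are opened with
 * {@link CacheConfig#PREFETCH_BLOCKS_ON_OPEN_KEY} enabled. Covers plain and compressed files,
 * reference files produced by splits, HFile links, delayed prefetch, and runtime configuration
 * changes of the prefetch delay.
 */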
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetch {
  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetch.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 1000;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Rule
  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
  }

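  /**
   * Prefetch-on-open set on the column family descriptor should take effect even when the
   * configuration default is off.
   */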
  @Test
  public void testPrefetchSetInHCDWorks() {
    ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build();
    Configuration c = HBaseConfiguration.create();
    assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
    CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    assertTrue(cc.shouldPrefetchOnOpen());
  }

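  /**
   * When the block cache is disabled for the family, opening a store file should not schedule any
   * prefetch task (executor counters stay unchanged) and no data or index blocks should be cached.
   */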
  @Test
  public void testPrefetchBlockCacheDisabled() throws Exception {
    ScheduledThreadPoolExecutor poolExecutor =
      (ScheduledThreadPoolExecutor) PrefetchExecutor.getExecutorPool();
    long totalCompletedBefore = poolExecutor.getCompletedTaskCount();
    long queueBefore = poolExecutor.getQueue().size();
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
        .setBlockCacheEnabled(false).build();
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    CacheConfig cacheConfig =
      new CacheConfig(conf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchBlockCacheDisabled", meta, cacheConfig);
    readStoreFile(storeFile, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertFalse(isCached);
      }
    }, cacheConfig);
    assertEquals(totalCompletedBefore + queueBefore,
      poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
  }

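  /**
   * With {@link CacheConfig#PREFETCH_HEAP_USAGE_THRESHOLD} set very low, prefetch should skip
   * caching most blocks, so more blocks end up uncached than cached.
   */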
  @Test
  public void testPrefetchHeapUsageAboveThreshold() throws Exception {
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
        .setBlockCacheEnabled(true).build();
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Configuration newConf = new Configuration(conf);
    newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1);
    CacheConfig cacheConfig =
      new CacheConfig(newConf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold", meta, cacheConfig);
    MutableInt cachedCount = new MutableInt(0);
    MutableInt unCachedCount = new MutableInt(0);
    readStoreFile(storeFile, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        if (isCached) {
          cachedCount.increment();
        } else {
          unCachedCount.increment();
        }
      }
    }, cacheConfig);
    assertTrue(unCachedCount.compareTo(cachedCount) > 0);
  }

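  /**
   * Verifies that prefetch caches data and index blocks, and that the prefetch work is traced on
   * its own thread with a span detached from the one that opened the file.
   */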
  @Test
  public void testPrefetch() throws Exception {
    TraceUtil.trace(() -> {
      Path storeFile = writeStoreFile("TestPrefetch");
      readStoreFile(storeFile);
    }, "testPrefetch");

    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans,
      hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
    final List<SpanData> spans = otelRule.getSpans();
    if (LOG.isDebugEnabled()) {
      StringTraceRenderer renderer = new StringTraceRenderer(spans);
      renderer.render(LOG::debug);
    }

    final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
      .orElseThrow(AssertionError::new);
    assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
      hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
  }

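  /**
   * Exercises the race between a scanner reading blocks and the prefetch thread caching them, by
   * repeatedly scanning freshly written store files while prefetch is still in flight (see
   * {@link #readStoreFileLikeScanner(Path)}).
   */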
  @Test
  public void testPrefetchRace() throws Exception {
    for (int i = 0; i < 10; i++) {
      Path storeFile = writeStoreFile("TestPrefetchRace-" + i);
      readStoreFileLikeScanner(storeFile);
    }
  }

  /**
   * Read a storefile in the same manner as a scanner -- using non-positional reads and without
   * waiting for prefetch to complete.
   */
  private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    do {
      long offset = 0;
      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
        HFileBlock block =
          reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null);
        offset += block.getOnDiskSizeWithHeader();
      }
    } while (!reader.prefetchComplete());
  }

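  /**
   * Reads every block of the file after prefetch completes, asserting that data and index blocks
   * were cached.
   */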
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

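  /**
   * Reads every block in cache-only mode after prefetch completes, asserting that all blocks were
   * cached, with data blocks left packed (compressed) and index blocks unpacked.
   */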
  private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null, true);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (block.getBlockType() == BlockType.DATA) {
        assertFalse(block.isUnpacked());
      } else if (
        block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(block.isUnpacked());
      }
      assertTrue(isCached);
    });
  }

  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    readStoreFile(storeFilePath, readFunction, validationFunction, cacheConf);
  }

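  /**
   * Opens the file, waits for prefetch to complete, then walks every block up to the load-on-open
   * section, applying {@code readFunction} to read each block and {@code validationFunction} to
   * check it against the block cache.
   */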
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction, CacheConfig cacheConfig)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

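  /**
   * With {@link CacheConfig#CACHE_DATA_BLOCKS_COMPRESSED_KEY} enabled, prefetch should cache data
   * blocks in their on-disk (compressed) form.
   */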
  @Test
  public void testPrefetchCompressed() throws Exception {
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
      .withBlockSize(DATA_BLOCK_SIZE).build();
    Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
    readStoreFileCacheOnly(storeFile);
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
  }

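  /** With compactions enabled, prefetch should skip reference files and cache nothing for them. */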
  @Test
  public void testPrefetchSkipsRefs() throws Exception {
    testPrefetchWhenRefs(true, c -> {
      boolean isCached = c != null;
      assertFalse(isCached);
    });
  }

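  /** With compactions disabled, reference files should still be prefetched into the cache. */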
  @Test
  public void testPrefetchDoesntSkipRefs() throws Exception {
    testPrefetchWhenRefs(false, c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

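  /**
   * The prefetch delay should be updatable at runtime via
   * {@link PrefetchExecutorNotifier#onConfigurationChange(Configuration)}.
   */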
  @Test
  public void testOnConfigurationChange() {
    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
    conf.setInt(PREFETCH_DELAY, 40000);
    prefetchExecutorNotifier.onConfigurationChange(conf);
    assertEquals(40000, prefetchExecutorNotifier.getPrefetchDelay());

    // restore
    conf.setInt(PREFETCH_DELAY, 30000);
    prefetchExecutorNotifier.onConfigurationChange(conf);
    assertEquals(30000, prefetchExecutorNotifier.getPrefetchDelay());

    conf.setInt(PREFETCH_DELAY, 1000);
    prefetchExecutorNotifier.onConfigurationChange(conf);
  }

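  /**
   * With a 25 second prefetch delay and no variation configured, prefetch should not start until
   * the delay has expired.
   */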
  @Test
  public void testPrefetchWithDelay() throws Exception {
    // Configure a custom delay with no variation so the test timing is deterministic
    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
    conf.setInt(PREFETCH_DELAY, 25000);
    conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
    prefetchExecutorNotifier.onConfigurationChange(conf);

    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
      .withBlockSize(DATA_BLOCK_SIZE).build();
    Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
    HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
    long startTime = System.currentTimeMillis();

    // Wait for 20 seconds; prefetch should not have started yet because the delay is 25 seconds
    Thread.sleep(20000);
    assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
    while (!reader.prefetchStarted()) {
      assertTrue("Prefetch should not start before the configured delay has expired",
        getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
    }
    if (reader.prefetchStarted()) {
      // Allow a little extra time, as the timer was started slightly after the reader was opened.
      Thread.sleep(500);
      assertTrue("Prefetch should start only after the configured delay",
        getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay());
    }
    // restore the defaults used by the other tests
    conf.setInt(PREFETCH_DELAY, 1000);
    conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
    prefetchExecutorNotifier.onConfigurationChange(conf);
  }

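  /** Store files opened through an HFile link should still be prefetched into the cache. */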
  @Test
  public void testPrefetchDoesntSkipHFileLink() throws Exception {
    testPrefetchWhenHFileLink(c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

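  /**
   * Writes a store file, splits it to produce a reference file, opens the reference and, once
   * prefetch completes, applies {@code test} to the cache lookup result for each data block.
   */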
  private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
    throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs");
    RegionInfo region =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Pair<Path, byte[]> fileWithSplitPoint =
      writeStoreFileForSplit(new Path(regionDir, "cf"), context);
    Path storeFile = fileWithSplitPoint.getFirst();
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true);
    Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false,
      new ConstantSizeRegionSplitPolicy());
    conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true);
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

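  /**
   * Writes a store file, creates an HFile link to it, opens the link and, once prefetch
   * completes, applies {@code test} to the cache lookup result for each data block.
   */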
  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    CommonFSUtils.setRootDir(testConf, testDir);
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);

    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
      Bytes.toBytes("testPrefetchWhenHFileLink"));

    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

    // Try to open store file from link
    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
    assertTrue(storeFileInfo.isLink());

    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

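  /**
   * Writes a store file of {@code NUM_KV} random KeyValues; overloads allow supplying a custom
   * {@link HFileContext} and {@link CacheConfig}.
   */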
  private Path writeStoreFile(String fname) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta);
  }

  private Path writeStoreFile(String fname, HFileContext context) throws IOException {
    return writeStoreFile(fname, context, cacheConf);
  }

  private Path writeStoreFile(String fname, HFileContext context, CacheConfig cacheConfig)
    throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConfig, fs)
      .withOutputDir(storeFileParentDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

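  /**
   * Writes a store file of random KeyValues and returns the file path together with the mid-point
   * key, suitable for use as a split point.
   */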
  private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir)
      .withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    byte[] splitPoint = null;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
      if (i == NUM_KV / 2) {
        splitPoint = k;
      }
    }
    sfw.close();
    return new Pair<>(sfw.getPath(), splitPoint);
  }

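  /** Picks a random valid key type, with roughly half of the generated keys being Puts. */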
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

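  /** Milliseconds elapsed since {@code startTime}. */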
  private long getElapsedTime(long startTime) {
    return System.currentTimeMillis() - startTime;
  }
}