/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ IOTests.class, MediumTests.class })
public class TestPrefetch {
  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetch.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 1000;

  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Rule
  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
  }

  @Test
  public void testPrefetchSetInHCDWorks() {
    ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build();
    Configuration c = HBaseConfiguration.create();
    assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
    CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    assertTrue(cc.shouldPrefetchOnOpen());
  }

  @Test
  public void testPrefetch() throws Exception {
    TraceUtil.trace(() -> {
      Path storeFile = writeStoreFile("TestPrefetch");
      readStoreFile(storeFile);
    }, "testPrefetch");

    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans,
      hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
    final List<SpanData> spans = otelRule.getSpans();
    if (LOG.isDebugEnabled()) {
      StringTraceRenderer renderer = new StringTraceRenderer(spans);
      renderer.render(LOG::debug);
    }

    final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
      .orElseThrow(AssertionError::new);
    assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
      hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
  }

  @Test
  public void testPrefetchRace() throws Exception {
    for (int i = 0; i < 10; i++) {
      Path storeFile = writeStoreFile("TestPrefetchRace-" + i);
      readStoreFileLikeScanner(storeFile);
    }
  }

  /**
   * Read a storefile in the same manner as a scanner -- using non-positional reads and without
   * waiting for prefetch to complete.
   */
  private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    do {
      long offset = 0;
      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
        HFileBlock block =
          reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null);
        offset += block.getOnDiskSizeWithHeader();
      }
    } while (!reader.prefetchComplete());
  }

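  /**
   * Read every block up to the load-on-open section with positional reads and assert that data
   * and index blocks were left in the block cache by the prefetch thread.
   */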
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

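  /**
   * Read every block in cache-only mode and assert that all of them were cached, with data blocks
   * kept packed (compressed) and index blocks unpacked.
   */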
  private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null, true);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (block.getBlockType() == BlockType.DATA) {
        assertFalse(block.isUnpacked());
      } else if (
        block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(block.isUnpacked());
      }
      assertTrue(isCached);
    });
  }

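  /**
   * Open the store file, wait for the prefetch to finish, then walk all blocks up to the
   * load-on-open section, reading each one with {@code readFunction} and checking it with
   * {@code validationFunction}.
   */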
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  @Test
  public void testPrefetchCompressed() throws Exception {
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
      .withBlockSize(DATA_BLOCK_SIZE).build();
    Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
    readStoreFileCacheOnly(storeFile);
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
  }

  @Test
  public void testPrefetchSkipsRefs() throws Exception {
    testPrefetchWhenRefs(true, c -> {
      boolean isCached = c != null;
      assertFalse(isCached);
    });
  }

  @Test
  public void testPrefetchDoesntSkipRefs() throws Exception {
    testPrefetchWhenRefs(false, c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

  @Test
  public void testPrefetchDoesntSkipHFileLink() throws Exception {
    testPrefetchWhenHFileLink(c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

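  /**
   * Split a store file and open the resulting reference file, then run {@code test} against the
   * cache lookup for each data block. With compaction enabled, prefetch skips reference files;
   * with compaction disabled, their blocks are prefetched as usual.
   */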
  private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
    throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs");
    RegionInfo region =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Pair<Path, byte[]> fileWithSplitPoint =
      writeStoreFileForSplit(new Path(regionDir, "cf"), context);
    Path storeFile = fileWithSplitPoint.getFirst();
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true);
    Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false,
      new ConstantSizeRegionSplitPolicy());
    conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true);
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

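  /**
   * Create an HFileLink to a committed store file, open the link, wait for the prefetch to
   * finish, then run {@code test} against the cache lookup for each data block.
   */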
  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    CommonFSUtils.setRootDir(testConf, testDir);
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);

    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
      Bytes.toBytes("testPrefetchWhenHFileLink"));

    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

    // Try to open store file from link
    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
    assertTrue(storeFileInfo.isLink());

    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

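  /**
   * Write a store file of NUM_KV random KeyValues using the default test block size.
   */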
  private Path writeStoreFile(String fname) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta);
  }

  private Path writeStoreFile(String fname, HFileContext context) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(storeFileParentDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

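  /**
   * Write a store file of NUM_KV random KeyValues and return the file path together with the key
   * at the midpoint, to be used as the split point.
   */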
  private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir)
      .withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    byte[] splitPoint = null;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
      if (i == NUM_KV / 2) {
        splitPoint = k;
      }
    }
    sfw.close();
    return new Pair<>(sfw.getPath(), splitPoint);
  }

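  /**
   * Generate a random, valid KeyValue type: Put for half of the keys, otherwise a uniformly
   * random type excluding Minimum and Maximum.
   */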
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

}