/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

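/**
 * Tests that HFile blocks are prefetched into the block cache when a store file is opened with
 * {@link CacheConfig#PREFETCH_BLOCKS_ON_OPEN_KEY} (or the column-family level equivalent) enabled.
 */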
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetch {
  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetch.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 1000;

  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Rule
  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
  }

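  /**
   * Prefetch is disabled in the {@link Configuration} but enabled on the column family
   * descriptor; the family-level setting should win, so the resulting {@link CacheConfig}
   * reports prefetch-on-open.
   */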
  @Test
  public void testPrefetchSetInHCDWorks() {
    ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build();
    Configuration c = HBaseConfiguration.create();
    assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
    CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    assertTrue(cc.shouldPrefetchOnOpen());
  }

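  /**
   * Write a store file, open it (which triggers prefetch), wait for prefetch to finish, and then
   * verify via the collected OpenTelemetry spans that the prefetch work ran on its own thread,
   * detached from the span under which the file was opened.
   */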
  @Test
  public void testPrefetch() throws Exception {
    TraceUtil.trace(() -> {
      Path storeFile = writeStoreFile("TestPrefetch");
      readStoreFile(storeFile);
    }, "testPrefetch");

    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans,
      hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
    final List<SpanData> spans = otelRule.getSpans();
    if (LOG.isDebugEnabled()) {
      StringTraceRenderer renderer = new StringTraceRenderer(spans);
      renderer.render(LOG::debug);
    }

    final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
      .orElseThrow(AssertionError::new);
    assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
      hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
  }

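  /**
   * Open and scan several store files without waiting for prefetch to finish, exercising the race
   * between the prefetch thread and a concurrent scanner reading the same blocks.
   */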
  @Test
  public void testPrefetchRace() throws Exception {
    for (int i = 0; i < 10; i++) {
      Path storeFile = writeStoreFile("TestPrefetchRace-" + i);
      readStoreFileLikeScanner(storeFile);
    }
  }

  /**
   * Read a storefile in the same manner as a scanner -- using non-positional reads and without
   * waiting for prefetch to complete.
   */
  private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    do {
      long offset = 0;
      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
        HFileBlock block =
          reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null);
        offset += block.getOnDiskSizeWithHeader();
      }
    } while (!reader.prefetchComplete());
  }

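  /**
   * Open a store file, wait for prefetch to complete, and then assert that every data and index
   * block is already present in the block cache.
   */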
  private void readStoreFile(Path storeFilePath) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }

    // Check that all of the data blocks were preloaded
    BlockCache blockCache = cacheConf.getBlockCache().get();
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

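  /**
   * Write a store file of NUM_KV random KeyValues under the test data directory, using
   * DATA_BLOCK_SIZE-byte data blocks so the file spans many blocks for prefetch to load.
   */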
  private Path writeStoreFile(String fname) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

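  /**
   * Pick a random {@link KeyValue.Type}: half the time a Put, otherwise one of the other valid
   * types, never Minimum or Maximum.
   */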
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

}