001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; 022import static org.hamcrest.MatcherAssert.assertThat; 023import static org.hamcrest.Matchers.allOf; 024import static org.hamcrest.Matchers.hasItem; 025import static org.hamcrest.Matchers.hasItems; 026import static org.hamcrest.Matchers.not; 027import static org.junit.Assert.assertFalse; 028import static org.junit.Assert.assertTrue; 029 030import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; 031import io.opentelemetry.sdk.trace.data.SpanData; 032import java.io.IOException; 033import java.util.List; 034import java.util.Random; 035import java.util.concurrent.ThreadLocalRandom; 036import java.util.concurrent.TimeUnit; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseConfiguration; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.KeyValue; 044import org.apache.hadoop.hbase.MatcherPredicate; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 047import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; 048import org.apache.hadoop.hbase.fs.HFileSystem; 049import org.apache.hadoop.hbase.io.ByteBuffAllocator; 050import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 051import org.apache.hadoop.hbase.testclassification.IOTests; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.trace.TraceUtil; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.junit.Before; 056import org.junit.ClassRule; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063@Category({ IOTests.class, MediumTests.class }) 064public class TestPrefetch { 065 private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class); 066 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestPrefetch.class); 070 071 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 072 073 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 074 private static final int DATA_BLOCK_SIZE = 2048; 075 private static final int NUM_KV = 1000; 076 077 private Configuration conf; 078 private CacheConfig cacheConf; 079 private FileSystem fs; 080 private BlockCache blockCache; 081 082 @Rule 083 public OpenTelemetryRule otelRule = OpenTelemetryRule.create(); 084 085 @Before 086 public void setUp() throws IOException { 087 conf = TEST_UTIL.getConfiguration(); 088 conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); 089 fs = HFileSystem.get(conf); 090 blockCache = BlockCacheFactory.createBlockCache(conf); 091 cacheConf = new CacheConfig(conf, blockCache); 092 } 093 094 @Test 095 public void testPrefetchSetInHCDWorks() { 096 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 097 .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build(); 098 Configuration c = HBaseConfiguration.create(); 099 assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false)); 100 CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP); 101 assertTrue(cc.shouldPrefetchOnOpen()); 102 } 103 104 @Test 105 public void testPrefetch() throws Exception { 106 TraceUtil.trace(() -> { 107 Path storeFile = writeStoreFile("TestPrefetch"); 108 readStoreFile(storeFile); 109 }, "testPrefetch"); 110 111 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans, 112 hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request")))); 113 final List<SpanData> spans = otelRule.getSpans(); 114 if (LOG.isDebugEnabled()) { 115 StringTraceRenderer renderer = new StringTraceRenderer(spans); 116 renderer.render(LOG::debug); 117 } 118 119 final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst() 120 .orElseThrow(AssertionError::new); 121 assertThat("prefetch spans happen on their own threads, detached from file open.", spans, 122 hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan))))); 123 } 124 125 @Test 126 public void testPrefetchRace() throws Exception { 127 for (int i = 0; i < 10; i++) { 128 Path storeFile = writeStoreFile("TestPrefetchRace-" + i); 129 readStoreFileLikeScanner(storeFile); 130 } 131 } 132 133 /** 134 * Read a storefile in the same manner as a scanner -- using non-positional reads and without 135 * waiting for prefetch to complete. 136 */ 137 private void readStoreFileLikeScanner(Path storeFilePath) throws Exception { 138 // Open the file 139 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 140 do { 141 long offset = 0; 142 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 143 HFileBlock block = 144 reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null); 145 offset += block.getOnDiskSizeWithHeader(); 146 } 147 } while (!reader.prefetchComplete()); 148 } 149 150 private void readStoreFile(Path storeFilePath) throws Exception { 151 // Open the file 152 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 153 154 while (!reader.prefetchComplete()) { 155 // Sleep for a bit 156 Thread.sleep(1000); 157 } 158 159 // Check that all of the data blocks were preloaded 160 BlockCache blockCache = cacheConf.getBlockCache().get(); 161 long offset = 0; 162 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 163 HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null); 164 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 165 boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; 166 if ( 167 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 168 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 169 ) { 170 assertTrue(isCached); 171 } 172 offset += block.getOnDiskSizeWithHeader(); 173 } 174 } 175 176 private Path writeStoreFile(String fname) throws IOException { 177 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); 178 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 179 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) 180 .withOutputDir(storeFileParentDir).withFileContext(meta).build(); 181 Random rand = ThreadLocalRandom.current(); 182 final int rowLen = 32; 183 for (int i = 0; i < NUM_KV; ++i) { 184 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 185 byte[] v = RandomKeyValueUtil.randomValue(rand); 186 int cfLen = rand.nextInt(k.length - rowLen + 1); 187 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 188 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 189 sfw.append(kv); 190 } 191 192 sfw.close(); 193 return sfw.getPath(); 194 } 195 196 public static KeyValue.Type generateKeyType(Random rand) { 197 if (rand.nextBoolean()) { 198 // Let's make half of KVs puts. 199 return KeyValue.Type.Put; 200 } else { 201 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 202 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 203 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 204 + "Probably the layout of KeyValue.Type has changed."); 205 } 206 return keyType; 207 } 208 } 209 210}