/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ IOTests.class, MediumTests.class })
public class TestPrefetch {
  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetch.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 1000;

  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Rule
  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
  }

  @Test
  public void testPrefetchSetInHCDWorks() {
    ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build();
    Configuration c = HBaseConfiguration.create();
    assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
    CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    assertTrue(cc.shouldPrefetchOnOpen());
  }

  @Test
  public void testPrefetch() throws Exception {
    TraceUtil.trace(() -> {
      Path storeFile = writeStoreFile("TestPrefetch");
      readStoreFile(storeFile);
    }, "testPrefetch");

    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans,
      hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
    final List<SpanData> spans = otelRule.getSpans();
    if (LOG.isDebugEnabled()) {
      StringTraceRenderer renderer = new StringTraceRenderer(spans);
      renderer.render(LOG::debug);
    }

    final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
      .orElseThrow(AssertionError::new);
    assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
      hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
  }

  @Test
  public void testPrefetchRace() throws Exception {
    for (int i = 0; i < 10; i++) {
      Path storeFile = writeStoreFile("TestPrefetchRace-" + i);
      readStoreFileLikeScanner(storeFile);
    }
  }

  /**
   * Read a storefile in the same manner as a scanner -- using non-positional reads and without
   * waiting for prefetch to complete.
   */
  private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    do {
      long offset = 0;
      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
        HFileBlock block =
          reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null);
        offset += block.getOnDiskSizeWithHeader();
      }
    } while (!reader.prefetchComplete());
  }

  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

  private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null, true);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (block.getBlockType() == BlockType.DATA) {
        assertFalse(block.isUnpacked());
      } else if (
        block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(block.isUnpacked());
      }
      assertTrue(isCached);
    });
  }

  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  @Test
  public void testPrefetchCompressed() throws Exception {
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
      .withBlockSize(DATA_BLOCK_SIZE).build();
    Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
    readStoreFileCacheOnly(storeFile);
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
  }

  @Test
  public void testPrefetchSkipsRefs() throws Exception {
    testPrefetchWhenRefs(true, c -> {
      boolean isCached = c != null;
      assertFalse(isCached);
    });
  }

  @Test
  public void testPrefetchDoesntSkipRefs() throws Exception {
    testPrefetchWhenRefs(false, c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }
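
  /**
   * Added javadoc: unlike reference files, HFileLink-backed store files should still have their
   * data blocks cached by the prefetch that runs on open.
   */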
  @Test
  public void testPrefetchDoesntSkipHFileLink() throws Exception {
    testPrefetchWhenHFileLink(c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

  private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
    throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs");
    RegionInfo region =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Pair<Path, byte[]> fileWithSplitPoint =
      writeStoreFileForSplit(new Path(regionDir, "cf"), context);
    Path storeFile = fileWithSplitPoint.getFirst();
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true);
    Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false,
      new ConstantSizeRegionSplitPolicy());
    conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true);
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    CommonFSUtils.setRootDir(testConf, testDir);
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);

    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
      Bytes.toBytes("testPrefetchWhenHFileLink"));

    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

    // Try to open store file from link
    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
    assertTrue(storeFileInfo.isLink());

    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }
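
  /**
   * Added javadoc: writes a store file of NUM_KV random KeyValues using the default test block
   * size (DATA_BLOCK_SIZE) and returns its path.
   */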
  private Path writeStoreFile(String fname) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta);
  }

  private Path writeStoreFile(String fname, HFileContext context) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(storeFileParentDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir)
      .withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    byte[] splitPoint = null;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
      if (i == NUM_KV / 2) {
        splitPoint = k;
      }
    }
    sfw.close();
    return new Pair<>(sfw.getPath(), splitPoint);
  }
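
  /**
   * Added javadoc: returns KeyValue.Type.Put for roughly half the calls, otherwise a random valid
   * type; throws if the random pick lands on Minimum or Maximum, which indicates the layout of
   * KeyValue.Type has changed.
   */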
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

}