/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that {@link ThreadLocalServerSideScanMetrics} accounts for every byte read from the
 * filesystem while reading an HFile: trailer, load-on-open section (root/meta indexes, file info,
 * bloom filter indexes), intermediate and leaf index blocks, data blocks and bloom chunks. Each
 * test writes a fresh HFile and then re-reads selected pieces through the low-level block reader,
 * cross-checking the expected on-disk sizes against the thread-local metrics counters.
 */
@Tag(IOTests.TAG)
@Tag(SmallTests.TAG)
public class TestBytesReadFromFs {
  private static final int NUM_KEYS = 100000;
  private static final int BLOOM_BLOCK_SIZE = 512;
  private static final int INDEX_CHUNK_SIZE = 512;
  private static final int DATA_BLOCK_SIZE = 4096;
  private static final int ROW_PREFIX_LENGTH_IN_BLOOM_FILTER = 42;

  private static final Logger LOG = LoggerFactory.getLogger(TestBytesReadFromFs.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Random RNG = new Random(9713312); // Just a fixed seed.

  private Configuration conf;
  private FileSystem fs;
  private List<KeyValue> keyValues = new ArrayList<>();
  private List<byte[]> keyList = new ArrayList<>();
  private Path path;

  @BeforeEach
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, ROW_PREFIX_LENGTH_IN_BLOOM_FILTER);
    fs = FileSystem.get(conf);
    // Each test gets its own uniquely-named HFile under the data test dir.
    String hfileName = UUID.randomUUID().toString().replaceAll("-", "");
    path = new Path(TEST_UTIL.getDataTestDir(), hfileName);
    // Small index chunks force a multi-level block index so the index-walk tests are meaningful.
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_CHUNK_SIZE);
  }

  @Test
  public void testBytesReadFromFsWithScanMetricsDisabled() throws IOException {
    // With metrics disabled the counters must stay at zero even though blocks are read.
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, false);
  }

  @Test
  public void testBytesReadFromFsToReadDataUsingIndexBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, true);
  }

  @Test
  public void testBytesReadFromFsToReadLoadOnOpenDataSection() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    readLoadOnOpenDataSection(path, false);
  }

  @Test
  public void testBytesReadFromFsToReadBloomFilterIndexesAndBloomBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    BloomType[] bloomTypes = { BloomType.ROW, BloomType.ROWCOL, BloomType.ROWPREFIX_FIXED_LENGTH };
    for (BloomType bloomType : bloomTypes) {
      LOG.info("Testing bloom type: {}", bloomType);
      // Reset counters and per-iteration state so each bloom type is measured independently.
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
      keyList.clear();
      keyValues.clear();
      writeBloomFilters(path, bloomType, BLOOM_BLOCK_SIZE);
      if (bloomType == BloomType.ROWCOL) {
        // ROWCOL blooms are keyed by full cells, so probe with a KeyValue.
        KeyValue keyValue = keyValues.get(0);
        readBloomFilters(path, bloomType, null, keyValue);
      } else {
        assertEquals(ROW_PREFIX_LENGTH_IN_BLOOM_FILTER, keyList.get(0).length);
        byte[] key = keyList.get(0);
        readBloomFilters(path, bloomType, key, null);
      }
    }
  }

  /**
   * Writes {@value #NUM_KEYS} ordered, uncompressed, unencoded KeyValues to {@code path},
   * populating {@link #keyValues} for later lookups.
   */
  private void writeData(Path path) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, path)
      .withFileContext(context).create();

    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");

    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    writer.close();
  }

  /**
   * Walks the block index from the root down to the data block containing {@code keyValue},
   * summing the expected on-disk bytes (including the prefetched next-block header, when
   * present) and asserting the thread-local metrics match.
   * @param isScanMetricsEnabled expected enablement state; when false, counters must read zero
   */
  private void readDataAndIndexBlocks(Path path, KeyValue keyValue, boolean isScanMetricsEnabled)
    throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    // Read HFile trailer and create HFileContext
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();

    // Read HFile info and load-on-open data section (we will read root again explicitly later)
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    hfile.initMetaAndIndex(reader);
    HFileContext meta = hfile.getHFileContext();

    // Get access to the block reader
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create iterator for reading load-on-open data section
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

    // Indexes use NoOpEncodedSeeker
    MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
    // Reset the counters so only the reads below are measured.
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    int bytesRead = 0;
    int blockLevelsRead = 0;

    // Read the root index block
    HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
    bytesRead += block.getOnDiskSizeWithHeader();
    if (block.getNextBlockOnDiskSize() > 0) {
      // The reader also pulled in the next block's header; account for it.
      bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
    }
    blockLevelsRead++;

    // Comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    // Initialize the seeker
    seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
      trailer.getNumDataIndexLevels());

    int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
    long currentOffset = seeker.getBlockOffset(rootLevIndex);
    int currentDataSize = seeker.getBlockDataSize(rootLevIndex);

    HFileBlock prevBlock = null;
    do {
      prevBlock = block;
      block = blockReader.readBlockData(currentOffset, currentDataSize, true, true, true);
      HFileBlock unpacked = block.unpack(meta, blockReader);
      if (unpacked != block) {
        block.release();
        block = unpacked;
      }
      bytesRead += block.getOnDiskSizeWithHeader();
      if (block.getNextBlockOnDiskSize() > 0) {
        bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
      }
      if (!block.getBlockType().isData()) {
        ByteBuff buffer = block.getBufferWithoutHeader();
        // Place the buffer at the correct position
        HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, keyValue, comparator);
        currentOffset = buffer.getLong();
        currentDataSize = buffer.getInt();
      }
      prevBlock.release();
      blockLevelsRead++;
    } while (!block.getBlockType().isData());
    block.release();

    reader.close();

    assertEquals(isScanMetricsEnabled, ThreadLocalServerSideScanMetrics.isScanMetricsEnabled());
    bytesRead = isScanMetricsEnabled ? bytesRead : 0;
    assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    // One block per index level plus the final data block.
    assertEquals(trailer.getNumDataIndexLevels() + 1, blockLevelsRead);
    assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
    // At every index level we read one index block and finally read data block
    long blockReadOpsCount = isScanMetricsEnabled ? blockLevelsRead : 0;
    assertEquals(blockReadOpsCount,
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }

  /**
   * Re-reads the trailer and every block of the load-on-open section, asserting after each read
   * that the metrics counters advanced by exactly that block's on-disk size.
   * @param hasBloomFilters whether the file is expected to contain bloom filter index blocks
   */
  private void readLoadOnOpenDataSection(Path path, boolean hasBloomFilters) throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
    // Read HFile trailer
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();
    assertEquals(trailer.getTrailerSize(),
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());

    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    // Since HBASE-28466, we call fileInfo.initMetaAndIndex inside HFilePreadReader,
    // which reads some blocks and increment the counters, so we need to reset it here.
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create iterator for reading root index block
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());
    boolean readNextHeader = false;

    // Read the root index block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read meta index block (it is also serialized with the ROOT_INDEX block type)
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read File info block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.FILE_INFO), readNextHeader);

    // Read bloom filter indexes
    boolean bloomFilterIndexesRead = false;
    HFileBlock block;
    while ((block = blockIter.nextBlock()) != null) {
      bloomFilterIndexesRead = true;
      readNextHeader = readEachBlockInLoadOnOpenDataSection(block, readNextHeader);
    }

    reader.close();

    assertEquals(hasBloomFilters, bloomFilterIndexesRead);
    assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
  }

  /**
   * Accounts for one load-on-open block: its on-disk size, minus a header already prefetched by
   * the previous read, plus the next block's header if it was prefetched with this one. Asserts
   * the metrics counters match, then releases the block.
   * @return whether this read prefetched the next block's header
   */
  private boolean readEachBlockInLoadOnOpenDataSection(HFileBlock block, boolean readNextHeader)
    throws IOException {
    long bytesRead = block.getOnDiskSizeWithHeader();
    if (readNextHeader) {
      // Our header was already counted as part of the previous block's read.
      bytesRead -= HFileBlock.headerSize(true);
      readNextHeader = false;
    }
    if (block.getNextBlockOnDiskSize() > 0) {
      bytesRead += HFileBlock.headerSize(true);
      readNextHeader = true;
    }
    block.release();
    assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
    return readNextHeader;
  }

  /**
   * Opens the store file, locates the bloom chunk for the given probe (exactly one of
   * {@code key} / {@code keyValue} is non-null) via the bloom index, reads it from the
   * filesystem and asserts the metrics reflect that single block read.
   */
  private void readBloomFilters(Path path, BloomType bt, byte[] key, KeyValue keyValue)
    throws IOException {
    assertTrue(keyValue == null || key == null);

    // Assert that the bloom filter index was read and its size is accounted in bytes read from
    // fs
    readLoadOnOpenDataSection(path, true);

    CacheConfig cacheConf = new CacheConfig(conf);
    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(conf, fs, path, true);
    HStoreFile sf = new HStoreFile(storeFileInfo, bt, cacheConf);

    // Read HFile trailer and load-on-open data section
    sf.initReader();

    // Reset bytes read from fs to 0
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    // Reset read ops count to 0
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    StoreFileReader reader = sf.getReader();
    BloomFilter bloomFilter = reader.getGeneralBloomFilter();
    assertTrue(bloomFilter instanceof CompoundBloomFilter);
    CompoundBloomFilter cbf = (CompoundBloomFilter) bloomFilter;

    // Get the bloom filter index reader
    HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
    int block;

    // Search for the key in the bloom filter index
    if (keyValue != null) {
      block = index.rootBlockContainingKey(keyValue);
    } else {
      byte[] row = key;
      block = index.rootBlockContainingKey(row, 0, row.length);
    }

    // Read the bloom block from FS
    HFileBlock bloomBlock = cbf.getBloomBlock(block);
    long bytesRead = bloomBlock.getOnDiskSizeWithHeader();
    if (bloomBlock.getNextBlockOnDiskSize() > 0) {
      bytesRead += HFileBlock.headerSize(true);
    }
    // Assert that the block read is a bloom block
    assertEquals(BlockType.BLOOM_CHUNK, bloomBlock.getBlockType());
    bloomBlock.release();

    // Close the reader
    reader.close(true);

    assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }

  /**
   * Writes a store file with a general bloom filter of the given type and chunk size,
   * populating {@link #keyList} and {@link #keyValues} for later probes.
   */
  private void writeBloomFilters(Path path, BloomType bt, int bloomBlockByteSize)
    throws IOException {
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, bloomBlockByteSize);
    CacheConfig cacheConf = new CacheConfig(conf);
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
      .withBloomType(bt).withFilePath(path).build();
    assertTrue(w.hasGeneralBloom());
    assertTrue(w.getGeneralBloomWriter() instanceof CompoundBloomFilterWriter);
    CompoundBloomFilterWriter cbbf = (CompoundBloomFilterWriter) w.getGeneralBloomWriter();
    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");
    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      w.append(keyValue);
      keyList.add(keyBytes);
      keyValues.add(keyValue);
    }
    assertEquals(keyList.size(), cbbf.getKeyCount());
    w.close();
  }

  /** Exposes the protected root-index arrays of NoOpEncodedSeeker for direct index walking. */
  private static class MyNoOpEncodedSeeker extends NoOpIndexBlockEncoder.NoOpEncodedSeeker {
    public long getBlockOffset(int i) {
      return blockOffsets[i];
    }

    public int getBlockDataSize(int i) {
      return blockDataSizes[i];
    }
  }
}