001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Random;
027import java.util.UUID;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.CellComparator;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.KeyValue;
034import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
035import org.apache.hadoop.hbase.io.compress.Compression;
036import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
037import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
038import org.apache.hadoop.hbase.nio.ByteBuff;
039import org.apache.hadoop.hbase.regionserver.BloomType;
040import org.apache.hadoop.hbase.regionserver.HStoreFile;
041import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
042import org.apache.hadoop.hbase.regionserver.StoreFileReader;
043import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
044import org.apache.hadoop.hbase.testclassification.IOTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.apache.hadoop.hbase.util.BloomFilter;
047import org.apache.hadoop.hbase.util.BloomFilterFactory;
048import org.apache.hadoop.hbase.util.BloomFilterUtil;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.junit.jupiter.api.BeforeEach;
052import org.junit.jupiter.api.Tag;
053import org.junit.jupiter.api.Test;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
/**
 * Verifies that {@link ThreadLocalServerSideScanMetrics} accurately accounts the bytes read from
 * the filesystem (and the count of block read operations) while reading the different parts of an
 * HFile: the trailer, the multi-level data block index, data blocks, the load-on-open section,
 * and bloom filter index/chunk blocks.
 */
@Tag(IOTests.TAG)
@Tag(SmallTests.TAG)
public class TestBytesReadFromFs {
  private static final int NUM_KEYS = 100000;
  private static final int BLOOM_BLOCK_SIZE = 512;
  // Small chunk size forces a multi-level data block index for NUM_KEYS entries.
  private static final int INDEX_CHUNK_SIZE = 512;
  private static final int DATA_BLOCK_SIZE = 4096;
  private static final int ROW_PREFIX_LENGTH_IN_BLOOM_FILTER = 42;

  private static final Logger LOG = LoggerFactory.getLogger(TestBytesReadFromFs.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Random RNG = new Random(9713312); // Just a fixed seed.

  private Configuration conf;
  private FileSystem fs;
  // Populated by writeData()/writeBloomFilters(); individual tests read the first entry.
  private List<KeyValue> keyValues = new ArrayList<>();
  private List<byte[]> keyList = new ArrayList<>();
  // A fresh, randomly named HFile path per test method.
  private Path path;

  @BeforeEach
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, ROW_PREFIX_LENGTH_IN_BLOOM_FILTER);
    fs = FileSystem.get(conf);
    String hfileName = UUID.randomUUID().toString().replaceAll("-", "");
    path = new Path(TEST_UTIL.getDataTestDir(), hfileName);
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_CHUNK_SIZE);
  }

  /** With metrics disabled, the bytes-read counter must stay at zero. */
  @Test
  public void testBytesReadFromFsWithScanMetricsDisabled() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, false);
  }

  /** With metrics enabled, index + data block reads must be fully accounted. */
  @Test
  public void testBytesReadFromFsToReadDataUsingIndexBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, true);
  }

  /** Accounts the trailer plus each block of the load-on-open section (no bloom filters). */
  @Test
  public void testBytesReadFromFsToReadLoadOnOpenDataSection() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    readLoadOnOpenDataSection(path, false);
  }

  /** Accounts bloom filter index blocks and a single bloom chunk read, per bloom type. */
  @Test
  public void testBytesReadFromFsToReadBloomFilterIndexesAndBloomBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    BloomType[] bloomTypes = { BloomType.ROW, BloomType.ROWCOL, BloomType.ROWPREFIX_FIXED_LENGTH };
    for (BloomType bloomType : bloomTypes) {
      LOG.info("Testing bloom type: {}", bloomType);
      // Reset counters and key state so each bloom type starts from a clean slate.
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
      keyList.clear();
      keyValues.clear();
      writeBloomFilters(path, bloomType, BLOOM_BLOCK_SIZE);
      if (bloomType == BloomType.ROWCOL) {
        // ROWCOL blooms are keyed by the full cell, so look up with a KeyValue.
        KeyValue keyValue = keyValues.get(0);
        readBloomFilters(path, bloomType, null, keyValue);
      } else {
        // ROW / ROWPREFIX_FIXED_LENGTH blooms are keyed by (a prefix of) the row bytes.
        assertEquals(ROW_PREFIX_LENGTH_IN_BLOOM_FILTER, keyList.get(0).length);
        byte[] key = keyList.get(0);
        readBloomFilters(path, bloomType, key, null);
      }
    }
  }

  /**
   * Writes NUM_KEYS ordered KeyValues to an uncompressed, unencoded HFile at {@code path} and
   * records them in {@link #keyValues}.
   */
  private void writeData(Path path) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, path)
      .withFileContext(context).create();

    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");

    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    writer.close();
  }

  /**
   * Walks the data block index from the root down to the data block containing
   * {@code keyValue}, summing the expected on-disk bytes, and asserts that the FS bytes-read and
   * read-ops metrics match (or stay zero when {@code isScanMetricsEnabled} is false).
   */
  private void readDataAndIndexBlocks(Path path, KeyValue keyValue, boolean isScanMetricsEnabled)
    throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    // Read HFile trailer and create HFileContext
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();

    // Read HFile info and load-on-open data section (we will read root again explicitly later)
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    hfile.initMetaAndIndex(reader);
    HFileContext meta = hfile.getHFileContext();

    // Get access to the block reader
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create iterator for reading load-on-open data section
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

    // Indexes use NoOpEncodedSeeker
    MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
    // Zero the counters; everything below is what we account for explicitly.
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    int bytesRead = 0;
    int blockLevelsRead = 0;

    // Read the root index block
    HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
    bytesRead += block.getOnDiskSizeWithHeader();
    if (block.getNextBlockOnDiskSize() > 0) {
      // A positive next-block size means the next block's header was fetched along with this
      // block, so those header bytes were also read from FS and must be counted.
      bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
    }
    blockLevelsRead++;

    // Comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    // Initialize the seeker
    seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
      trailer.getNumDataIndexLevels());

    // Locate the root-level entry covering the key, then descend level by level.
    int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
    long currentOffset = seeker.getBlockOffset(rootLevIndex);
    int currentDataSize = seeker.getBlockDataSize(rootLevIndex);

    HFileBlock prevBlock = null;
    do {
      prevBlock = block;
      block = blockReader.readBlockData(currentOffset, currentDataSize, true, true, true);
      HFileBlock unpacked = block.unpack(meta, blockReader);
      if (unpacked != block) {
        // unpack() produced a new block; release the packed one to avoid a buffer leak.
        block.release();
        block = unpacked;
      }
      bytesRead += block.getOnDiskSizeWithHeader();
      if (block.getNextBlockOnDiskSize() > 0) {
        // Next block's header was prefetched with this read; count those bytes too.
        bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
      }
      if (!block.getBlockType().isData()) {
        // Still on an intermediate/leaf index block: find the child entry for the key.
        ByteBuff buffer = block.getBufferWithoutHeader();
        // Place the buffer at the correct position
        HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, keyValue, comparator);
        currentOffset = buffer.getLong();
        currentDataSize = buffer.getInt();
      }
      prevBlock.release();
      blockLevelsRead++;
    } while (!block.getBlockType().isData());
    block.release();

    reader.close();

    assertEquals(isScanMetricsEnabled, ThreadLocalServerSideScanMetrics.isScanMetricsEnabled());
    // When metrics are disabled, the counter must not have moved at all.
    bytesRead = isScanMetricsEnabled ? bytesRead : 0;
    assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    assertEquals(blockLevelsRead, trailer.getNumDataIndexLevels() + 1);
    assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
    // At every index level we read one index block and finally read data block
    long blockReadOpsCount = isScanMetricsEnabled ? blockLevelsRead : 0;
    assertEquals(blockReadOpsCount,
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }

  /**
   * Reads the trailer and then each block of the load-on-open section (root index, meta index,
   * file info, and — when present — bloom filter index blocks), asserting the byte/op metrics
   * after each read. {@code hasBloomFilters} states whether bloom index blocks are expected.
   */
  private void readLoadOnOpenDataSection(Path path, boolean hasBloomFilters) throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
    // Read HFile trailer
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();
    // Reading the trailer alone should account exactly its size as one read op.
    assertEquals(trailer.getTrailerSize(),
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());

    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    // Since HBASE-28466, we call fileInfo.initMetaAndIndex inside HFilePreadReader,
    // which reads some blocks and increment the counters, so we need to reset it here.
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create iterator for reading root index block
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());
    // Tracks whether the previous read already pulled in the next block's header bytes.
    boolean readNextHeader = false;

    // Read the root index block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read meta index block (serialized with the same ROOT_INDEX block type)
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read File info block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.FILE_INFO), readNextHeader);

    // Read bloom filter indexes
    boolean bloomFilterIndexesRead = false;
    HFileBlock block;
    while ((block = blockIter.nextBlock()) != null) {
      bloomFilterIndexesRead = true;
      readNextHeader = readEachBlockInLoadOnOpenDataSection(block, readNextHeader);
    }

    reader.close();

    assertEquals(hasBloomFilters, bloomFilterIndexesRead);
    assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
  }

  /**
   * Asserts the metrics for one load-on-open block read. If the previous read already fetched
   * this block's header ({@code readNextHeader} true), those header bytes are excluded from this
   * block's expected count; returns whether this read prefetched the next block's header, so the
   * caller can carry the adjustment forward.
   */
  private boolean readEachBlockInLoadOnOpenDataSection(HFileBlock block, boolean readNextHeader)
    throws IOException {
    long bytesRead = block.getOnDiskSizeWithHeader();
    if (readNextHeader) {
      // Header was already counted as part of the previous block's read.
      bytesRead -= HFileBlock.headerSize(true);
      readNextHeader = false;
    }
    if (block.getNextBlockOnDiskSize() > 0) {
      // This read also pulled in the next block's header; count it now, skip it next time.
      bytesRead += HFileBlock.headerSize(true);
      readNextHeader = true;
    }
    block.release();
    assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
    return readNextHeader;
  }

  /**
   * Opens the store file, looks up {@code key} (ROW/ROWPREFIX blooms) or {@code keyValue}
   * (ROWCOL blooms) in the compound bloom filter, and asserts that reading the single bloom
   * chunk from FS is accounted as exactly one read op of the expected size. Exactly one of
   * {@code key}/{@code keyValue} must be non-null.
   */
  private void readBloomFilters(Path path, BloomType bt, byte[] key, KeyValue keyValue)
    throws IOException {
    assertTrue(keyValue == null || key == null);

    // Assert that the bloom filter index was read and its size is accounted in bytes read from
    // fs
    readLoadOnOpenDataSection(path, true);

    CacheConfig cacheConf = new CacheConfig(conf);
    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(conf, fs, path, true);
    HStoreFile sf = new HStoreFile(storeFileInfo, bt, cacheConf);

    // Read HFile trailer and load-on-open data section
    sf.initReader();

    // Reset bytes read from fs to 0
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    // Reset read ops count to 0
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    StoreFileReader reader = sf.getReader();
    BloomFilter bloomFilter = reader.getGeneralBloomFilter();
    assertTrue(bloomFilter instanceof CompoundBloomFilter);
    CompoundBloomFilter cbf = (CompoundBloomFilter) bloomFilter;

    // Get the bloom filter index reader
    HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
    int block;

    // Search for the key in the bloom filter index
    if (keyValue != null) {
      block = index.rootBlockContainingKey(keyValue);
    } else {
      byte[] row = key;
      block = index.rootBlockContainingKey(row, 0, row.length);
    }

    // Read the bloom block from FS
    HFileBlock bloomBlock = cbf.getBloomBlock(block);
    long bytesRead = bloomBlock.getOnDiskSizeWithHeader();
    if (bloomBlock.getNextBlockOnDiskSize() > 0) {
      // The next block's header came along with this read; include it in the expected count.
      bytesRead += HFileBlock.headerSize(true);
    }
    // Assert that the block read is a bloom block
    assertEquals(bloomBlock.getBlockType(), BlockType.BLOOM_CHUNK);
    bloomBlock.release();

    // Close the reader
    reader.close(true);

    assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }

  /**
   * Writes NUM_KEYS KeyValues through a {@link StoreFileWriter} configured with the given bloom
   * type and bloom block size, recording keys in {@link #keyList} and cells in
   * {@link #keyValues}.
   */
  private void writeBloomFilters(Path path, BloomType bt, int bloomBlockByteSize)
    throws IOException {
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, bloomBlockByteSize);
    CacheConfig cacheConf = new CacheConfig(conf);
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
      .withBloomType(bt).withFilePath(path).build();
    assertTrue(w.hasGeneralBloom());
    assertTrue(w.getGeneralBloomWriter() instanceof CompoundBloomFilterWriter);
    CompoundBloomFilterWriter cbbf = (CompoundBloomFilterWriter) w.getGeneralBloomWriter();
    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");
    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      w.append(keyValue);
      keyList.add(keyBytes);
      keyValues.add(keyValue);
    }
    assertEquals(keyList.size(), cbbf.getKeyCount());
    w.close();
  }

  /** Exposes the protected per-entry offset/size arrays of the no-op index seeker for the test. */
  private static class MyNoOpEncodedSeeker extends NoOpIndexBlockEncoder.NoOpEncodedSeeker {
    public long getBlockOffset(int i) {
      return blockOffsets[i];
    }

    public int getBlockDataSize(int i) {
      return blockDataSizes[i];
    }
  }
}