/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

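/**
 * Verifies that the bytesReadFromFs and blockReadOpsCount metrics in
 * {@link ThreadLocalServerSideScanMetrics} account for every byte and every read op issued
 * against the filesystem while reading an HFile: the trailer, the load-on-open section, index
 * blocks, data blocks, and bloom filter blocks.
 */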
@Category({ IOTests.class, SmallTests.class })
public class TestBytesReadFromFs {
  private static final int NUM_KEYS = 100000;
  private static final int BLOOM_BLOCK_SIZE = 512;
  private static final int INDEX_CHUNK_SIZE = 512;
  private static final int DATA_BLOCK_SIZE = 4096;
  private static final int ROW_PREFIX_LENGTH_IN_BLOOM_FILTER = 42;

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBytesReadFromFs.class);

  @Rule
  public TestName name = new TestName();

  private static final Logger LOG = LoggerFactory.getLogger(TestBytesReadFromFs.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Random RNG = new Random(9713312); // Just a fixed seed.

  private Configuration conf;
  private FileSystem fs;
  private List<KeyValue> keyValues = new ArrayList<>();
  private List<byte[]> keyList = new ArrayList<>();
  private Path path;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, ROW_PREFIX_LENGTH_IN_BLOOM_FILTER);
    fs = FileSystem.get(conf);
    String hfileName = UUID.randomUUID().toString().replaceAll("-", "");
    path = new Path(TEST_UTIL.getDataTestDir(), hfileName);
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_CHUNK_SIZE);
  }

  @Test
  public void testBytesReadFromFsWithScanMetricsDisabled() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, false);
  }

  @Test
  public void testBytesReadFromFsToReadDataUsingIndexBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, true);
  }

  @Test
  public void testBytesReadFromFsToReadLoadOnOpenDataSection() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    readLoadOnOpenDataSection(path, false);
  }

  @Test
  public void testBytesReadFromFsToReadBloomFilterIndexesAndBloomBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    BloomType[] bloomTypes = { BloomType.ROW, BloomType.ROWCOL, BloomType.ROWPREFIX_FIXED_LENGTH };
    for (BloomType bloomType : bloomTypes) {
      LOG.info("Testing bloom type: {}", bloomType);
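      // Reset the metrics and the keys/cells captured for the previous bloom type.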
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
      keyList.clear();
      keyValues.clear();
      writeBloomFilters(path, bloomType, BLOOM_BLOCK_SIZE);
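      // ROWCOL bloom filters are keyed on the full cell, so probe with a KeyValue; ROW and
      // ROWPREFIX_FIXED_LENGTH bloom filters are keyed on row (prefix) bytes.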
      if (bloomType == BloomType.ROWCOL) {
        KeyValue keyValue = keyValues.get(0);
        readBloomFilters(path, bloomType, null, keyValue);
      } else {
        Assert.assertEquals(ROW_PREFIX_LENGTH_IN_BLOOM_FILTER, keyList.get(0).length);
        byte[] key = keyList.get(0);
        readBloomFilters(path, bloomType, key, null);
      }
    }
  }

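  /**
   * Writes {@value #NUM_KEYS} cells with fixed-length keys and values into an unencoded,
   * uncompressed HFile at the given path.
   */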
  private void writeData(Path path) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, path)
      .withFileContext(context).create();

    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");

    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A fixed-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    writer.close();
  }

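  /**
   * Seeks the given key through the index: reads the root index block, walks down the
   * intermediate index levels to the data block containing the key, and asserts that the scan
   * metrics match the bytes and read ops actually issued against the filesystem.
   */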
  private void readDataAndIndexBlocks(Path path, KeyValue keyValue, boolean isScanMetricsEnabled)
    throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    // Read HFile trailer and create HFileContext
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();

    // Read HFile info and load-on-open data section (the root index is re-read explicitly later)
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    hfile.initMetaAndIndex(reader);
    HFileContext meta = hfile.getHFileContext();

    // Get access to the block reader
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create iterator for reading the load-on-open data section
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

    // Indexes use NoOpEncodedSeeker
    MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    int bytesRead = 0;
    int blockLevelsRead = 0;

    // Read the root index block
    HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
    bytesRead += block.getOnDiskSizeWithHeader();
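    // Reading a block can also fetch the header of the next block on disk; those extra header
    // bytes count towards the bytes read from the filesystem.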
    if (block.getNextBlockOnDiskSize() > 0) {
      bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
    }
    blockLevelsRead++;

    // Comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    // Initialize the seeker
    seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
      trailer.getNumDataIndexLevels());

    int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
    long currentOffset = seeker.getBlockOffset(rootLevIndex);
    int currentDataSize = seeker.getBlockDataSize(rootLevIndex);

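    // Walk down the index: each iteration reads the block at (currentOffset, currentDataSize).
    // Intermediate index blocks yield the offset and size of the block at the next level, until
    // the data block containing the key is reached.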
    HFileBlock prevBlock = null;
    do {
      prevBlock = block;
      block = blockReader.readBlockData(currentOffset, currentDataSize, true, true, true);
      HFileBlock unpacked = block.unpack(meta, blockReader);
      if (unpacked != block) {
        block.release();
        block = unpacked;
      }
      bytesRead += block.getOnDiskSizeWithHeader();
      if (block.getNextBlockOnDiskSize() > 0) {
        bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
      }
      if (!block.getBlockType().isData()) {
        ByteBuff buffer = block.getBufferWithoutHeader();
        // Place the buffer at the correct position
        HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, keyValue, comparator);
        currentOffset = buffer.getLong();
        currentDataSize = buffer.getInt();
      }
      prevBlock.release();
      blockLevelsRead++;
    } while (!block.getBlockType().isData());
    block.release();

    reader.close();

    Assert.assertEquals(isScanMetricsEnabled,
      ThreadLocalServerSideScanMetrics.isScanMetricsEnabled());
    bytesRead = isScanMetricsEnabled ? bytesRead : 0;
    Assert.assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(trailer.getNumDataIndexLevels() + 1, blockLevelsRead);
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
    // At every index level we read one index block, and finally we read the data block
    long blockReadOpsCount = isScanMetricsEnabled ? blockLevelsRead : 0;
    Assert.assertEquals(blockReadOpsCount,
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }

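  /**
   * Reads the trailer and every block in the load-on-open section (root index, meta index, file
   * info, and any bloom filter indexes) and asserts that each read is fully accounted for in the
   * scan metrics.
   */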
  private void readLoadOnOpenDataSection(Path path, boolean hasBloomFilters) throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
    // Read HFile trailer
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();
    Assert.assertEquals(trailer.getTrailerSize(),
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());

    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    // Since HBASE-28466, we call fileInfo.initMetaAndIndex inside HFilePreadReader, which reads
    // some blocks and increments the counters, so we need to reset them here.
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create iterator for reading the load-on-open data section
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());
    boolean readNextHeader = false;

    // Read the root index block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read the meta index block (the meta index is also stored as a ROOT_INDEX block)
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read the file info block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.FILE_INFO), readNextHeader);

    // Read bloom filter indexes, if any
    boolean bloomFilterIndexesRead = false;
    HFileBlock block;
    while ((block = blockIter.nextBlock()) != null) {
      bloomFilterIndexesRead = true;
      readNextHeader = readEachBlockInLoadOnOpenDataSection(block, readNextHeader);
    }

    reader.close();

    Assert.assertEquals(hasBloomFilters, bloomFilterIndexesRead);
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
  }

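  /**
   * Asserts that reading a single block in the load-on-open section costs exactly one read op
   * and the expected number of bytes. If the previous read already fetched this block's header,
   * those header bytes were counted against the previous block and are subtracted here;
   * conversely, if this read fetched the next block's header, those bytes are added and the
   * caller is informed via the return value.
   */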
  private boolean readEachBlockInLoadOnOpenDataSection(HFileBlock block, boolean readNextHeader)
    throws IOException {
    long bytesRead = block.getOnDiskSizeWithHeader();
    if (readNextHeader) {
      bytesRead -= HFileBlock.headerSize(true);
      readNextHeader = false;
    }
    if (block.getNextBlockOnDiskSize() > 0) {
      bytesRead += HFileBlock.headerSize(true);
      readNextHeader = true;
    }
    block.release();
    Assert.assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
    return readNextHeader;
  }

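  /**
   * Looks the key up in the bloom filter index, reads the matching bloom chunk from the
   * filesystem, and asserts that exactly that one block read is reflected in the scan metrics.
   * Callers pass either a row key or a full cell, never both.
   */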
  private void readBloomFilters(Path path, BloomType bt, byte[] key, KeyValue keyValue)
    throws IOException {
    Assert.assertTrue(keyValue == null || key == null);

    // Assert that the bloom filter index was read and its size is accounted for in the bytes
    // read from fs
    readLoadOnOpenDataSection(path, true);

    CacheConfig cacheConf = new CacheConfig(conf);
    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(conf, fs, path, true);
    HStoreFile sf = new HStoreFile(storeFileInfo, bt, cacheConf);

    // Read HFile trailer and load-on-open data section
    sf.initReader();

    // Reset bytes read from fs to 0
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    // Reset read ops count to 0
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    StoreFileReader reader = sf.getReader();
    BloomFilter bloomFilter = reader.getGeneralBloomFilter();
    Assert.assertTrue(bloomFilter instanceof CompoundBloomFilter);
    CompoundBloomFilter cbf = (CompoundBloomFilter) bloomFilter;

    // Get the bloom filter index reader
    HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
    int block;

    // Search for the key in the bloom filter index
    if (keyValue != null) {
      block = index.rootBlockContainingKey(keyValue);
    } else {
      byte[] row = key;
      block = index.rootBlockContainingKey(row, 0, row.length);
    }

    // Read the bloom block from the filesystem
    HFileBlock bloomBlock = cbf.getBloomBlock(block);
    long bytesRead = bloomBlock.getOnDiskSizeWithHeader();
    if (bloomBlock.getNextBlockOnDiskSize() > 0) {
      bytesRead += HFileBlock.headerSize(true);
    }
    // Assert that the block read is a bloom block
    Assert.assertEquals(BlockType.BLOOM_CHUNK, bloomBlock.getBlockType());
    bloomBlock.release();

    // Close the reader
    reader.close(true);

    Assert.assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }

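  /**
   * Writes {@value #NUM_KEYS} cells through a {@link StoreFileWriter} so that a compound bloom
   * filter of the given type and chunk size is built alongside the data, recording the keys and
   * cells for later lookups.
   */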
  private void writeBloomFilters(Path path, BloomType bt, int bloomBlockByteSize)
    throws IOException {
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, bloomBlockByteSize);
    CacheConfig cacheConf = new CacheConfig(conf);
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
      .withBloomType(bt).withFilePath(path).build();
    Assert.assertTrue(w.hasGeneralBloom());
    Assert.assertTrue(w.getGeneralBloomWriter() instanceof CompoundBloomFilterWriter);
    CompoundBloomFilterWriter cbbf = (CompoundBloomFilterWriter) w.getGeneralBloomWriter();
    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");
    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A fixed-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      w.append(keyValue);
      keyList.add(keyBytes);
      keyValues.add(keyValue);
    }
    Assert.assertEquals(keyList.size(), cbbf.getKeyCount());
    w.close();
  }

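  /** Exposes the root index block offsets and on-disk sizes recorded by the seeker. */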
  private static class MyNoOpEncodedSeeker extends NoOpIndexBlockEncoder.NoOpEncodedSeeker {
    public long getBlockOffset(int i) {
      return blockOffsets[i];
    }

    public int getBlockDataSize(int i) {
      return blockDataSizes[i];
    }
  }
}