/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

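/**
 * Verifies that {@link ThreadLocalServerSideScanMetrics} accurately accounts the bytes read from
 * the filesystem and the number of block read ops when reading HFile data blocks, index blocks,
 * the load-on-open section, and bloom filter blocks.
 */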
@Category({ IOTests.class, SmallTests.class })
public class TestBytesReadFromFs {
  private static final int NUM_KEYS = 100000;
  private static final int BLOOM_BLOCK_SIZE = 512;
  private static final int INDEX_CHUNK_SIZE = 512;
  private static final int DATA_BLOCK_SIZE = 4096;
  private static final int ROW_PREFIX_LENGTH_IN_BLOOM_FILTER = 42;

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBytesReadFromFs.class);

  @Rule
  public TestName name = new TestName();

  private static final Logger LOG = LoggerFactory.getLogger(TestBytesReadFromFs.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Random RNG = new Random(9713312); // Just a fixed seed.

  private Configuration conf;
  private FileSystem fs;
  private List<KeyValue> keyValues = new ArrayList<>();
  private List<byte[]> keyList = new ArrayList<>();
  private Path path;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, ROW_PREFIX_LENGTH_IN_BLOOM_FILTER);
    fs = FileSystem.get(conf);
    String hfileName = UUID.randomUUID().toString().replaceAll("-", "");
    path = new Path(TEST_UTIL.getDataTestDir(), hfileName);
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_CHUNK_SIZE);
  }

  @Test
  public void testBytesReadFromFsWithScanMetricsDisabled() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, false);
  }

  @Test
  public void testBytesReadFromFsToReadDataUsingIndexBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, true);
  }

  @Test
  public void testBytesReadFromFsToReadLoadOnOpenDataSection() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    readLoadOnOpenDataSection(path, false);
  }

  @Test
  public void testBytesReadFromFsToReadBloomFilterIndexesAndBloomBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    BloomType[] bloomTypes = { BloomType.ROW, BloomType.ROWCOL, BloomType.ROWPREFIX_FIXED_LENGTH };
    for (BloomType bloomType : bloomTypes) {
      LOG.info("Testing bloom type: {}", bloomType);
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
      keyList.clear();
      keyValues.clear();
      writeBloomFilters(path, bloomType, BLOOM_BLOCK_SIZE);
      if (bloomType == BloomType.ROWCOL) {
        KeyValue keyValue = keyValues.get(0);
        readBloomFilters(path, bloomType, null, keyValue);
      } else {
        Assert.assertEquals(ROW_PREFIX_LENGTH_IN_BLOOM_FILTER, keyList.get(0).length);
        byte[] key = keyList.get(0);
        readBloomFilters(path, bloomType, key, null);
      }
    }
  }

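  /**
   * Writes {@value #NUM_KEYS} ordered, fixed-length key/values to an HFile with no compression
   * and no data block encoding, keeping the on-disk block layout deterministic for the byte
   * accounting below.
   */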
  private void writeData(Path path) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, path)
      .withFileContext(context).create();

    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");

    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A fixed-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    writer.close();
  }

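  /**
   * Descends the block index from the root index block through any intermediate and leaf index
   * blocks down to the data block containing {@code keyValue}, tallying the expected on-disk
   * bytes for each block read (plus the prefetched header of the following block, when present),
   * and asserts that the tally matches the reported scan metrics.
   */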
  private void readDataAndIndexBlocks(Path path, KeyValue keyValue, boolean isScanMetricsEnabled)
    throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    // Read the HFile trailer and create the HFileContext
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();

    // Read the HFile info and load-on-open data section (we will read the root index again
    // explicitly later)
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    hfile.initMetaAndIndex(reader);
    HFileContext meta = hfile.getHFileContext();

    // Get access to the block reader
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create an iterator over the load-on-open data section
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

    // Indexes use NoOpEncodedSeeker
    MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    int bytesRead = 0;
    int blockLevelsRead = 0;

    // Read the root index block
    HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
    bytesRead += block.getOnDiskSizeWithHeader();
    if (block.getNextBlockOnDiskSize() > 0) {
      bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
    }
    blockLevelsRead++;

    // The comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    // Initialize the seeker
    seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
      trailer.getNumDataIndexLevels());

    int rootLevelIndex = seeker.rootBlockContainingKey(keyValue);
    long currentOffset = seeker.getBlockOffset(rootLevelIndex);
    int currentDataSize = seeker.getBlockDataSize(rootLevelIndex);

    HFileBlock prevBlock = null;
    do {
      prevBlock = block;
      block = blockReader.readBlockData(currentOffset, currentDataSize, true, true, true);
      HFileBlock unpacked = block.unpack(meta, blockReader);
      if (unpacked != block) {
        block.release();
        block = unpacked;
      }
      bytesRead += block.getOnDiskSizeWithHeader();
      if (block.getNextBlockOnDiskSize() > 0) {
        bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
      }
      if (!block.getBlockType().isData()) {
        ByteBuff buffer = block.getBufferWithoutHeader();
        // Position the buffer at the index entry covering the key
        HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, keyValue, comparator);
        currentOffset = buffer.getLong();
        currentDataSize = buffer.getInt();
      }
      prevBlock.release();
      blockLevelsRead++;
    } while (!block.getBlockType().isData());
    block.release();

    reader.close();

    Assert.assertEquals(isScanMetricsEnabled,
      ThreadLocalServerSideScanMetrics.isScanMetricsEnabled());
    bytesRead = isScanMetricsEnabled ? bytesRead : 0;
    Assert.assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(trailer.getNumDataIndexLevels() + 1, blockLevelsRead);
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
    // At each index level we read one index block, and finally we read the data block
    long blockReadOpsCount = isScanMetricsEnabled ? blockLevelsRead : 0;
    Assert.assertEquals(blockReadOpsCount,
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }

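  /**
   * Reads the load-on-open data section block by block: the root index, the meta index, the file
   * info block and, when present, the bloom filter indexes. Each read is asserted to count as
   * exactly one read op covering the block's on-disk size.
   */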
  private void readLoadOnOpenDataSection(Path path, boolean hasBloomFilters) throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
    // Read the HFile trailer
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();
    Assert.assertEquals(trailer.getTrailerSize(),
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());

    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create an iterator over the load-on-open data section
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());
    boolean readNextHeader = false;

    // Read the root index block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read the meta index block (the meta index is also serialized as a ROOT_INDEX block)
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read the file info block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.FILE_INFO), readNextHeader);

    // Read the bloom filter indexes, if any
    boolean bloomFilterIndexesRead = false;
    HFileBlock block;
    while ((block = blockIter.nextBlock()) != null) {
      bloomFilterIndexesRead = true;
      readNextHeader = readEachBlockInLoadOnOpenDataSection(block, readNextHeader);
    }

    reader.close();

    Assert.assertEquals(hasBloomFilters, bloomFilterIndexesRead);
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
  }

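  /**
   * Asserts the bytes-read accounting for a single load-on-open block. A block read may prefetch
   * the header of the next block; those header bytes are charged to the read that fetched them,
   * so they are added here and subtracted from the next block's expected count via the returned
   * flag.
   */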
  private boolean readEachBlockInLoadOnOpenDataSection(HFileBlock block, boolean readNextHeader)
    throws IOException {
    long bytesRead = block.getOnDiskSizeWithHeader();
    if (readNextHeader) {
      // This block's header was prefetched by the previous read, so it isn't re-read here.
      bytesRead -= HFileBlock.headerSize(true);
      readNextHeader = false;
    }
    if (block.getNextBlockOnDiskSize() > 0) {
      // This read also fetched the next block's header; charge it to this read.
      bytesRead += HFileBlock.headerSize(true);
      readNextHeader = true;
    }
    block.release();
    Assert.assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
    return readNextHeader;
  }

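  /**
   * Opens the store file, locates the bloom chunk covering the given key (or cell, for ROWCOL
   * bloom filters) via the bloom filter index, reads that chunk from the filesystem, and asserts
   * that exactly one read op of the chunk's on-disk size was recorded.
   */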
  private void readBloomFilters(Path path, BloomType bt, byte[] key, KeyValue keyValue)
    throws IOException {
    Assert.assertTrue(keyValue == null || key == null);

    // Assert that the bloom filter index was read and its size is accounted for in the bytes
    // read from FS
    readLoadOnOpenDataSection(path, true);

    CacheConfig cacheConf = new CacheConfig(conf);
    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(conf, fs, path, true);
    HStoreFile sf = new HStoreFile(storeFileInfo, bt, cacheConf);

    // Read the HFile trailer and load-on-open data section
    sf.initReader();

    // Reset bytes read from FS to 0
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    // Reset the read ops count to 0
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    StoreFileReader reader = sf.getReader();
    BloomFilter bloomFilter = reader.getGeneralBloomFilter();
    Assert.assertTrue(bloomFilter instanceof CompoundBloomFilter);
    CompoundBloomFilter cbf = (CompoundBloomFilter) bloomFilter;

    // Get the bloom filter index reader
    HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
    int block;

    // Search for the key in the bloom filter index
    if (keyValue != null) {
      block = index.rootBlockContainingKey(keyValue);
    } else {
      block = index.rootBlockContainingKey(key, 0, key.length);
    }

    // Read the bloom block from FS
    HFileBlock bloomBlock = cbf.getBloomBlock(block);
    long bytesRead = bloomBlock.getOnDiskSizeWithHeader();
    if (bloomBlock.getNextBlockOnDiskSize() > 0) {
      bytesRead += HFileBlock.headerSize(true);
    }
    // Assert that the block read is a bloom chunk
    Assert.assertEquals(BlockType.BLOOM_CHUNK, bloomBlock.getBlockType());
    bloomBlock.release();

    // Close the reader
    reader.close(true);

    Assert.assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }

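  /**
   * Writes the same ordered key/values through a {@link StoreFileWriter} configured with the
   * given bloom type and bloom block size, producing a store file backed by a compound bloom
   * filter.
   */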
  private void writeBloomFilters(Path path, BloomType bt, int bloomBlockByteSize)
    throws IOException {
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, bloomBlockByteSize);
    CacheConfig cacheConf = new CacheConfig(conf);
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
      .withBloomType(bt).withFilePath(path).build();
    Assert.assertTrue(w.hasGeneralBloom());
    Assert.assertTrue(w.getGeneralBloomWriter() instanceof CompoundBloomFilterWriter);
    CompoundBloomFilterWriter cbbf = (CompoundBloomFilterWriter) w.getGeneralBloomWriter();
    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");
    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A fixed-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      w.append(keyValue);
      keyList.add(keyBytes);
      keyValues.add(keyValue);
    }
    Assert.assertEquals(keyList.size(), cbbf.getKeyCount());
    w.close();
  }

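  /**
   * Exposes the per-entry block offsets and on-disk sizes that the parent seeker keeps, so the
   * test can descend the index manually instead of going through a scanner.
   */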
  private static class MyNoOpEncodedSeeker extends NoOpIndexBlockEncoder.NoOpEncodedSeeker {
    public long getBlockOffset(int i) {
      return blockOffsets[i];
    }

    public int getBlockDataSize(int i) {
      return blockDataSizes[i];
    }
  }
}