/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ IOTests.class, SmallTests.class })
public class TestBytesReadFromFs {
  private static final int NUM_KEYS = 100000;
  private static final int BLOOM_BLOCK_SIZE = 512;
  private static final int INDEX_CHUNK_SIZE = 512;
  private static final int DATA_BLOCK_SIZE = 4096;
  private static final int ROW_PREFIX_LENGTH_IN_BLOOM_FILTER = 42;

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBytesReadFromFs.class);

  @Rule
  public TestName name = new TestName();

  private static final Logger LOG = LoggerFactory.getLogger(TestBytesReadFromFs.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Random RNG = new Random(9713312); // Just a fixed seed.
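
  // Per-test state: the HFile under test, plus the keys and cells that were written into it.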
  private Configuration conf;
  private FileSystem fs;
  private List<KeyValue> keyValues = new ArrayList<>();
  private List<byte[]> keyList = new ArrayList<>();
  private Path path;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, ROW_PREFIX_LENGTH_IN_BLOOM_FILTER);
    fs = FileSystem.get(conf);
    String hfileName = UUID.randomUUID().toString().replaceAll("-", "");
    path = new Path(TEST_UTIL.getDataTestDir(), hfileName);
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_CHUNK_SIZE);
  }

  @Test
  public void testBytesReadFromFsWithScanMetricsDisabled() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, false);
  }

  @Test
  public void testBytesReadFromFsToReadDataUsingIndexBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    KeyValue keyValue = keyValues.get(0);
    readDataAndIndexBlocks(path, keyValue, true);
  }

  @Test
  public void testBytesReadFromFsToReadLoadOnOpenDataSection() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    writeData(path);
    readLoadOnOpenDataSection(path, false);
  }

  @Test
  public void testBytesReadFromFsToReadBloomFilterIndexesAndBloomBlocks() throws IOException {
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
    BloomType[] bloomTypes = { BloomType.ROW, BloomType.ROWCOL, BloomType.ROWPREFIX_FIXED_LENGTH };
    for (BloomType bloomType : bloomTypes) {
      LOG.info("Testing bloom type: {}", bloomType);
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
      keyList.clear();
      keyValues.clear();
      writeBloomFilters(path, bloomType, BLOOM_BLOCK_SIZE);
      if (bloomType == BloomType.ROWCOL) {
        KeyValue keyValue = keyValues.get(0);
        readBloomFilters(path, bloomType, null, keyValue);
      } else {
        Assert.assertEquals(ROW_PREFIX_LENGTH_IN_BLOOM_FILTER, keyList.get(0).length);
        byte[] key = keyList.get(0);
        readBloomFilters(path, bloomType, key, null);
      }
    }
  }
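
  /**
   * Writes {@value #NUM_KEYS} cells with lexicographically ordered, fixed-length keys to an HFile
   * at the given path and records every cell in {@link #keyValues}.
   */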
  private void writeData(Path path) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, path)
      .withFileContext(context).create();

    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");

    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A fixed-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    writer.close();
  }
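
  /**
   * Seeks the given cell by walking the block index from the root index block down to the data
   * block, counting the bytes and read ops expected along the way, and asserts that
   * {@link ThreadLocalServerSideScanMetrics} reports exactly those numbers (or zero when scan
   * metrics are disabled).
   */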
  private void readDataAndIndexBlocks(Path path, KeyValue keyValue, boolean isScanMetricsEnabled)
    throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    // Read HFile trailer and create HFileContext
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();

    // Read HFile info and load-on-open data section (we will read the root index again explicitly
    // later)
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    hfile.initMetaAndIndex(reader);
    HFileContext meta = hfile.getHFileContext();

    // Get access to the block reader
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create iterator for reading load-on-open data section
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

    // Indexes use NoOpEncodedSeeker
    MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    int bytesRead = 0;
    int blockLevelsRead = 0;

    // Read the root index block
    HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
    bytesRead += block.getOnDiskSizeWithHeader();
    if (block.getNextBlockOnDiskSize() > 0) {
      // The next block's header was read along with this block, so count those bytes too.
      bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
    }
    blockLevelsRead++;

    // Comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    // Initialize the seeker
    seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
      trailer.getNumDataIndexLevels());

    int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
    long currentOffset = seeker.getBlockOffset(rootLevIndex);
    int currentDataSize = seeker.getBlockDataSize(rootLevIndex);

    // Walk down the index: each intermediate index block points at the next level, until the data
    // block containing the key is reached.
    HFileBlock prevBlock = null;
    do {
      prevBlock = block;
      block = blockReader.readBlockData(currentOffset, currentDataSize, true, true, true);
      HFileBlock unpacked = block.unpack(meta, blockReader);
      if (unpacked != block) {
        block.release();
        block = unpacked;
      }
      bytesRead += block.getOnDiskSizeWithHeader();
      if (block.getNextBlockOnDiskSize() > 0) {
        bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
      }
      if (!block.getBlockType().isData()) {
        ByteBuff buffer = block.getBufferWithoutHeader();
        // Place the buffer at the correct position
        HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, keyValue, comparator);
        currentOffset = buffer.getLong();
        currentDataSize = buffer.getInt();
      }
      prevBlock.release();
      blockLevelsRead++;
    } while (!block.getBlockType().isData());
    block.release();

    reader.close();

    Assert.assertEquals(isScanMetricsEnabled,
      ThreadLocalServerSideScanMetrics.isScanMetricsEnabled());
    bytesRead = isScanMetricsEnabled ? bytesRead : 0;
    Assert.assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(trailer.getNumDataIndexLevels() + 1, blockLevelsRead);
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
    // At every index level we read one index block, and finally we read the data block.
    long blockReadOpsCount = isScanMetricsEnabled ? blockLevelsRead : 0;
    Assert.assertEquals(blockReadOpsCount,
      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }
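
  /**
   * Reads the trailer and then every block of the load-on-open section (root index, meta index,
   * file info and, if present, bloom filter indexes), asserting the bytes-read and read-ops
   * metrics block by block.
   */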
  private void readLoadOnOpenDataSection(Path path, boolean hasBloomFilters) throws IOException {
    long fileSize = fs.getFileStatus(path).getLen();

    ReaderContext readerContext =
      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fs, path))
        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();

    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
    // Read HFile trailer
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    FixedFileTrailer trailer = hfile.getTrailer();
    Assert.assertEquals(trailer.getTrailerSize(),
      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());

    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();

    // Create iterator for reading root index block
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());
    boolean readNextHeader = false;

    // Read the root index block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read the meta index block (it is also serialized as a ROOT_INDEX block)
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);

    // Read the file info block
    readNextHeader = readEachBlockInLoadOnOpenDataSection(
      blockIter.nextBlockWithBlockType(BlockType.FILE_INFO), readNextHeader);

    // Read bloom filter indexes, if present
    boolean bloomFilterIndexesRead = false;
    HFileBlock block;
    while ((block = blockIter.nextBlock()) != null) {
      bloomFilterIndexesRead = true;
      readNextHeader = readEachBlockInLoadOnOpenDataSection(block, readNextHeader);
    }

    reader.close();

    Assert.assertEquals(hasBloomFilters, bloomFilterIndexesRead);
    Assert.assertEquals(0, ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
  }

  private boolean readEachBlockInLoadOnOpenDataSection(HFileBlock block, boolean readNextHeader)
    throws IOException {
    long bytesRead = block.getOnDiskSizeWithHeader();
    if (readNextHeader) {
      // This block's header was already read (and counted) along with the previous block.
      bytesRead -= HFileBlock.headerSize(true);
      readNextHeader = false;
    }
    if (block.getNextBlockOnDiskSize() > 0) {
      bytesRead += HFileBlock.headerSize(true);
      readNextHeader = true;
    }
    block.release();
    Assert.assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
    return readNextHeader;
  }

  private void readBloomFilters(Path path, BloomType bt, byte[] key, KeyValue keyValue)
    throws IOException {
    // At least one of key and keyValue must be null; the non-null one drives the index lookup.
    Assert.assertTrue(keyValue == null || key == null);

    // Assert that the bloom filter index was read and its size is accounted for in bytes read
    // from fs
    readLoadOnOpenDataSection(path, true);

    CacheConfig cacheConf = new CacheConfig(conf);
    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(conf, fs, path, true);
    HStoreFile sf = new HStoreFile(storeFileInfo, bt, cacheConf);

    // Read HFile trailer and load-on-open data section
    sf.initReader();

    // Reset bytes read from fs to 0
    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
    // Reset read ops count to 0
    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();

    StoreFileReader reader = sf.getReader();
    BloomFilter bloomFilter = reader.getGeneralBloomFilter();
    Assert.assertTrue(bloomFilter instanceof CompoundBloomFilter);
    CompoundBloomFilter cbf = (CompoundBloomFilter) bloomFilter;

    // Get the bloom filter index reader
    HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
    int block;

    // Search for the key in the bloom filter index
    if (keyValue != null) {
      block = index.rootBlockContainingKey(keyValue);
    } else {
      byte[] row = key;
      block = index.rootBlockContainingKey(row, 0, row.length);
    }

    // Read the bloom block from FS
    HFileBlock bloomBlock = cbf.getBloomBlock(block);
    long bytesRead = bloomBlock.getOnDiskSizeWithHeader();
    if (bloomBlock.getNextBlockOnDiskSize() > 0) {
      bytesRead += HFileBlock.headerSize(true);
    }
    // Assert that the block read is a bloom block
    Assert.assertEquals(BlockType.BLOOM_CHUNK, bloomBlock.getBlockType());
    bloomBlock.release();

    // Close the reader
    reader.close(true);

    Assert.assertEquals(bytesRead, ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
    Assert.assertEquals(1, ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
  }
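
  /**
   * Writes {@value #NUM_KEYS} cells through a {@link StoreFileWriter} configured with the given
   * bloom type and bloom block size, recording the written keys in {@link #keyList} and the cells
   * in {@link #keyValues}.
   */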
  private void writeBloomFilters(Path path, BloomType bt, int bloomBlockByteSize)
    throws IOException {
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, bloomBlockByteSize);
    CacheConfig cacheConf = new CacheConfig(conf);
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(Compression.Algorithm.NONE).build();
    StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConf, fs).withFileContext(meta)
      .withBloomType(bt).withFilePath(path).build();
    Assert.assertTrue(w.hasGeneralBloom());
    Assert.assertTrue(w.getGeneralBloomWriter() instanceof CompoundBloomFilterWriter);
    CompoundBloomFilterWriter cbbf = (CompoundBloomFilterWriter) w.getGeneralBloomWriter();
    byte[] cf = Bytes.toBytes("cf");
    byte[] cq = Bytes.toBytes("cq");
    for (int i = 0; i < NUM_KEYS; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
      // A fixed-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
      KeyValue keyValue =
        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
      w.append(keyValue);
      keyList.add(keyBytes);
      keyValues.add(keyValue);
    }
    Assert.assertEquals(keyList.size(), cbbf.getKeyCount());
    w.close();
  }

  private static class MyNoOpEncodedSeeker extends NoOpIndexBlockEncoder.NoOpEncodedSeeker {
    public long getBlockOffset(int i) {
      return blockOffsets[i];
    }

    public int getBlockDataSize(int i) {
      return blockDataSizes[i];
    }
  }
}