/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing writing a version 3 {@link HFile} for all encoded blocks.
 * <p>
 * The test is parameterized over every {@link DataBlockEncoding} except
 * {@link DataBlockEncoding#NONE}, each one with and without cell tags.
 */
@org.junit.jupiter.api.Tag(IOTests.TAG)
@org.junit.jupiter.api.Tag(MediumTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: useTags={0}, dataBlockEncoding={1}")
public class TestHFileWriterV3WithDataEncoders {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestHFileWriterV3WithDataEncoders.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Random RNG = new Random(9713312); // Just a fixed seed.

  private Configuration conf;
  private FileSystem fs;
  private final boolean useTags;
  private final DataBlockEncoding dataBlockEncoding;

  /**
   * @param useTags           whether the written cells carry tags
   * @param dataBlockEncoding the data block encoding under test
   */
  public TestHFileWriterV3WithDataEncoders(boolean useTags, DataBlockEncoding dataBlockEncoding) {
    this.useTags = useTags;
    this.dataBlockEncoding = dataBlockEncoding;
  }

  /**
   * Supplies the test parameters: for every encoding other than {@link DataBlockEncoding#NONE},
   * one argument set without tags followed by one with tags.
   * @return the stream of {@code (useTags, dataBlockEncoding)} argument pairs
   */
  public static Stream<Arguments> parameters() {
    DataBlockEncoding[] dataBlockEncodings = DataBlockEncoding.values();
    Stream.Builder<Arguments> builder = Stream.builder();
    for (DataBlockEncoding dataBlockEncoding : dataBlockEncodings) {
      if (dataBlockEncoding == DataBlockEncoding.NONE) {
        continue;
      }
      builder.add(Arguments.of(false, dataBlockEncoding));
      builder.add(Arguments.of(true, dataBlockEncoding));
    }
    return builder.build();
  }

  @BeforeEach
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    fs = FileSystem.get(conf);
  }

  @TestTemplate
  public void testHFileFormatV3() throws IOException {
    testHFileFormatV3Internals(useTags);
  }

  /** Writes and reads back 10000 GZ-compressed entries without looking up the mid key. */
  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
    final int entryCount = 10000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
  }

  @TestTemplate
  public void testMidKeyInHFile() throws IOException {
    testMidKeyInHFileInternals(useTags);
  }

  /** Writes and reads back 50000 uncompressed entries and checks that a mid key is found. */
  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile");
    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
    int entryCount = 50000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
  }

  /**
   * Writes {@code entryCount} random cells plus three meta blocks to {@code hfilePath}, then
   * reads the file back piece by piece (trailer, root data index, meta index, file info, encoded
   * data blocks, meta blocks) and verifies what was read against what was written.
   * @param hfilePath    path of the HFile to create and verify
   * @param compressAlgo compression algorithm used for writing and reading
   * @param entryCount   number of cells to write
   * @param findMidKey   whether to additionally assert that the data index yields a mid key
   * @param useTags      whether the written cells carry tags
   * @throws IOException if writing or reading the file fails
   */
  private void writeDataAndReadFromHFile(Path hfilePath, Compression.Algorithm compressAlgo,
    int entryCount, boolean findMidKey, boolean useTags) throws IOException {

    HFileContext context = new HFileContextBuilder().withBlockSize(4096).withIncludesTags(useTags)
      .withDataBlockEncoding(dataBlockEncoding).withCellComparator(CellComparatorImpl.COMPARATOR)
      .withCompression(compressAlgo).build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, hfilePath)
      .withFileContext(context).create();

    List<KeyValue> keyValues = new ArrayList<>(entryCount);
    // writeKeyValues also closes the writer once everything has been appended.
    writeKeyValues(entryCount, useTags, writer, RNG, keyValues);

    // try-with-resources so the raw stream is released even when an assertion below fails.
    try (FSDataInputStream fsdis = fs.open(hfilePath)) {
      long fileSize = fs.getFileStatus(hfilePath).getLen();
      FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);

      assertEquals(3, trailer.getMajorVersion());
      assertEquals(entryCount, trailer.getEntryCount());
      HFileContext meta = new HFileContextBuilder().withCompression(compressAlgo)
        .withIncludesMvcc(true).withIncludesTags(useTags)
        .withDataBlockEncoding(dataBlockEncoding).withHBaseCheckSum(true).build();
      ReaderContext readerContext =
        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis))
          .withFilePath(hfilePath).withFileSystem(fs).withFileSize(fileSize).build();
      HFileBlock.FSReader blockReader =
        new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP, conf);
      // Comparator class name is stored in the trailer in version 3.
      CellComparator comparator = trailer.createComparator();
      HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
        new HFileBlockIndex.CellBasedKeyBlockIndexReaderV2(comparator,
          trailer.getNumDataIndexLevels());
      HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
        new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);

      HFileBlock.BlockIterator blockIter = blockReader
        .blockRange(trailer.getLoadOnOpenDataOffset(), fileSize - trailer.getTrailerSize());
      // Data index. We also read statistics about the block index written after
      // the root level.
      dataBlockIndexReader.readMultiLevelIndexRoot(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());

      // A second, independent reader over the same file; it is only needed for the mid key
      // lookup.
      FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
      ReaderContext preadContext =
        new ReaderContextBuilder().withFilePath(hfilePath).withFileSize(fileSize)
          .withFileSystem(wrapper.getHfs()).withInputStreamWrapper(wrapper).build();
      HFileInfo hfile = new HFileInfo(preadContext, conf);
      HFile.Reader reader = new HFilePreadReader(preadContext, hfile, cacheConfig, conf);
      try {
        hfile.initMetaAndIndex(reader);
        if (findMidKey) {
          Cell midkey = dataBlockIndexReader.midkey(reader);
          assertNotNull(midkey, "Midkey should not be null");
        }

        // Meta index.
        metaBlockIndexReader.readRootIndex(
          blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
          trailer.getMetaIndexCount());
        // File info
        HFileInfo fileInfo = new HFileInfo();
        fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
        byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
        // Decoded for parity with the original test; the flag is not asserted on.
        boolean includeMemstoreTS =
          keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0;

        DataBlockEncoder encoder = dataBlockEncoding.getEncoder();
        long curBlockPos =
          scanBlocks(entryCount, context, keyValues, fsdis, trailer, meta, blockReader, encoder);

        // Meta blocks. We can scan until the load-on-open data offset (which is
        // the root block index offset in version 2) because we are not testing
        // intermediate-level index blocks here.

        int metaCounter = 0;
        while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
          LOG.info("Current offset: {}, scanning until {}", fsdis.getPos(),
            trailer.getLoadOnOpenDataOffset());
          HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
            .unpack(context, blockReader);
          assertEquals(BlockType.META, block.getBlockType());
          Text t = new Text();
          ByteBuff buf = block.getBufferWithoutHeader();
          if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
            // Report the block that failed, not the test instance.
            throw new IOException(
              "Failed to deserialize block " + block + " into a " + t.getClass().getSimpleName());
          }
          // Expected order follows the lexicographic order of the meta block names written by
          // writeKeyValues: CAPITAL_OF_FRANCE, CAPITAL_OF_RUSSIA, CAPITAL_OF_USA.
          Text expectedText = (metaCounter == 0 ? new Text("Paris")
            : metaCounter == 1 ? new Text("Moscow")
            : new Text("Washington, D.C."));
          assertEquals(expectedText, t);
          LOG.info("Read meta block data: {}", t);
          ++metaCounter;
          curBlockPos += block.getOnDiskSizeWithHeader();
        }
      } finally {
        // Close the pread reader even when a check above throws.
        reader.close();
      }
    }
  }

  /**
   * Reads every encoded data block from offset 0 up to and including the trailer's last data
   * block offset, decodes the cells with the given encoder, and asserts that they match
   * {@code keyValues} in order.
   * @param entryCount  number of cells expected in total
   * @param context     file context used to unpack each block
   * @param keyValues   the cells that were written, in write order
   * @param fsdis       stream over the HFile; it is repositioned to offset 0
   * @param trailer     trailer of the HFile being read
   * @param meta        file context used to build the block decoding context
   * @param blockReader reader used to load the blocks
   * @param encoder     encoder matching the encoding the file was written with
   * @return the file offset immediately after the last data block that was read
   * @throws IOException if a block cannot be read
   */
  private long scanBlocks(int entryCount, HFileContext context, List<KeyValue> keyValues,
    FSDataInputStream fsdis, FixedFileTrailer trailer, HFileContext meta,
    HFileBlock.FSReader blockReader, DataBlockEncoder encoder) throws IOException {
    // Counters for the number of key/value pairs and the number of blocks
    int entriesRead = 0;
    int blocksRead = 0;

    // Scan blocks the way the reader would scan them
    fsdis.seek(0);
    long curBlockPos = 0;
    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
      // Retained from the original test; the returned context is not otherwise used here.
      HFileBlockDecodingContext ctx = blockReader.getBlockDecodingContext();
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
        .unpack(context, blockReader);
      assertEquals(BlockType.ENCODED_DATA, block.getBlockType());
      ByteBuff origBlock = block.getBufferReadOnly();
      // Restrict the buffer to the encoded payload: skip the block header and the encoding id.
      int pos = block.headerSize() + DataBlockEncoding.ID_SIZE;
      origBlock.position(pos);
      origBlock.limit(pos + block.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE);
      ByteBuff buf = origBlock.slice();
      DataBlockEncoder.EncodedSeeker seeker =
        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
      seeker.setCurrentBuffer(buf);
      // The seeker is positioned on the first cell after setCurrentBuffer; check it before
      // advancing.
      Cell res = seeker.getCell();
      KeyValue kv = keyValues.get(entriesRead);
      assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
      ++entriesRead;
      while (seeker.next()) {
        res = seeker.getCell();
        kv = keyValues.get(entriesRead);
        assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
        ++entriesRead;
      }
      ++blocksRead;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }
    LOG.info("Finished reading: entries={}, blocksRead = {}", entriesRead, blocksRead);
    assertEquals(entryCount, entriesRead);
    return curBlockPos;
  }

  /**
   * Appends {@code entryCount} random cells to {@code writer}, records each of them in
   * {@code keyValues}, appends three meta blocks, and closes the writer.
   * @param entryCount number of cells to generate
   * @param useTags    whether each cell gets between one and four random 16-byte tags
   * @param writer     destination writer; closed by this method after a successful write
   * @param rand       source of randomness for keys, values, and tags
   * @param keyValues  receives the generated cells in write order
   * @throws IOException if appending to or closing the writer fails
   */
  private void writeKeyValues(int entryCount, boolean useTags, HFile.Writer writer, Random rand,
    List<KeyValue> keyValues) throws IOException {

    for (int i = 0; i < entryCount; ++i) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);

      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
      KeyValue keyValue;
      if (useTags) {
        // Draw the tag count once per cell; evaluating the random bound in the loop condition
        // would re-roll it on every iteration.
        int tagCount = 1 + rand.nextInt(4);
        List<Tag> tags = new ArrayList<>(tagCount);
        for (int j = 0; j < tagCount; j++) {
          byte[] tagBytes = new byte[16];
          rand.nextBytes(tagBytes);
          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
        }
        keyValue =
          new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes, tags);
      } else {
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes);
      }
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    // Add in an arbitrary order. They will be sorted lexicographically by
    // the key.
    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));

    writer.close();
  }

}