/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing writing a version 3 {@link HFile}.
 */
@org.junit.jupiter.api.Tag(IOTests.TAG)
@org.junit.jupiter.api.Tag(SmallTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: useTags={0}")
public class TestHFileWriterV3 {

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileWriterV3.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Fixed seed so the generated keys/values are reproducible across runs.
  private static final Random RNG = new Random(9713312);

  private Configuration conf;
  private FileSystem fs;
  // Whether the cells written to the file carry tags; the test template runs once per value.
  private final boolean useTags;

  public TestHFileWriterV3(boolean useTags) {
    this.useTags = useTags;
  }

  /** Parameter source for the test template: one run with tags, one without. */
  public static Stream<Arguments> parameters() {
    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED.stream().map(arr -> Arguments.of(arr));
  }

  @BeforeEach
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    fs = FileSystem.get(conf);
  }

  @TestTemplate
  public void testHFileFormatV3() throws IOException {
    testHFileFormatV3Internals(useTags);
  }

  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
    final int entryCount = 10000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
  }

  @TestTemplate
  public void testMidKeyInHFile() throws IOException {
    testMidKeyInHFileInternals(useTags);
  }

  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile");
    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
    int entryCount = 50000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
  }

  /**
   * Writes {@code entryCount} random KeyValues (plus a few meta blocks) into a v3 HFile and then
   * re-reads the file at the block level, verifying the trailer, the data and meta block indexes,
   * the file info, every data block entry and every meta block against what was written.
   * @param hfilePath    where the HFile is created
   * @param compressAlgo block compression algorithm to write with
   * @param entryCount   number of KeyValues to write
   * @param findMidKey   if true, also exercise the block index midkey computation
   * @param useTags      whether the written cells carry tags
   */
  private void writeDataAndReadFromHFile(Path hfilePath, Algorithm compressAlgo, int entryCount,
    boolean findMidKey, boolean useTags) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(4096).withIncludesTags(useTags)
      .withDataBlockEncoding(DataBlockEncoding.NONE).withCompression(compressAlgo).build();
    CacheConfig cacheConfig = new CacheConfig(conf);

    // Write phase. try-with-resources guarantees the writer is closed (and the trailer flushed)
    // even if appending fails part way through.
    List<KeyValue> keyValues;
    try (HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, hfilePath)
      .withFileContext(context).create()) {
      keyValues = writeTestKeyValues(writer, entryCount, useTags);

      // Add in an arbitrary order. They will be sorted lexicographically by the key.
      writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
      writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
      writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));
    }

    // Read/verify phase. Close the streams in a finally so a failed assertion above does not
    // leak them.
    FSDataInputStream fsdis = fs.open(hfilePath);
    HFile.Reader reader = null;
    try {
      long fileSize = fs.getFileStatus(hfilePath).getLen();
      FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);

      assertEquals(3, trailer.getMajorVersion());
      assertEquals(entryCount, trailer.getEntryCount());
      HFileContext meta = new HFileContextBuilder().withCompression(compressAlgo)
        .withIncludesMvcc(false).withIncludesTags(useTags)
        .withDataBlockEncoding(DataBlockEncoding.NONE).withHBaseCheckSum(true).build();
      ReaderContext readerContext =
        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis))
          .withFilePath(hfilePath).withFileSystem(fs).withFileSize(fileSize).build();
      HFileBlock.FSReader blockReader =
        new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP, conf);
      // Comparator class name is stored in the trailer in version 3.
      CellComparator comparator = trailer.createComparator();
      HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
        new HFileBlockIndex.CellBasedKeyBlockIndexReaderV2(comparator,
          trailer.getNumDataIndexLevels());
      HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
        new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);

      HFileBlock.BlockIterator blockIter = blockReader.blockRange(
        trailer.getLoadOnOpenDataOffset(), fileSize - trailer.getTrailerSize());
      // Data index. We also read statistics about the block index written after
      // the root level.
      dataBlockIndexReader.readMultiLevelIndexRoot(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());

      FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
      ReaderContext preadContext = new ReaderContextBuilder().withFilePath(hfilePath)
        .withFileSize(fileSize).withFileSystem(wrapper.getHfs()).withInputStreamWrapper(wrapper)
        .build();
      HFileInfo hfile = new HFileInfo(preadContext, conf);
      reader = new HFilePreadReader(preadContext, hfile, cacheConfig, conf);
      hfile.initMetaAndIndex(reader);
      if (findMidKey) {
        Cell midkey = dataBlockIndexReader.midkey(reader);
        assertNotNull(midkey, "Midkey should not be null");
      }

      // Meta index.
      metaBlockIndexReader.readRootIndex(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
        trailer.getMetaIndexCount());
      // File info.
      HFileInfo fileInfo = new HFileInfo();
      fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
      byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
      boolean includeMemstoreTS =
        keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0;

      // Counters for the number of key/value pairs and the number of blocks.
      int entriesRead = 0;
      int blocksRead = 0;
      long memstoreTS = 0;

      // Scan blocks the way the reader would scan them.
      fsdis.seek(0);
      long curBlockPos = 0;
      while (curBlockPos <= trailer.getLastDataBlockOffset()) {
        HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
          .unpack(context, blockReader);
        assertEquals(BlockType.DATA, block.getBlockType());
        ByteBuff buf = block.getBufferWithoutHeader();
        while (buf.hasRemaining()) {
          int keyLen = buf.getInt();
          int valueLen = buf.getInt();

          byte[] key = new byte[keyLen];
          buf.get(key);

          byte[] value = new byte[valueLen];
          buf.get(value);
          byte[] tagValue = null;
          if (useTags) {
            // The tags length is serialized as a 2-byte big-endian unsigned short.
            int tagLen = ((buf.get() & 0xff) << 8) ^ (buf.get() & 0xff);
            tagValue = new byte[tagLen];
            buf.get(tagValue);
          }

          if (includeMemstoreTS) {
            // The memstore timestamp is a vint appended after the cell; decode it from the
            // backing array, then skip past it in the buffer.
            ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(),
              buf.arrayOffset() + buf.position(), buf.remaining());
            DataInputStream data_input = new DataInputStream(byte_input);

            memstoreTS = WritableUtils.readVLong(data_input);
            buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
          }

          // A brute-force check to see that all keys and values are correct.
          KeyValue kv = keyValues.get(entriesRead);
          assertTrue(Bytes.compareTo(key, kv.getKey()) == 0);
          assertTrue(Bytes.compareTo(value, 0, value.length, kv.getValueArray(),
            kv.getValueOffset(), kv.getValueLength()) == 0);
          if (useTags) {
            assertNotNull(tagValue);
            assertEquals(tagValue.length, kv.getTagsLength());
            assertTrue(Bytes.compareTo(tagValue, 0, tagValue.length, kv.getTagsArray(),
              kv.getTagsOffset(), kv.getTagsLength()) == 0);
          }
          ++entriesRead;
        }
        ++blocksRead;
        curBlockPos += block.getOnDiskSizeWithHeader();
      }
      LOG.info("Finished reading: entries=" + entriesRead + ", blocksRead=" + blocksRead);
      assertEquals(entryCount, entriesRead);

      // Meta blocks. We can scan until the load-on-open data offset (which is
      // the root block index offset in version 2) because we are not testing
      // intermediate-level index blocks here.
      int metaCounter = 0;
      while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
        LOG.info("Current offset: " + fsdis.getPos() + ", scanning until "
          + trailer.getLoadOnOpenDataOffset());
        HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
          .unpack(context, blockReader);
        assertEquals(BlockType.META, block.getBlockType());
        Text t = new Text();
        ByteBuff buf = block.getBufferWithoutHeader();
        if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
          throw new IOException(
            "Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
        }
        // Meta blocks come back sorted lexicographically by their key, not in append order:
        // FRANCE -> Paris, RUSSIA -> Moscow, USA -> Washington, D.C.
        Text expectedText = (metaCounter == 0 ? new Text("Paris")
          : metaCounter == 1 ? new Text("Moscow")
          : new Text("Washington, D.C."));
        assertEquals(expectedText, t);
        LOG.info("Read meta block data: " + t);
        ++metaCounter;
        curBlockPos += block.getOnDiskSizeWithHeader();
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
      fsdis.close();
    }
  }

  /**
   * Appends {@code entryCount} randomly generated KeyValues (ordered keys, random-length values,
   * and optionally 1-4 random 16-byte tags each) to {@code writer}, returning them in write order
   * for later verification.
   */
  private List<KeyValue> writeTestKeyValues(HFile.Writer writer, int entryCount, boolean useTags)
    throws IOException {
    List<KeyValue> keyValues = new ArrayList<>(entryCount);
    for (int i = 0; i < entryCount; ++i) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(RNG, i);
      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG);
      KeyValue keyValue;
      if (useTags) {
        ArrayList<Tag> tags = new ArrayList<>();
        for (int j = 0; j < 1 + RNG.nextInt(4); j++) {
          byte[] tagBytes = new byte[16];
          RNG.nextBytes(tagBytes);
          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
        }
        keyValue =
          new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes, tags);
      } else {
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes);
      }
      writer.append(keyValue);
      keyValues.add(keyValue);
    }
    return keyValues;
  }
}