/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing writing a version 3 {@link HFile} for all data block encodings,
 * with and without tags.
 */
@RunWith(Parameterized.class)
@Category({IOTests.class, MediumTests.class})
public class TestHFileWriterV3WithDataEncoders {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileWriterV3WithDataEncoders.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestHFileWriterV3WithDataEncoders.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Configuration conf;
  private FileSystem fs;
  private boolean useTags;
  private DataBlockEncoding dataBlockEncoding;

  public TestHFileWriterV3WithDataEncoders(boolean useTags,
      DataBlockEncoding dataBlockEncoding) {
    this.useTags = useTags;
    this.dataBlockEncoding = dataBlockEncoding;
  }

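  // Parameter matrix: every DataBlockEncoding except NONE (which would leave
  // data blocks unencoded), each paired with tags disabled and enabled --
  // hence (values - 1) * 2 combinations.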
  @Parameterized.Parameters
  public static Collection<Object[]> parameters() {
    DataBlockEncoding[] dataBlockEncodings = DataBlockEncoding.values();
    Object[][] params = new Object[dataBlockEncodings.length * 2 - 2][];
    int i = 0;
    for (DataBlockEncoding dataBlockEncoding : dataBlockEncodings) {
      if (dataBlockEncoding == DataBlockEncoding.NONE) {
        continue;
      }
      params[i++] = new Object[]{false, dataBlockEncoding};
      params[i++] = new Object[]{true, dataBlockEncoding};
    }
    return Arrays.asList(params);
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    fs = FileSystem.get(conf);
  }

  @Test
  public void testHFileFormatV3() throws IOException {
    testHFileFormatV3Internals(useTags);
  }

  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
    final int entryCount = 10000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
  }

  @Test
  public void testMidKeyInHFile() throws IOException {
    testMidKeyInHFileInternals(useTags);
  }

  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile");
    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
    int entryCount = 50000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
  }

  private void writeDataAndReadFromHFile(Path hfilePath,
      Compression.Algorithm compressAlgo, int entryCount, boolean findMidKey, boolean useTags)
      throws IOException {

    HFileContext context = new HFileContextBuilder()
        .withBlockSize(4096)
        .withIncludesTags(useTags)
        .withDataBlockEncoding(dataBlockEncoding)
        .withCellComparator(CellComparatorImpl.COMPARATOR)
        .withCompression(compressAlgo).build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig)
        .withPath(fs, hfilePath)
        .withFileContext(context)
        .create();

    Random rand = new Random(9713312); // Just a fixed seed.
    List<KeyValue> keyValues = new ArrayList<>(entryCount);

    writeKeyValues(entryCount, useTags, writer, rand, keyValues);

    FSDataInputStream fsdis = fs.open(hfilePath);

    long fileSize = fs.getFileStatus(hfilePath).getLen();
    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);

    Assert.assertEquals(3, trailer.getMajorVersion());
    Assert.assertEquals(entryCount, trailer.getEntryCount());
    HFileContext meta = new HFileContextBuilder()
        .withCompression(compressAlgo)
        .withIncludesMvcc(true)
        .withIncludesTags(useTags)
        .withDataBlockEncoding(dataBlockEncoding)
        .withHBaseCheckSum(true).build();
    ReaderContext readerContext = new ReaderContextBuilder()
        .withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis))
        .withFilePath(hfilePath)
        .withFileSystem(fs)
        .withFileSize(fileSize).build();
    HFileBlock.FSReader blockReader =
        new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP);
    // Comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
        new HFileBlockIndex.CellBasedKeyBlockIndexReader(comparator,
            trailer.getNumDataIndexLevels());
    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
        new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);

    HFileBlock.BlockIterator blockIter = blockReader.blockRange(
        trailer.getLoadOnOpenDataOffset(),
        fileSize - trailer.getTrailerSize());
    // Data index. We also read statistics about the block index written after
    // the root level.
    dataBlockIndexReader.readMultiLevelIndexRoot(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());

    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
    readerContext = new ReaderContextBuilder()
        .withFilePath(hfilePath)
        .withFileSize(fileSize)
        .withFileSystem(wrapper.getHfs())
        .withInputStreamWrapper(wrapper)
        .build();
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    hfile.initMetaAndIndex(reader);
    if (findMidKey) {
      Cell midkey = dataBlockIndexReader.midkey(reader);
      Assert.assertNotNull("Midkey should not be null", midkey);
    }

    // Meta index.
    metaBlockIndexReader.readRootIndex(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX)
            .getByteStream(), trailer.getMetaIndexCount());
    // File info.
    HFileInfo fileInfo = new HFileInfo();
    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
    byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
    boolean includeMemstoreTS = keyValueFormatVersion != null &&
        Bytes.toInt(keyValueFormatVersion) > 0;

    // Counters for the number of key/value pairs and the number of blocks.
    int entriesRead = 0;
    int blocksRead = 0;
    long memstoreTS = 0;

    DataBlockEncoder encoder = dataBlockEncoding.getEncoder();
    long curBlockPos = scanBlocks(entryCount, context, keyValues, fsdis, trailer,
        meta, blockReader, entriesRead, blocksRead, encoder);

    // Meta blocks. We can scan until the load-on-open data offset (which is
    // the root block index offset in version 2) because we are not testing
    // intermediate-level index blocks here.
    int metaCounter = 0;
    while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
      LOG.info("Current offset: {}, scanning until {}", fsdis.getPos(),
          trailer.getLoadOnOpenDataOffset());
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
          .unpack(context, blockReader);
      Assert.assertEquals(BlockType.META, block.getBlockType());
      Text t = new Text();
      ByteBuff buf = block.getBufferWithoutHeader();
      if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
        throw new IOException("Failed to deserialize block " + block +
            " into a " + t.getClass().getSimpleName());
      }
      // Meta blocks come back sorted by key, so CAPITAL_OF_FRANCE (Paris)
      // precedes CAPITAL_OF_RUSSIA (Moscow) and CAPITAL_OF_USA.
      Text expectedText = (metaCounter == 0 ? new Text("Paris")
          : metaCounter == 1 ? new Text("Moscow") : new Text("Washington, D.C."));
      Assert.assertEquals(expectedText, t);
      LOG.info("Read meta block data: {}", t);
      ++metaCounter;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }

    fsdis.close();
    reader.close();
  }

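  // Scans all data blocks exactly as a reader would: every block must be an
  // ENCODED_DATA block, and each cell produced by the encoding's seeker must
  // compare equal to the KeyValue that was written at that position.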
  private long scanBlocks(int entryCount, HFileContext context, List<KeyValue> keyValues,
      FSDataInputStream fsdis, FixedFileTrailer trailer, HFileContext meta,
      HFileBlock.FSReader blockReader, int entriesRead, int blocksRead,
      DataBlockEncoder encoder) throws IOException {
    // Scan blocks the way the reader would scan them.
    fsdis.seek(0);
    long curBlockPos = 0;
    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
      HFileBlockDecodingContext ctx = blockReader.getBlockDecodingContext();
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
          .unpack(context, blockReader);
      Assert.assertEquals(BlockType.ENCODED_DATA, block.getBlockType());
      ByteBuff origBlock = block.getBufferReadOnly();
      // Skip the block header and the encoding id prefix so the buffer holds
      // only the encoded payload, then walk it with the encoding's seeker.
      int pos = block.headerSize() + DataBlockEncoding.ID_SIZE;
      origBlock.position(pos);
      origBlock.limit(pos + block.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE);
      ByteBuff buf = origBlock.slice();
      DataBlockEncoder.EncodedSeeker seeker =
          encoder.createSeeker(encoder.newDataBlockDecodingContext(meta));
      seeker.setCurrentBuffer(buf);
      Cell res = seeker.getCell();
      KeyValue kv = keyValues.get(entriesRead);
      Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
      ++entriesRead;
      while (seeker.next()) {
        res = seeker.getCell();
        kv = keyValues.get(entriesRead);
        Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
        ++entriesRead;
      }
      ++blocksRead;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }
    LOG.info("Finished reading: entries={}, blocksRead={}", entriesRead, blocksRead);
    Assert.assertEquals(entryCount, entriesRead);
    return curBlockPos;
  }

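  // Writes entryCount randomly generated, ordered KeyValues (each optionally
  // carrying one to four random tags), remembers them for later verification,
  // then appends three meta blocks before closing the writer.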
  private void writeKeyValues(int entryCount, boolean useTags, HFile.Writer writer,
      Random rand, List<KeyValue> keyValues) throws IOException {

    for (int i = 0; i < entryCount; ++i) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);

      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
      KeyValue keyValue = null;
      if (useTags) {
        // Attach between one and four random 16-byte tags. Draw the count
        // once, rather than re-drawing it in the loop condition.
        int tagCount = 1 + rand.nextInt(4);
        ArrayList<Tag> tags = new ArrayList<>();
        for (int j = 0; j < tagCount; j++) {
          byte[] tagBytes = new byte[16];
          rand.nextBytes(tagBytes);
          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
        }
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
            valueBytes, tags);
      } else {
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
            valueBytes);
      }
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    // Add in an arbitrary order. They will be sorted lexicographically by
    // the key.
    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));

    writer.close();
  }
}