/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing writing a version 3 {@link HFile}.
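 * Writes key/values (optionally with random tags) plus three named meta blocks,
 * then re-reads the raw file structures: trailer, block indexes, file info,
 * every data block, and every meta block.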
 */
@RunWith(Parameterized.class)
@Category({IOTests.class, SmallTests.class})
public class TestHFileWriterV3 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileWriterV3.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileWriterV3.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Configuration conf;
  private FileSystem fs;
  private boolean useTags;

  public TestHFileWriterV3(boolean useTags) {
    this.useTags = useTags;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED;
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    fs = FileSystem.get(conf);
  }

  @Test
  public void testHFileFormatV3() throws IOException {
    testHFileFormatV3Internals(useTags);
  }

  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
    final int entryCount = 10000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
  }

  @Test
  public void testMidKeyInHFile() throws IOException {
    testMidKeyInHFileInternals(useTags);
  }

  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile");
    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
    int entryCount = 50000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
  }

  private void writeDataAndReadFromHFile(Path hfilePath, Algorithm compressAlgo,
      int entryCount, boolean findMidKey, boolean useTags) throws IOException {
    HFileContext context = new HFileContextBuilder()
        .withBlockSize(4096)
        .withIncludesTags(useTags)
        .withCompression(compressAlgo).build();
    HFile.Writer writer = new HFile.WriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, hfilePath)
        .withFileContext(context)
        .withComparator(CellComparatorImpl.COMPARATOR)
        .create();

    Random rand = new Random(9713312); // Just a fixed seed.
    List<KeyValue> keyValues = new ArrayList<>(entryCount);

    for (int i = 0; i < entryCount; ++i) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);

      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
      KeyValue keyValue = null;
      if (useTags) {
        ArrayList<Tag> tags = new ArrayList<>();
        for (int j = 0; j < 1 + rand.nextInt(4); j++) {
          byte[] tagBytes = new byte[16];
          rand.nextBytes(tagBytes);
          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
        }
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
            valueBytes, tags);
      } else {
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
            valueBytes);
      }
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    // Add in an arbitrary order. They will be sorted lexicographically by
    // the key.
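    // ("CAPITAL_OF_FRANCE" < "CAPITAL_OF_RUSSIA" < "CAPITAL_OF_USA", so the
    // meta read-back below expects Paris, then Moscow, then Washington, D.C.)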
    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));

    writer.close();

    FSDataInputStream fsdis = fs.open(hfilePath);

    long fileSize = fs.getFileStatus(hfilePath).getLen();
    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);

    assertEquals(3, trailer.getMajorVersion());
    assertEquals(entryCount, trailer.getEntryCount());
    HFileContext meta = new HFileContextBuilder()
        .withCompression(compressAlgo)
        .withIncludesMvcc(false)
        .withIncludesTags(useTags)
        .withHBaseCheckSum(true).build();
    HFileBlock.FSReader blockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, meta);
    // Comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
        new HFileBlockIndex.CellBasedKeyBlockIndexReader(comparator,
            trailer.getNumDataIndexLevels());
    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
        new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);

    HFileBlock.BlockIterator blockIter = blockReader.blockRange(
        trailer.getLoadOnOpenDataOffset(),
        fileSize - trailer.getTrailerSize());
    // Data index. We also read statistics about the block index written after
    // the root level.
    dataBlockIndexReader.readMultiLevelIndexRoot(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());

    if (findMidKey) {
      Cell midkey = dataBlockIndexReader.midkey();
      assertNotNull("Midkey should not be null", midkey);
    }

    // Meta index.
    metaBlockIndexReader.readRootIndex(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
        trailer.getMetaIndexCount());
    // File info
    FileInfo fileInfo = new FileInfo();
    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
    byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
    boolean includeMemstoreTS = keyValueFormatVersion != null &&
        Bytes.toInt(keyValueFormatVersion) > 0;

    // Counters for the number of key/value pairs and the number of blocks
    int entriesRead = 0;
    int blocksRead = 0;
    long memstoreTS = 0;

    // Scan blocks the way the reader would scan them
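    // (each raw block is read at curBlockPos with unknown on-disk size (-1),
    // unpacked using the write-time context, and curBlockPos is then advanced
    // by the block's on-disk size including its header).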
    fsdis.seek(0);
    long curBlockPos = 0;
    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
          .unpack(context, blockReader);
      assertEquals(BlockType.DATA, block.getBlockType());
      ByteBuff buf = block.getBufferWithoutHeader();
      int keyLen = -1;
      while (buf.hasRemaining()) {
        keyLen = buf.getInt();
        int valueLen = buf.getInt();

        byte[] key = new byte[keyLen];
        buf.get(key);

        byte[] value = new byte[valueLen];
        buf.get(value);

        byte[] tagValue = null;
        if (useTags) {
          // The cell's tag data is preceded by a two-byte (big-endian) total length.
          int tagLen = ((buf.get() & 0xff) << 8) ^ (buf.get() & 0xff);
          tagValue = new byte[tagLen];
          buf.get(tagValue);
        }

        if (includeMemstoreTS) {
          // The memstore timestamp is appended to each cell as a vlong.
          ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(),
              buf.arrayOffset() + buf.position(), buf.remaining());
          DataInputStream data_input = new DataInputStream(byte_input);

          memstoreTS = WritableUtils.readVLong(data_input);
          buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
        }

        // A brute-force check to see that all keys and values are correct.
        KeyValue kv = keyValues.get(entriesRead);
        assertTrue(Bytes.compareTo(key, kv.getKey()) == 0);
        assertTrue(Bytes.compareTo(value, 0, value.length, kv.getValueArray(),
            kv.getValueOffset(), kv.getValueLength()) == 0);
        if (useTags) {
          assertNotNull(tagValue);
          KeyValue tkv = kv;
          assertEquals(tagValue.length, tkv.getTagsLength());
          assertTrue(Bytes.compareTo(tagValue, 0, tagValue.length, tkv.getTagsArray(),
              tkv.getTagsOffset(), tkv.getTagsLength()) == 0);
        }
        ++entriesRead;
      }
      ++blocksRead;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }
    LOG.info("Finished reading: entries=" + entriesRead + ", blocksRead=" + blocksRead);
    assertEquals(entryCount, entriesRead);

    // Meta blocks. We can scan until the load-on-open data offset (which is
    // the root block index offset in version 2) because we are not testing
    // intermediate-level index blocks here.
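    // Meta values were written above as hadoop Writables (Text), so each meta
    // block's payload is deserialized back through Writables.getWritable.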
    int metaCounter = 0;
    while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
      LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
          trailer.getLoadOnOpenDataOffset());
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
          .unpack(context, blockReader);
      assertEquals(BlockType.META, block.getBlockType());
      Text t = new Text();
      ByteBuff buf = block.getBufferWithoutHeader();
      if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
        throw new IOException("Failed to deserialize block " + this + " into a " +
            t.getClass().getSimpleName());
      }
      Text expectedText = metaCounter == 0 ? new Text("Paris")
          : metaCounter == 1 ? new Text("Moscow") : new Text("Washington, D.C.");
      assertEquals(expectedText, t);
      LOG.info("Read meta block data: " + t);
      ++metaCounter;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }

    fsdis.close();
  }
}