001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.List;
030import java.util.Random;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FSDataInputStream;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.ArrayBackedTag;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellComparator;
038import org.apache.hadoop.hbase.CellComparatorImpl;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
041import org.apache.hadoop.hbase.HBaseTestingUtility;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.Tag;
045import org.apache.hadoop.hbase.io.compress.Compression;
046import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
047import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
048import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
049import org.apache.hadoop.hbase.nio.ByteBuff;
050import org.apache.hadoop.hbase.testclassification.IOTests;
051import org.apache.hadoop.hbase.testclassification.SmallTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.Writables;
054import org.apache.hadoop.io.Text;
055import org.apache.hadoop.io.WritableUtils;
056import org.junit.Before;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.runner.RunWith;
061import org.junit.runners.Parameterized;
062import org.junit.runners.Parameterized.Parameters;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * Testing writing a version 3 {@link HFile}.
068 */
069@RunWith(Parameterized.class)
070@Category({IOTests.class, SmallTests.class})
071public class TestHFileWriterV3 {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075      HBaseClassTestRule.forClass(TestHFileWriterV3.class);
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestHFileWriterV3.class);
078
079  private static final HBaseTestingUtility TEST_UTIL =
080      new HBaseTestingUtility();
081
082  private Configuration conf;
083  private FileSystem fs;
084  private boolean useTags;
085  public TestHFileWriterV3(boolean useTags) {
086    this.useTags = useTags;
087  }
088  @Parameters
089  public static Collection<Object[]> parameters() {
090    return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED;
091  }
092
093  @Before
094  public void setUp() throws IOException {
095    conf = TEST_UTIL.getConfiguration();
096    fs = FileSystem.get(conf);
097  }
098
099  @Test
100  public void testHFileFormatV3() throws IOException {
101    testHFileFormatV3Internals(useTags);
102  }
103
104  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
105    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
106    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
107    final int entryCount = 10000;
108    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
109  }
110
111  @Test
112  public void testMidKeyInHFile() throws IOException{
113    testMidKeyInHFileInternals(useTags);
114  }
115
116  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
117    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(),
118    "testMidKeyInHFile");
119    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
120    int entryCount = 50000;
121    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
122  }
123
124  private void writeDataAndReadFromHFile(Path hfilePath,
125      Algorithm compressAlgo, int entryCount, boolean findMidKey, boolean useTags) throws IOException {
126    HFileContext context = new HFileContextBuilder()
127      .withBlockSize(4096)
128      .withIncludesTags(useTags)
129      .withDataBlockEncoding(DataBlockEncoding.NONE)
130      .withCompression(compressAlgo).build();
131    HFile.Writer writer = new HFile.WriterFactory(conf, new CacheConfig(conf))
132      .withPath(fs, hfilePath)
133      .withFileContext(context)
134      .withComparator(CellComparatorImpl.COMPARATOR)
135      .create();
136
137    Random rand = new Random(9713312); // Just a fixed seed.
138    List<KeyValue> keyValues = new ArrayList<>(entryCount);
139
140    for (int i = 0; i < entryCount; ++i) {
141      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);
142
143      // A random-length random value.
144      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
145      KeyValue keyValue = null;
146      if (useTags) {
147        ArrayList<Tag> tags = new ArrayList<>();
148        for (int j = 0; j < 1 + rand.nextInt(4); j++) {
149          byte[] tagBytes = new byte[16];
150          rand.nextBytes(tagBytes);
151          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
152        }
153        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
154            valueBytes, tags);
155      } else {
156        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
157            valueBytes);
158      }
159      writer.append(keyValue);
160      keyValues.add(keyValue);
161    }
162
163    // Add in an arbitrary order. They will be sorted lexicographically by
164    // the key.
165    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
166    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
167    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));
168
169    writer.close();
170
171
172    FSDataInputStream fsdis = fs.open(hfilePath);
173
174    long fileSize = fs.getFileStatus(hfilePath).getLen();
175    FixedFileTrailer trailer =
176        FixedFileTrailer.readFromStream(fsdis, fileSize);
177
178    assertEquals(3, trailer.getMajorVersion());
179    assertEquals(entryCount, trailer.getEntryCount());
180    HFileContext meta = new HFileContextBuilder()
181      .withCompression(compressAlgo)
182      .withIncludesMvcc(false)
183      .withIncludesTags(useTags)
184      .withDataBlockEncoding(DataBlockEncoding.NONE)
185      .withHBaseCheckSum(true).build();
186    HFileBlock.FSReader blockReader =
187        new HFileBlock.FSReaderImpl(fsdis, fileSize, meta);
188    // Comparator class name is stored in the trailer in version 3.
189    CellComparator comparator = trailer.createComparator();
190    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
191        new HFileBlockIndex.CellBasedKeyBlockIndexReader(comparator,
192            trailer.getNumDataIndexLevels());
193    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
194        new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
195
196    HFileBlock.BlockIterator blockIter = blockReader.blockRange(
197        trailer.getLoadOnOpenDataOffset(),
198        fileSize - trailer.getTrailerSize());
199    // Data index. We also read statistics about the block index written after
200    // the root level.
201    dataBlockIndexReader.readMultiLevelIndexRoot(
202        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());
203
204    if (findMidKey) {
205      Cell midkey = dataBlockIndexReader.midkey();
206      assertNotNull("Midkey should not be null", midkey);
207    }
208
209    // Meta index.
210    metaBlockIndexReader.readRootIndex(
211        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX)
212          .getByteStream(), trailer.getMetaIndexCount());
213    // File info
214    FileInfo fileInfo = new FileInfo();
215    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
216    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
217    boolean includeMemstoreTS = keyValueFormatVersion != null &&
218        Bytes.toInt(keyValueFormatVersion) > 0;
219
220    // Counters for the number of key/value pairs and the number of blocks
221    int entriesRead = 0;
222    int blocksRead = 0;
223    long memstoreTS = 0;
224
225    // Scan blocks the way the reader would scan them
226    fsdis.seek(0);
227    long curBlockPos = 0;
228    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
229      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
230        .unpack(context, blockReader);
231      assertEquals(BlockType.DATA, block.getBlockType());
232      ByteBuff buf = block.getBufferWithoutHeader();
233      int keyLen = -1;
234      while (buf.hasRemaining()) {
235
236        keyLen = buf.getInt();
237
238        int valueLen = buf.getInt();
239
240        byte[] key = new byte[keyLen];
241        buf.get(key);
242
243        byte[] value = new byte[valueLen];
244        buf.get(value);
245        byte[] tagValue = null;
246        if (useTags) {
247          int tagLen = ((buf.get() & 0xff) << 8) ^ (buf.get() & 0xff);
248          tagValue = new byte[tagLen];
249          buf.get(tagValue);
250        }
251
252        if (includeMemstoreTS) {
253          ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(), buf.arrayOffset()
254              + buf.position(), buf.remaining());
255          DataInputStream data_input = new DataInputStream(byte_input);
256
257          memstoreTS = WritableUtils.readVLong(data_input);
258          buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
259        }
260
261        // A brute-force check to see that all keys and values are correct.
262        KeyValue kv = keyValues.get(entriesRead);
263        assertTrue(Bytes.compareTo(key, kv.getKey()) == 0);
264        assertTrue(Bytes.compareTo(value, 0, value.length, kv.getValueArray(), kv.getValueOffset(),
265          kv.getValueLength()) == 0);
266        if (useTags) {
267          assertNotNull(tagValue);
268          KeyValue tkv =  kv;
269          assertEquals(tagValue.length, tkv.getTagsLength());
270          assertTrue(Bytes.compareTo(tagValue, 0, tagValue.length, tkv.getTagsArray(),
271              tkv.getTagsOffset(), tkv.getTagsLength()) == 0);
272        }
273        ++entriesRead;
274      }
275      ++blocksRead;
276      curBlockPos += block.getOnDiskSizeWithHeader();
277    }
278    LOG.info("Finished reading: entries=" + entriesRead + ", blocksRead="
279        + blocksRead);
280    assertEquals(entryCount, entriesRead);
281
282    // Meta blocks. We can scan until the load-on-open data offset (which is
283    // the root block index offset in version 2) because we are not testing
284    // intermediate-level index blocks here.
285
286    int metaCounter = 0;
287    while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
288      LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
289          trailer.getLoadOnOpenDataOffset());
290      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
291        .unpack(context, blockReader);
292      assertEquals(BlockType.META, block.getBlockType());
293      Text t = new Text();
294      ByteBuff buf = block.getBufferWithoutHeader();
295      if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
296        throw new IOException("Failed to deserialize block " + this +
297            " into a " + t.getClass().getSimpleName());
298      }
299      Text expectedText =
300          (metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ? new Text(
301              "Moscow") : new Text("Washington, D.C."));
302      assertEquals(expectedText, t);
303      LOG.info("Read meta block data: " + t);
304      ++metaCounter;
305      curBlockPos += block.getOnDiskSizeWithHeader();
306    }
307
308    fsdis.close();
309  }
310}
311