/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing writing a version 3 {@link HFile}.
 */
@RunWith(Parameterized.class)
@Category({IOTests.class, SmallTests.class})
public class TestHFileWriterV3 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileWriterV3.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileWriterV3.class);

  private static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();

  private Configuration conf;
  private FileSystem fs;
  private boolean useTags;

  public TestHFileWriterV3(boolean useTags) {
    this.useTags = useTags;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED;
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    fs = FileSystem.get(conf);
  }

  @Test
  public void testHFileFormatV3() throws IOException {
    testHFileFormatV3Internals(useTags);
  }

  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
    final int entryCount = 10000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
  }

  @Test
  public void testMidKeyInHFile() throws IOException {
    testMidKeyInHFileInternals(useTags);
  }

  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile");
    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
    int entryCount = 50000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
  }

  private void writeDataAndReadFromHFile(Path hfilePath, Algorithm compressAlgo, int entryCount,
      boolean findMidKey, boolean useTags) throws IOException {
    HFileContext context = new HFileContextBuilder()
      .withBlockSize(4096)
      .withIncludesTags(useTags)
      .withDataBlockEncoding(DataBlockEncoding.NONE)
      .withCompression(compressAlgo).build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig)
      .withPath(fs, hfilePath)
      .withFileContext(context)
      .create();

    Random rand = new Random(9713312); // Just a fixed seed.
    List<KeyValue> keyValues = new ArrayList<>(entryCount);

    for (int i = 0; i < entryCount; ++i) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);

      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
      KeyValue keyValue = null;
      if (useTags) {
        ArrayList<Tag> tags = new ArrayList<>();
        for (int j = 0; j < 1 + rand.nextInt(4); j++) {
          byte[] tagBytes = new byte[16];
          rand.nextBytes(tagBytes);
          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
        }
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
            valueBytes, tags);
      } else {
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
            valueBytes);
      }
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    // Add in an arbitrary order. They will be sorted lexicographically by
    // the key.
    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));

    writer.close();

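    // Re-open the file and walk its blocks directly (trailer, block indexes, file info, data
    // and meta blocks) to verify the version 3 on-disk layout.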
    FSDataInputStream fsdis = fs.open(hfilePath);

    long fileSize = fs.getFileStatus(hfilePath).getLen();
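    // The fixed file trailer sits at the end of the file and records the format version,
    // entry count and section offsets that drive the rest of the checks below.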
    FixedFileTrailer trailer =
        FixedFileTrailer.readFromStream(fsdis, fileSize);

    assertEquals(3, trailer.getMajorVersion());
    assertEquals(entryCount, trailer.getEntryCount());
    HFileContext meta = new HFileContextBuilder()
      .withCompression(compressAlgo)
      .withIncludesMvcc(false)
      .withIncludesTags(useTags)
      .withDataBlockEncoding(DataBlockEncoding.NONE)
      .withHBaseCheckSum(true).build();
    ReaderContext readerContext = new ReaderContextBuilder()
      .withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis))
      .withFilePath(hfilePath)
      .withFileSystem(fs)
      .withFileSize(fileSize).build();
    HFileBlock.FSReader blockReader =
        new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP);
    // Comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
        new HFileBlockIndex.CellBasedKeyBlockIndexReader(comparator,
            trailer.getNumDataIndexLevels());
    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
        new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);

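    // The load-on-open section sits between the last data/meta block and the trailer. Iterate
    // over it to read the root data index, the meta index and the file info block in order.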
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(
        trailer.getLoadOnOpenDataOffset(),
        fileSize - trailer.getTrailerSize());
    // Data index. We also read statistics about the block index written after
    // the root level.
    dataBlockIndexReader.readMultiLevelIndexRoot(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());

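    // Open a separate pread-based reader over the same file; midkey() may need it to load an
    // intermediate-level index block when the data index has more than one level.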
    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
    readerContext = new ReaderContextBuilder()
        .withFilePath(hfilePath)
        .withFileSize(fileSize)
        .withFileSystem(wrapper.getHfs())
        .withInputStreamWrapper(wrapper)
        .build();
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    hfile.initMetaAndIndex(reader);
    if (findMidKey) {
      Cell midkey = dataBlockIndexReader.midkey(reader);
      assertNotNull("Midkey should not be null", midkey);
    }

    // Meta index.
    metaBlockIndexReader.readRootIndex(
        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX)
          .getByteStream(), trailer.getMetaIndexCount());
    // File info
    HFileInfo fileInfo = new HFileInfo();
    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
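    // KEY_VALUE_VERSION > 0 in the file info means every cell is followed by its memstore
    // timestamp (MVCC read point), encoded as a variable-length long.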
    byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
    boolean includeMemstoreTS = keyValueFormatVersion != null &&
        Bytes.toInt(keyValueFormatVersion) > 0;

    // Counters for the number of key/value pairs and the number of blocks
    int entriesRead = 0;
    int blocksRead = 0;
    long memstoreTS = 0;

    // Scan blocks the way the reader would scan them
    fsdis.seek(0);
    long curBlockPos = 0;
    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
          .unpack(context, blockReader);
      assertEquals(BlockType.DATA, block.getBlockType());
      ByteBuff buf = block.getBufferWithoutHeader();
      int keyLen = -1;
      while (buf.hasRemaining()) {
        keyLen = buf.getInt();
        int valueLen = buf.getInt();

        byte[] key = new byte[keyLen];
        buf.get(key);

        byte[] value = new byte[valueLen];
        buf.get(value);
        byte[] tagValue = null;
        if (useTags) {
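          // Tags are stored after the value as a two-byte length followed by the tag bytes.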
          int tagLen = ((buf.get() & 0xff) << 8) ^ (buf.get() & 0xff);
          tagValue = new byte[tagLen];
          buf.get(tagValue);
        }

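        // The memstore timestamp, if present, follows the key, value and tags as a
        // WritableUtils vlong; decode it and advance the buffer past it.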
        if (includeMemstoreTS) {
          ByteArrayInputStream byteInput = new ByteArrayInputStream(buf.array(),
              buf.arrayOffset() + buf.position(), buf.remaining());
          DataInputStream dataInput = new DataInputStream(byteInput);

          memstoreTS = WritableUtils.readVLong(dataInput);
          buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
        }

        // A brute-force check to see that all keys and values are correct.
        KeyValue kv = keyValues.get(entriesRead);
        assertTrue(Bytes.compareTo(key, kv.getKey()) == 0);
        assertTrue(Bytes.compareTo(value, 0, value.length, kv.getValueArray(), kv.getValueOffset(),
          kv.getValueLength()) == 0);
        if (useTags) {
          assertNotNull(tagValue);
          KeyValue tkv = kv;
          assertEquals(tagValue.length, tkv.getTagsLength());
          assertTrue(Bytes.compareTo(tagValue, 0, tagValue.length, tkv.getTagsArray(),
              tkv.getTagsOffset(), tkv.getTagsLength()) == 0);
        }
        ++entriesRead;
      }
      ++blocksRead;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }
    LOG.info("Finished reading: entries=" + entriesRead + ", blocksRead="
        + blocksRead);
    assertEquals(entryCount, entriesRead);

    // Meta blocks. We can scan until the load-on-open data offset (which is
    // the root block index offset in version 2) because we are not testing
    // intermediate-level index blocks here.

    int metaCounter = 0;
    while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
      LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
          trailer.getLoadOnOpenDataOffset());
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
          .unpack(context, blockReader);
      assertEquals(BlockType.META, block.getBlockType());
      Text t = new Text();
      ByteBuff buf = block.getBufferWithoutHeader();
      if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
        throw new IOException("Failed to deserialize block " + this +
            " into a " + t.getClass().getSimpleName());
      }
      Text expectedText = metaCounter == 0 ? new Text("Paris")
          : metaCounter == 1 ? new Text("Moscow") : new Text("Washington, D.C.");
      assertEquals(expectedText, t);
      LOG.info("Read meta block data: " + t);
      ++metaCounter;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }

    fsdis.close();
    reader.close();
  }
}