001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.List;
025import java.util.Random;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FSDataInputStream;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.ArrayBackedTag;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellComparator;
033import org.apache.hadoop.hbase.CellComparatorImpl;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.Tag;
039import org.apache.hadoop.hbase.io.ByteBuffAllocator;
040import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
041import org.apache.hadoop.hbase.io.compress.Compression;
042import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
043import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
044import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
045import org.apache.hadoop.hbase.nio.ByteBuff;
046import org.apache.hadoop.hbase.testclassification.IOTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.Writables;
050import org.apache.hadoop.io.Text;
051import org.junit.Assert;
052import org.junit.Before;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.runner.RunWith;
057import org.junit.runners.Parameterized;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Testing writing a version 3 {@link HFile} for all encoded blocks
063 */
064@RunWith(Parameterized.class)
065@Category({ IOTests.class, MediumTests.class })
066public class TestHFileWriterV3WithDataEncoders {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070    HBaseClassTestRule.forClass(TestHFileWriterV3WithDataEncoders.class);
071
072  private static final Logger LOG =
073    LoggerFactory.getLogger(TestHFileWriterV3WithDataEncoders.class);
074  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
075  private static final Random RNG = new Random(9713312); // Just a fixed seed.
076
077  private Configuration conf;
078  private FileSystem fs;
079  private boolean useTags;
080  private DataBlockEncoding dataBlockEncoding;
081
082  public TestHFileWriterV3WithDataEncoders(boolean useTags, DataBlockEncoding dataBlockEncoding) {
083    this.useTags = useTags;
084    this.dataBlockEncoding = dataBlockEncoding;
085  }
086
087  @Parameterized.Parameters
088  public static Collection<Object[]> parameters() {
089    DataBlockEncoding[] dataBlockEncodings = DataBlockEncoding.values();
090    Object[][] params = new Object[dataBlockEncodings.length * 2 - 2][];
091    int i = 0;
092    for (DataBlockEncoding dataBlockEncoding : dataBlockEncodings) {
093      if (dataBlockEncoding == DataBlockEncoding.NONE) {
094        continue;
095      }
096      params[i++] = new Object[] { false, dataBlockEncoding };
097      params[i++] = new Object[] { true, dataBlockEncoding };
098    }
099    return Arrays.asList(params);
100  }
101
102  @Before
103  public void setUp() throws IOException {
104    conf = TEST_UTIL.getConfiguration();
105    fs = FileSystem.get(conf);
106  }
107
108  @Test
109  public void testHFileFormatV3() throws IOException {
110    testHFileFormatV3Internals(useTags);
111  }
112
113  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
114    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
115    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
116    final int entryCount = 10000;
117    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
118  }
119
120  @Test
121  public void testMidKeyInHFile() throws IOException {
122    testMidKeyInHFileInternals(useTags);
123  }
124
125  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
126    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile");
127    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
128    int entryCount = 50000;
129    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
130  }
131
132  private void writeDataAndReadFromHFile(Path hfilePath, Compression.Algorithm compressAlgo,
133    int entryCount, boolean findMidKey, boolean useTags) throws IOException {
134
135    HFileContext context = new HFileContextBuilder().withBlockSize(4096).withIncludesTags(useTags)
136      .withDataBlockEncoding(dataBlockEncoding).withCellComparator(CellComparatorImpl.COMPARATOR)
137      .withCompression(compressAlgo).build();
138    CacheConfig cacheConfig = new CacheConfig(conf);
139    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, hfilePath)
140      .withFileContext(context).create();
141
142    List<KeyValue> keyValues = new ArrayList<>(entryCount);
143    writeKeyValues(entryCount, useTags, writer, RNG, keyValues);
144
145    FSDataInputStream fsdis = fs.open(hfilePath);
146
147    long fileSize = fs.getFileStatus(hfilePath).getLen();
148    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);
149
150    Assert.assertEquals(3, trailer.getMajorVersion());
151    Assert.assertEquals(entryCount, trailer.getEntryCount());
152    HFileContext meta = new HFileContextBuilder().withCompression(compressAlgo)
153      .withIncludesMvcc(true).withIncludesTags(useTags).withDataBlockEncoding(dataBlockEncoding)
154      .withHBaseCheckSum(true).build();
155    ReaderContext readerContext =
156      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis))
157        .withFilePath(hfilePath).withFileSystem(fs).withFileSize(fileSize).build();
158    HFileBlock.FSReader blockReader =
159      new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP, conf);
160    // Comparator class name is stored in the trailer in version 3.
161    CellComparator comparator = trailer.createComparator();
162    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
163      new HFileBlockIndex.CellBasedKeyBlockIndexReaderV2(comparator,
164        trailer.getNumDataIndexLevels());
165    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
166      new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
167
168    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
169      fileSize - trailer.getTrailerSize());
170    // Data index. We also read statistics about the block index written after
171    // the root level.
172    dataBlockIndexReader.readMultiLevelIndexRoot(
173      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());
174
175    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
176    readerContext = new ReaderContextBuilder().withFilePath(hfilePath).withFileSize(fileSize)
177      .withFileSystem(wrapper.getHfs()).withInputStreamWrapper(wrapper).build();
178    HFileInfo hfile = new HFileInfo(readerContext, conf);
179    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
180    hfile.initMetaAndIndex(reader);
181    if (findMidKey) {
182      Cell midkey = dataBlockIndexReader.midkey(reader);
183      Assert.assertNotNull("Midkey should not be null", midkey);
184    }
185
186    // Meta index.
187    metaBlockIndexReader.readRootIndex(
188      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
189      trailer.getMetaIndexCount());
190    // File info
191    HFileInfo fileInfo = new HFileInfo();
192    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
193    byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
194    boolean includeMemstoreTS =
195      keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0;
196
197    // Counters for the number of key/value pairs and the number of blocks
198    int entriesRead = 0;
199    int blocksRead = 0;
200    long memstoreTS = 0;
201
202    DataBlockEncoder encoder = dataBlockEncoding.getEncoder();
203    long curBlockPos = scanBlocks(entryCount, context, keyValues, fsdis, trailer, meta, blockReader,
204      entriesRead, blocksRead, encoder);
205
206    // Meta blocks. We can scan until the load-on-open data offset (which is
207    // the root block index offset in version 2) because we are not testing
208    // intermediate-level index blocks here.
209
210    int metaCounter = 0;
211    while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
212      LOG.info("Current offset: {}, scanning until {}", fsdis.getPos(),
213        trailer.getLoadOnOpenDataOffset());
214      HFileBlock block =
215        blockReader.readBlockData(curBlockPos, -1, false, false, true).unpack(context, blockReader);
216      Assert.assertEquals(BlockType.META, block.getBlockType());
217      Text t = new Text();
218      ByteBuff buf = block.getBufferWithoutHeader();
219      if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
220        throw new IOException(
221          "Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
222      }
223      Text expectedText = (metaCounter == 0 ? new Text("Paris")
224        : metaCounter == 1 ? new Text("Moscow")
225        : new Text("Washington, D.C."));
226      Assert.assertEquals(expectedText, t);
227      LOG.info("Read meta block data: " + t);
228      ++metaCounter;
229      curBlockPos += block.getOnDiskSizeWithHeader();
230    }
231
232    fsdis.close();
233    reader.close();
234  }
235
236  private long scanBlocks(int entryCount, HFileContext context, List<KeyValue> keyValues,
237    FSDataInputStream fsdis, FixedFileTrailer trailer, HFileContext meta,
238    HFileBlock.FSReader blockReader, int entriesRead, int blocksRead, DataBlockEncoder encoder)
239    throws IOException {
240    // Scan blocks the way the reader would scan them
241    fsdis.seek(0);
242    long curBlockPos = 0;
243    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
244      HFileBlockDecodingContext ctx = blockReader.getBlockDecodingContext();
245      HFileBlock block =
246        blockReader.readBlockData(curBlockPos, -1, false, false, true).unpack(context, blockReader);
247      Assert.assertEquals(BlockType.ENCODED_DATA, block.getBlockType());
248      ByteBuff origBlock = block.getBufferReadOnly();
249      int pos = block.headerSize() + DataBlockEncoding.ID_SIZE;
250      origBlock.position(pos);
251      origBlock.limit(pos + block.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE);
252      ByteBuff buf = origBlock.slice();
253      DataBlockEncoder.EncodedSeeker seeker =
254        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
255      seeker.setCurrentBuffer(buf);
256      Cell res = seeker.getCell();
257      KeyValue kv = keyValues.get(entriesRead);
258      Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
259      ++entriesRead;
260      while (seeker.next()) {
261        res = seeker.getCell();
262        kv = keyValues.get(entriesRead);
263        Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
264        ++entriesRead;
265      }
266      ++blocksRead;
267      curBlockPos += block.getOnDiskSizeWithHeader();
268    }
269    LOG.info("Finished reading: entries={}, blocksRead = {}", entriesRead, blocksRead);
270    Assert.assertEquals(entryCount, entriesRead);
271    return curBlockPos;
272  }
273
274  private void writeKeyValues(int entryCount, boolean useTags, HFile.Writer writer, Random rand,
275    List<KeyValue> keyValues) throws IOException {
276
277    for (int i = 0; i < entryCount; ++i) {
278      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);
279
280      // A random-length random value.
281      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
282      KeyValue keyValue = null;
283      if (useTags) {
284        ArrayList<Tag> tags = new ArrayList<>();
285        for (int j = 0; j < 1 + rand.nextInt(4); j++) {
286          byte[] tagBytes = new byte[16];
287          rand.nextBytes(tagBytes);
288          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
289        }
290        keyValue =
291          new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes, tags);
292      } else {
293        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes);
294      }
295      writer.append(keyValue);
296      keyValues.add(keyValue);
297    }
298
299    // Add in an arbitrary order. They will be sorted lexicographically by
300    // the key.
301    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
302    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
303    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));
304
305    writer.close();
306  }
307
308}