/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests writing a version 3 {@link HFile} with each {@link DataBlockEncoding}, then reads the
 * file back block by block, verifying the trailer, block indexes, encoded data blocks and
 * meta blocks.
 */
@RunWith(Parameterized.class)
@Category({IOTests.class, MediumTests.class})
public class TestHFileWriterV3WithDataEncoders {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileWriterV3WithDataEncoders.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestHFileWriterV3WithDataEncoders.class);

  private static final HBaseTestingUtility TEST_UTIL =
    new HBaseTestingUtility();

  private Configuration conf;
  private FileSystem fs;
  private boolean useTags;
  private DataBlockEncoding dataBlockEncoding;

  public TestHFileWriterV3WithDataEncoders(boolean useTags,
      DataBlockEncoding dataBlockEncoding) {
    this.useTags = useTags;
    this.dataBlockEncoding = dataBlockEncoding;
  }

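  // Exercise every real encoding both with and without cell tags. NONE is skipped because it
  // produces plain, unencoded data blocks, which this test does not cover.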
  @Parameterized.Parameters
  public static Collection<Object[]> parameters() {
    DataBlockEncoding[] dataBlockEncodings = DataBlockEncoding.values();
    Object[][] params = new Object[dataBlockEncodings.length * 2 - 2][];
    int i = 0;
    for (DataBlockEncoding dataBlockEncoding : dataBlockEncodings) {
      if (dataBlockEncoding == DataBlockEncoding.NONE) {
        continue;
      }
      params[i++] = new Object[]{false, dataBlockEncoding};
      params[i++] = new Object[]{true, dataBlockEncoding};
    }
    return Arrays.asList(params);
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    fs = FileSystem.get(conf);
  }

  @Test
  public void testHFileFormatV3() throws IOException {
    testHFileFormatV3Internals(useTags);
  }

  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
    final int entryCount = 10000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
  }

  @Test
  public void testMidKeyInHFile() throws IOException {
    testMidKeyInHFileInternals(useTags);
  }

  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(),
      "testMidKeyInHFile");
    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
    int entryCount = 50000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
  }

  private void writeDataAndReadFromHFile(Path hfilePath,
      Compression.Algorithm compressAlgo, int entryCount, boolean findMidKey, boolean useTags)
      throws IOException {

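    // Write side: create an HFile v3 writer whose data blocks use the parameterized encoding
    // and compression, then fill it with random KeyValues and three meta blocks.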
    HFileContext context = new HFileContextBuilder()
      .withBlockSize(4096)
      .withIncludesTags(useTags)
      .withDataBlockEncoding(dataBlockEncoding)
      .withCellComparator(CellComparatorImpl.COMPARATOR)
      .withCompression(compressAlgo).build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig)
      .withPath(fs, hfilePath)
      .withFileContext(context)
      .create();

    Random rand = new Random(9713312); // Just a fixed seed.
    List<KeyValue> keyValues = new ArrayList<>(entryCount);

    writeKeyValues(entryCount, useTags, writer, rand, keyValues);

    FSDataInputStream fsdis = fs.open(hfilePath);

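    // The fixed trailer at the end of the file locates the load-on-open section and records
    // the major version and total entry count.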
    long fileSize = fs.getFileStatus(hfilePath).getLen();
    FixedFileTrailer trailer =
      FixedFileTrailer.readFromStream(fsdis, fileSize);

    Assert.assertEquals(3, trailer.getMajorVersion());
    Assert.assertEquals(entryCount, trailer.getEntryCount());
    HFileContext meta = new HFileContextBuilder()
      .withCompression(compressAlgo)
      .withIncludesMvcc(true)
      .withIncludesTags(useTags)
      .withDataBlockEncoding(dataBlockEncoding)
      .withHBaseCheckSum(true).build();
    ReaderContext readerContext = new ReaderContextBuilder()
      .withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis))
      .withFilePath(hfilePath)
      .withFileSystem(fs)
      .withFileSize(fileSize).build();
    HFileBlock.FSReader blockReader =
      new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP);
    // Comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
      new HFileBlockIndex.CellBasedKeyBlockIndexReader(comparator,
        trailer.getNumDataIndexLevels());
    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
      new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);

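    // The load-on-open section sits between the last meta block and the trailer; iterate its
    // blocks in order: root data index, meta index, then file info.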
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(
      trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());
    // Data index. We also read statistics about the block index written after
    // the root level.
    dataBlockIndexReader.readMultiLevelIndexRoot(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());

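    // Build a full pread reader over a fresh stream wrapper; the data block index needs it to
    // resolve the file's midkey.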
    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
    readerContext = new ReaderContextBuilder()
      .withFilePath(hfilePath)
      .withFileSize(fileSize)
      .withFileSystem(wrapper.getHfs())
      .withInputStreamWrapper(wrapper)
      .build();
    HFileInfo hfile = new HFileInfo(readerContext, conf);
    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
    hfile.initMetaAndIndex(reader);
    if (findMidKey) {
      Cell midkey = dataBlockIndexReader.midkey(reader);
      Assert.assertNotNull("Midkey should not be null", midkey);
    }

    // Meta index.
    metaBlockIndexReader.readRootIndex(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX)
        .getByteStream(), trailer.getMetaIndexCount());
    // File info
    HFileInfo fileInfo = new HFileInfo();
    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
    byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
    boolean includeMemstoreTS = keyValueFormatVersion != null &&
      Bytes.toInt(keyValueFormatVersion) > 0;

    // Counters for the number of key/value pairs and the number of blocks
    int entriesRead = 0;
    int blocksRead = 0;
    long memstoreTS = 0;

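    // Decode every data block with the parameterized encoder and check each cell against the
    // KeyValues that were written.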
    DataBlockEncoder encoder = dataBlockEncoding.getEncoder();
    long curBlockPos = scanBlocks(entryCount, context, keyValues, fsdis, trailer,
      meta, blockReader, entriesRead, blocksRead, encoder);

    // Meta blocks. We can scan until the load-on-open data offset (which is
    // the root block index offset in version 2) because we are not testing
    // intermediate-level index blocks here.

    int metaCounter = 0;
    while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
      LOG.info("Current offset: {}, scanning until {}", fsdis.getPos(),
        trailer.getLoadOnOpenDataOffset());
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
        .unpack(context, blockReader);
      Assert.assertEquals(BlockType.META, block.getBlockType());
      Text t = new Text();
      ByteBuff buf = block.getBufferWithoutHeader();
      if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
        throw new IOException("Failed to deserialize block " + block +
          " into a " + t.getClass().getSimpleName());
      }
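      // Meta blocks are sorted lexicographically by key, so CAPITAL_OF_FRANCE, CAPITAL_OF_RUSSIA
      // and CAPITAL_OF_USA come back as Paris, Moscow and Washington, D.C., in that order.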
      Text expectedText =
        (metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ? new Text("Moscow")
          : new Text("Washington, D.C."));
      Assert.assertEquals(expectedText, t);
      LOG.info("Read meta block data: " + t);
      ++metaCounter;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }

    fsdis.close();
    reader.close();
  }

  private long scanBlocks(int entryCount, HFileContext context, List<KeyValue> keyValues,
      FSDataInputStream fsdis, FixedFileTrailer trailer, HFileContext meta,
      HFileBlock.FSReader blockReader, int entriesRead, int blocksRead,
      DataBlockEncoder encoder) throws IOException {
    // Scan blocks the way the reader would scan them
    fsdis.seek(0);
    long curBlockPos = 0;
    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
      HFileBlockDecodingContext ctx = blockReader.getBlockDecodingContext();
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
        .unpack(context, blockReader);
      Assert.assertEquals(BlockType.ENCODED_DATA, block.getBlockType());
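      // The encoded payload starts after the block header and the 2-byte encoding id; slice it
      // out so the seeker only sees the encoder's output.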
      ByteBuff origBlock = block.getBufferReadOnly();
      int pos = block.headerSize() + DataBlockEncoding.ID_SIZE;
      origBlock.position(pos);
      origBlock.limit(pos + block.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE);
      ByteBuff buf = origBlock.slice();
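      // Walk the block with the encoder's seeker and compare every cell to the KeyValue that
      // was originally written at that position.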
      DataBlockEncoder.EncodedSeeker seeker =
        encoder.createSeeker(encoder.newDataBlockDecodingContext(meta));
      seeker.setCurrentBuffer(buf);
      Cell res = seeker.getCell();
      KeyValue kv = keyValues.get(entriesRead);
      Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
      ++entriesRead;
      while (seeker.next()) {
        res = seeker.getCell();
        kv = keyValues.get(entriesRead);
        Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
        ++entriesRead;
      }
      ++blocksRead;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }
    LOG.info("Finished reading: entries={}, blocksRead={}", entriesRead, blocksRead);
    Assert.assertEquals(entryCount, entriesRead);
    return curBlockPos;
  }

  private void writeKeyValues(int entryCount, boolean useTags, HFile.Writer writer,
      Random rand, List<KeyValue> keyValues) throws IOException {

    for (int i = 0; i < entryCount; ++i) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);

      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
      KeyValue keyValue = null;
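      // When tags are enabled, attach one to four random 16-byte tags to each cell.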
      if (useTags) {
        ArrayList<Tag> tags = new ArrayList<>();
        for (int j = 0; j < 1 + rand.nextInt(4); j++) {
          byte[] tagBytes = new byte[16];
          rand.nextBytes(tagBytes);
          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
        }
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
          valueBytes, tags);
      } else {
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
          valueBytes);
      }
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    // Add in an arbitrary order. They will be sorted lexicographically by
    // the key.
    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));

    writer.close();
  }

}