/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testing writing a version 3 {@link HFile} for all encoded data blocks:
 * every {@link DataBlockEncoding} except NONE, each with and without cell tags.
 */
@RunWith(Parameterized.class)
@Category({IOTests.class, SmallTests.class})
public class TestHFileWriterV3WithDataEncoders {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileWriterV3WithDataEncoders.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestHFileWriterV3WithDataEncoders.class);

  private static final HBaseTestingUtility TEST_UTIL =
    new HBaseTestingUtility();

  private Configuration conf;
  private FileSystem fs;
  private final boolean useTags;
  private final DataBlockEncoding dataBlockEncoding;

  public TestHFileWriterV3WithDataEncoders(boolean useTags,
      DataBlockEncoding dataBlockEncoding) {
    this.useTags = useTags;
    this.dataBlockEncoding = dataBlockEncoding;
  }

  @Parameterized.Parameters
  public static Collection<Object[]> parameters() {
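    // One parameter pair per real encoding: every DataBlockEncoding except
    // NONE, each with tags disabled and enabled. NONE is skipped because this
    // test asserts that data blocks come back as ENCODED_DATA blocks.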
    DataBlockEncoding[] dataBlockEncodings = DataBlockEncoding.values();
    Object[][] params = new Object[dataBlockEncodings.length * 2 - 2][];
    int i = 0;
    for (DataBlockEncoding dataBlockEncoding : dataBlockEncodings) {
      if (dataBlockEncoding == DataBlockEncoding.NONE) {
        continue;
      }
      params[i++] = new Object[]{false, dataBlockEncoding};
      params[i++] = new Object[]{true, dataBlockEncoding};
    }
    return Arrays.asList(params);
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    fs = FileSystem.get(conf);
  }

  @Test
  public void testHFileFormatV3() throws IOException {
    testHFileFormatV3Internals(useTags);
  }

  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
    final int entryCount = 10000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
  }

  @Test
  public void testMidKeyInHFile() throws IOException {
    testMidKeyInHFileInternals(useTags);
  }

  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile");
    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
    int entryCount = 50000;
    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
  }

  private void writeDataAndReadFromHFile(Path hfilePath,
      Compression.Algorithm compressAlgo, int entryCount, boolean findMidKey, boolean useTags)
      throws IOException {
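    // A small 4 KB block size forces many data blocks, so the block index and
    // the encoded seekers are exercised across plenty of block boundaries.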
    HFileContext context = new HFileContextBuilder()
      .withBlockSize(4096)
      .withIncludesTags(useTags)
      .withDataBlockEncoding(dataBlockEncoding)
      .withCompression(compressAlgo)
      .build();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig)
      .withPath(fs, hfilePath)
      .withFileContext(context)
      .create();

    Random rand = new Random(9713312); // Just a fixed seed, for reproducibility.
    List<KeyValue> keyValues = new ArrayList<>(entryCount);

    writeKeyValues(entryCount, useTags, writer, rand, keyValues);

    FSDataInputStream fsdis = fs.open(hfilePath);

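    // The fixed trailer at the end of the file records where the load-on-open
    // section starts, the index depth and entry counts, and the comparator
    // class name.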
    long fileSize = fs.getFileStatus(hfilePath).getLen();
    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);

    Assert.assertEquals(3, trailer.getMajorVersion());
    Assert.assertEquals(entryCount, trailer.getEntryCount());
    HFileContext meta = new HFileContextBuilder()
      .withCompression(compressAlgo)
      .withIncludesMvcc(true)
      .withIncludesTags(useTags)
      .withDataBlockEncoding(dataBlockEncoding)
      .withHBaseCheckSum(true)
      .build();
    HFileBlock.FSReader blockReader =
      new HFileBlock.FSReaderImpl(fsdis, fileSize, meta);
    // Comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
      new HFileBlockIndex.CellBasedKeyBlockIndexReader(comparator,
        trailer.getNumDataIndexLevels());
    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
      new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);

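    // The load-on-open section is laid out as: root data index, meta index,
    // file info, then bloom filter metadata; read it in that order.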
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(
      trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());
    // Data index. We also read statistics about the block index written after
    // the root level.
    dataBlockIndexReader.readMultiLevelIndexRoot(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());

    if (findMidKey) {
      Cell midkey = dataBlockIndexReader.midkey();
      Assert.assertNotNull("Midkey should not be null", midkey);
    }

    // Meta index. Unlike the data index, it is always a single-level
    // (root-only) index.
    metaBlockIndexReader.readRootIndex(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
      trailer.getMetaIndexCount());
    // File info.
    HFile.FileInfo fileInfo = new HFile.FileInfo();
    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
    byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
    boolean includeMemstoreTS = keyValueFormatVersion != null &&
      Bytes.toInt(keyValueFormatVersion) > 0;
    // The writer includes MVCC timestamps by default, so the file info must
    // record a key/value format version that carries them.
    Assert.assertTrue("Memstore timestamps should be included", includeMemstoreTS);

    DataBlockEncoder encoder = dataBlockEncoding.getEncoder();
    long curBlockPos = scanBlocks(entryCount, context, keyValues, fsdis, trailer,
      meta, blockReader, encoder);

    // Meta blocks. We can scan until the load-on-open data offset (which is
    // the root block index offset in version 2) because we are not testing
    // intermediate-level index blocks here.
    int metaCounter = 0;
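    // appendMetaBlock() stores meta blocks sorted by block name, so the
    // capitals are read back in the order Paris, Moscow, Washington, D.C.
    // (CAPITAL_OF_FRANCE < CAPITAL_OF_RUSSIA < CAPITAL_OF_USA).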
    while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
      LOG.info("Current offset: {}, scanning until {}", fsdis.getPos(),
        trailer.getLoadOnOpenDataOffset());
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
        .unpack(context, blockReader);
      Assert.assertEquals(BlockType.META, block.getBlockType());
      Text t = new Text();
      ByteBuff buf = block.getBufferWithoutHeader();
      if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
        throw new IOException("Failed to deserialize block " + block +
          " into a " + t.getClass().getSimpleName());
      }
      Text expectedText = metaCounter == 0 ? new Text("Paris")
        : metaCounter == 1 ? new Text("Moscow")
        : new Text("Washington, D.C.");
      Assert.assertEquals(expectedText, t);
      LOG.info("Read meta block data: {}", t);
      ++metaCounter;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }

    fsdis.close();
  }

  private long scanBlocks(int entryCount, HFileContext context, List<KeyValue> keyValues,
      FSDataInputStream fsdis, FixedFileTrailer trailer, HFileContext meta,
      HFileBlock.FSReader blockReader, DataBlockEncoder encoder) throws IOException {
    // Scan blocks the way the reader would scan them.
    fsdis.seek(0);
    long curBlockPos = 0;
    int entriesRead = 0;
    int blocksRead = 0;
    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
        .unpack(context, blockReader);
      Assert.assertEquals(BlockType.ENCODED_DATA, block.getBlockType());
      ByteBuff origBlock = block.getBufferReadOnly();
      // Skip the block header and the 2-byte encoding id so the seeker sees
      // only the encoded key/value payload.
      int pos = block.headerSize() + DataBlockEncoding.ID_SIZE;
      origBlock.position(pos);
      origBlock.limit(pos + block.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE);
      ByteBuff buf = origBlock.slice();
      DataBlockEncoder.EncodedSeeker seeker =
        encoder.createSeeker(CellComparatorImpl.COMPARATOR,
          encoder.newDataBlockDecodingContext(meta));
      seeker.setCurrentBuffer(buf);
      // Every cell decoded from the block must match what was written, in order.
      Cell res = seeker.getCell();
      KeyValue kv = keyValues.get(entriesRead);
      Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
      ++entriesRead;
      while (seeker.next()) {
        res = seeker.getCell();
        kv = keyValues.get(entriesRead);
        Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
        ++entriesRead;
      }
      ++blocksRead;
      curBlockPos += block.getOnDiskSizeWithHeader();
    }
    LOG.info("Finished reading: entriesRead={}, blocksRead={}", entriesRead, blocksRead);
    Assert.assertEquals(entryCount, entriesRead);
    return curBlockPos;
  }

  private void writeKeyValues(int entryCount, boolean useTags, HFile.Writer writer,
      Random rand, List<KeyValue> keyValues) throws IOException {

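    // The writer requires keys in ascending order; randomOrderedKey(rand, i)
    // encodes i in the key prefix so successive keys sort after one another.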
    for (int i = 0; i < entryCount; ++i) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);

      // A random-length random value.
      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
      KeyValue keyValue;
      if (useTags) {
        ArrayList<Tag> tags = new ArrayList<>();
        for (int j = 0; j < 1 + rand.nextInt(4); j++) {
          byte[] tagBytes = new byte[16];
          rand.nextBytes(tagBytes);
          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
        }
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
          valueBytes, tags);
      } else {
        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP,
          valueBytes);
      }
      writer.append(keyValue);
      keyValues.add(keyValue);
    }

    // Add in an arbitrary order. They will be sorted lexicographically by
    // block name.
    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));

    writer.close();
  }

}