001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Random;
027import java.util.stream.Stream;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FSDataInputStream;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.ArrayBackedTag;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparator;
035import org.apache.hadoop.hbase.CellComparatorImpl;
036import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.Tag;
041import org.apache.hadoop.hbase.io.ByteBuffAllocator;
042import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
043import org.apache.hadoop.hbase.io.compress.Compression;
044import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
045import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
046import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
047import org.apache.hadoop.hbase.nio.ByteBuff;
048import org.apache.hadoop.hbase.testclassification.IOTests;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.Writables;
052import org.apache.hadoop.io.Text;
053import org.junit.jupiter.api.BeforeEach;
054import org.junit.jupiter.api.TestTemplate;
055import org.junit.jupiter.params.provider.Arguments;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059/**
060 * Testing writing a version 3 {@link HFile} for all encoded blocks
061 */
062@org.junit.jupiter.api.Tag(IOTests.TAG)
063@org.junit.jupiter.api.Tag(MediumTests.TAG)
064@HBaseParameterizedTestTemplate(name = "{index}: useTags={0}, dataBlockEncoding={1}")
065public class TestHFileWriterV3WithDataEncoders {
066
067  private static final Logger LOG =
068    LoggerFactory.getLogger(TestHFileWriterV3WithDataEncoders.class);
069  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
070  private static final Random RNG = new Random(9713312); // Just a fixed seed.
071
072  private Configuration conf;
073  private FileSystem fs;
074  private boolean useTags;
075  private DataBlockEncoding dataBlockEncoding;
076
077  public TestHFileWriterV3WithDataEncoders(boolean useTags, DataBlockEncoding dataBlockEncoding) {
078    this.useTags = useTags;
079    this.dataBlockEncoding = dataBlockEncoding;
080  }
081
082  public static Stream<Arguments> parameters() {
083    DataBlockEncoding[] dataBlockEncodings = DataBlockEncoding.values();
084    Stream.Builder<Arguments> builder = Stream.builder();
085    for (DataBlockEncoding dataBlockEncoding : dataBlockEncodings) {
086      if (dataBlockEncoding == DataBlockEncoding.NONE) {
087        continue;
088      }
089      builder.add(Arguments.of(false, dataBlockEncoding));
090      builder.add(Arguments.of(true, dataBlockEncoding));
091    }
092    return builder.build();
093  }
094
095  @BeforeEach
096  public void setUp() throws IOException {
097    conf = TEST_UTIL.getConfiguration();
098    fs = FileSystem.get(conf);
099  }
100
101  @TestTemplate
102  public void testHFileFormatV3() throws IOException {
103    testHFileFormatV3Internals(useTags);
104  }
105
106  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
107    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
108    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
109    final int entryCount = 10000;
110    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
111  }
112
113  @TestTemplate
114  public void testMidKeyInHFile() throws IOException {
115    testMidKeyInHFileInternals(useTags);
116  }
117
118  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
119    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile");
120    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
121    int entryCount = 50000;
122    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
123  }
124
125  private void writeDataAndReadFromHFile(Path hfilePath, Compression.Algorithm compressAlgo,
126    int entryCount, boolean findMidKey, boolean useTags) throws IOException {
127
128    HFileContext context = new HFileContextBuilder().withBlockSize(4096).withIncludesTags(useTags)
129      .withDataBlockEncoding(dataBlockEncoding).withCellComparator(CellComparatorImpl.COMPARATOR)
130      .withCompression(compressAlgo).build();
131    CacheConfig cacheConfig = new CacheConfig(conf);
132    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, hfilePath)
133      .withFileContext(context).create();
134
135    List<KeyValue> keyValues = new ArrayList<>(entryCount);
136    writeKeyValues(entryCount, useTags, writer, RNG, keyValues);
137
138    FSDataInputStream fsdis = fs.open(hfilePath);
139
140    long fileSize = fs.getFileStatus(hfilePath).getLen();
141    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);
142
143    assertEquals(3, trailer.getMajorVersion());
144    assertEquals(entryCount, trailer.getEntryCount());
145    HFileContext meta = new HFileContextBuilder().withCompression(compressAlgo)
146      .withIncludesMvcc(true).withIncludesTags(useTags).withDataBlockEncoding(dataBlockEncoding)
147      .withHBaseCheckSum(true).build();
148    ReaderContext readerContext =
149      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis))
150        .withFilePath(hfilePath).withFileSystem(fs).withFileSize(fileSize).build();
151    HFileBlock.FSReader blockReader =
152      new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP, conf);
153    // Comparator class name is stored in the trailer in version 3.
154    CellComparator comparator = trailer.createComparator();
155    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
156      new HFileBlockIndex.CellBasedKeyBlockIndexReaderV2(comparator,
157        trailer.getNumDataIndexLevels());
158    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
159      new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
160
161    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
162      fileSize - trailer.getTrailerSize());
163    // Data index. We also read statistics about the block index written after
164    // the root level.
165    dataBlockIndexReader.readMultiLevelIndexRoot(
166      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());
167
168    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
169    readerContext = new ReaderContextBuilder().withFilePath(hfilePath).withFileSize(fileSize)
170      .withFileSystem(wrapper.getHfs()).withInputStreamWrapper(wrapper).build();
171    HFileInfo hfile = new HFileInfo(readerContext, conf);
172    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
173    hfile.initMetaAndIndex(reader);
174    if (findMidKey) {
175      Cell midkey = dataBlockIndexReader.midkey(reader);
176      assertNotNull(midkey, "Midkey should not be null");
177    }
178
179    // Meta index.
180    metaBlockIndexReader.readRootIndex(
181      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
182      trailer.getMetaIndexCount());
183    // File info
184    HFileInfo fileInfo = new HFileInfo();
185    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
186    byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
187    boolean includeMemstoreTS =
188      keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0;
189
190    // Counters for the number of key/value pairs and the number of blocks
191    int entriesRead = 0;
192    int blocksRead = 0;
193    long memstoreTS = 0;
194
195    DataBlockEncoder encoder = dataBlockEncoding.getEncoder();
196    long curBlockPos = scanBlocks(entryCount, context, keyValues, fsdis, trailer, meta, blockReader,
197      entriesRead, blocksRead, encoder);
198
199    // Meta blocks. We can scan until the load-on-open data offset (which is
200    // the root block index offset in version 2) because we are not testing
201    // intermediate-level index blocks here.
202
203    int metaCounter = 0;
204    while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
205      LOG.info("Current offset: {}, scanning until {}", fsdis.getPos(),
206        trailer.getLoadOnOpenDataOffset());
207      HFileBlock block =
208        blockReader.readBlockData(curBlockPos, -1, false, false, true).unpack(context, blockReader);
209      assertEquals(BlockType.META, block.getBlockType());
210      Text t = new Text();
211      ByteBuff buf = block.getBufferWithoutHeader();
212      if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
213        throw new IOException(
214          "Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
215      }
216      Text expectedText = (metaCounter == 0 ? new Text("Paris")
217        : metaCounter == 1 ? new Text("Moscow")
218        : new Text("Washington, D.C."));
219      assertEquals(expectedText, t);
220      LOG.info("Read meta block data: " + t);
221      ++metaCounter;
222      curBlockPos += block.getOnDiskSizeWithHeader();
223    }
224
225    fsdis.close();
226    reader.close();
227  }
228
229  private long scanBlocks(int entryCount, HFileContext context, List<KeyValue> keyValues,
230    FSDataInputStream fsdis, FixedFileTrailer trailer, HFileContext meta,
231    HFileBlock.FSReader blockReader, int entriesRead, int blocksRead, DataBlockEncoder encoder)
232    throws IOException {
233    // Scan blocks the way the reader would scan them
234    fsdis.seek(0);
235    long curBlockPos = 0;
236    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
237      HFileBlockDecodingContext ctx = blockReader.getBlockDecodingContext();
238      HFileBlock block =
239        blockReader.readBlockData(curBlockPos, -1, false, false, true).unpack(context, blockReader);
240      assertEquals(BlockType.ENCODED_DATA, block.getBlockType());
241      ByteBuff origBlock = block.getBufferReadOnly();
242      int pos = block.headerSize() + DataBlockEncoding.ID_SIZE;
243      origBlock.position(pos);
244      origBlock.limit(pos + block.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE);
245      ByteBuff buf = origBlock.slice();
246      DataBlockEncoder.EncodedSeeker seeker =
247        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
248      seeker.setCurrentBuffer(buf);
249      Cell res = seeker.getCell();
250      KeyValue kv = keyValues.get(entriesRead);
251      assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
252      ++entriesRead;
253      while (seeker.next()) {
254        res = seeker.getCell();
255        kv = keyValues.get(entriesRead);
256        assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv));
257        ++entriesRead;
258      }
259      ++blocksRead;
260      curBlockPos += block.getOnDiskSizeWithHeader();
261    }
262    LOG.info("Finished reading: entries={}, blocksRead = {}", entriesRead, blocksRead);
263    assertEquals(entryCount, entriesRead);
264    return curBlockPos;
265  }
266
267  private void writeKeyValues(int entryCount, boolean useTags, HFile.Writer writer, Random rand,
268    List<KeyValue> keyValues) throws IOException {
269
270    for (int i = 0; i < entryCount; ++i) {
271      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i);
272
273      // A random-length random value.
274      byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
275      KeyValue keyValue = null;
276      if (useTags) {
277        ArrayList<Tag> tags = new ArrayList<>();
278        for (int j = 0; j < 1 + rand.nextInt(4); j++) {
279          byte[] tagBytes = new byte[16];
280          rand.nextBytes(tagBytes);
281          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
282        }
283        keyValue =
284          new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes, tags);
285      } else {
286        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes);
287      }
288      writer.append(keyValue);
289      keyValues.add(keyValue);
290    }
291
292    // Add in an arbitrary order. They will be sorted lexicographically by
293    // the key.
294    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
295    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
296    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));
297
298    writer.close();
299  }
300
301}