001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Random;
030import java.util.stream.Stream;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FSDataInputStream;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.ArrayBackedTag;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellComparator;
038import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
039import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.Tag;
044import org.apache.hadoop.hbase.io.ByteBuffAllocator;
045import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
046import org.apache.hadoop.hbase.io.compress.Compression;
047import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
048import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
049import org.apache.hadoop.hbase.nio.ByteBuff;
050import org.apache.hadoop.hbase.testclassification.IOTests;
051import org.apache.hadoop.hbase.testclassification.SmallTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.Writables;
054import org.apache.hadoop.io.Text;
055import org.apache.hadoop.io.WritableUtils;
056import org.junit.jupiter.api.BeforeEach;
057import org.junit.jupiter.api.TestTemplate;
058import org.junit.jupiter.params.provider.Arguments;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062/**
063 * Testing writing a version 3 {@link HFile}.
064 */
065@org.junit.jupiter.api.Tag(IOTests.TAG)
066@org.junit.jupiter.api.Tag(SmallTests.TAG)
067@HBaseParameterizedTestTemplate(name = "{index}: useTags={0}")
068public class TestHFileWriterV3 {
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestHFileWriterV3.class);
071  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
072  private static final Random RNG = new Random(9713312); // Just a fixed seed.
073
074  private Configuration conf;
075  private FileSystem fs;
076  private boolean useTags;
077
078  public TestHFileWriterV3(boolean useTags) {
079    this.useTags = useTags;
080  }
081
082  public static Stream<Arguments> parameters() {
083    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED.stream().map(arr -> Arguments.of(arr));
084  }
085
086  @BeforeEach
087  public void setUp() throws IOException {
088    conf = TEST_UTIL.getConfiguration();
089    fs = FileSystem.get(conf);
090  }
091
092  @TestTemplate
093  public void testHFileFormatV3() throws IOException {
094    testHFileFormatV3Internals(useTags);
095  }
096
097  private void testHFileFormatV3Internals(boolean useTags) throws IOException {
098    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
099    final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ;
100    final int entryCount = 10000;
101    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags);
102  }
103
104  @TestTemplate
105  public void testMidKeyInHFile() throws IOException {
106    testMidKeyInHFileInternals(useTags);
107  }
108
109  private void testMidKeyInHFileInternals(boolean useTags) throws IOException {
110    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile");
111    Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
112    int entryCount = 50000;
113    writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags);
114  }
115
116  private void writeDataAndReadFromHFile(Path hfilePath, Algorithm compressAlgo, int entryCount,
117    boolean findMidKey, boolean useTags) throws IOException {
118    HFileContext context = new HFileContextBuilder().withBlockSize(4096).withIncludesTags(useTags)
119      .withDataBlockEncoding(DataBlockEncoding.NONE).withCompression(compressAlgo).build();
120    CacheConfig cacheConfig = new CacheConfig(conf);
121    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, hfilePath)
122      .withFileContext(context).create();
123
124    List<KeyValue> keyValues = new ArrayList<>(entryCount);
125    for (int i = 0; i < entryCount; ++i) {
126      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(RNG, i);
127      // A random-length random value.
128      byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG);
129      KeyValue keyValue = null;
130      if (useTags) {
131        ArrayList<Tag> tags = new ArrayList<>();
132        for (int j = 0; j < 1 + RNG.nextInt(4); j++) {
133          byte[] tagBytes = new byte[16];
134          RNG.nextBytes(tagBytes);
135          tags.add(new ArrayBackedTag((byte) 1, tagBytes));
136        }
137        keyValue =
138          new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes, tags);
139      } else {
140        keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes);
141      }
142      writer.append(keyValue);
143      keyValues.add(keyValue);
144    }
145
146    // Add in an arbitrary order. They will be sorted lexicographically by
147    // the key.
148    writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C."));
149    writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow"));
150    writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris"));
151
152    writer.close();
153
154    FSDataInputStream fsdis = fs.open(hfilePath);
155
156    long fileSize = fs.getFileStatus(hfilePath).getLen();
157    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);
158
159    assertEquals(3, trailer.getMajorVersion());
160    assertEquals(entryCount, trailer.getEntryCount());
161    HFileContext meta = new HFileContextBuilder().withCompression(compressAlgo)
162      .withIncludesMvcc(false).withIncludesTags(useTags)
163      .withDataBlockEncoding(DataBlockEncoding.NONE).withHBaseCheckSum(true).build();
164    ReaderContext readerContext =
165      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis))
166        .withFilePath(hfilePath).withFileSystem(fs).withFileSize(fileSize).build();
167    HFileBlock.FSReader blockReader =
168      new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP, conf);
169    // Comparator class name is stored in the trailer in version 3.
170    CellComparator comparator = trailer.createComparator();
171    HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
172      new HFileBlockIndex.CellBasedKeyBlockIndexReaderV2(comparator,
173        trailer.getNumDataIndexLevels());
174    HFileBlockIndex.BlockIndexReader metaBlockIndexReader =
175      new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
176
177    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
178      fileSize - trailer.getTrailerSize());
179    // Data index. We also read statistics about the block index written after
180    // the root level.
181    dataBlockIndexReader.readMultiLevelIndexRoot(
182      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());
183
184    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath);
185    readerContext = new ReaderContextBuilder().withFilePath(hfilePath).withFileSize(fileSize)
186      .withFileSystem(wrapper.getHfs()).withInputStreamWrapper(wrapper).build();
187    HFileInfo hfile = new HFileInfo(readerContext, conf);
188    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf);
189    hfile.initMetaAndIndex(reader);
190    if (findMidKey) {
191      Cell midkey = dataBlockIndexReader.midkey(reader);
192      assertNotNull(midkey, "Midkey should not be null");
193    }
194
195    // Meta index.
196    metaBlockIndexReader.readRootIndex(
197      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(),
198      trailer.getMetaIndexCount());
199    // File info
200    HFileInfo fileInfo = new HFileInfo();
201    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
202    byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
203    boolean includeMemstoreTS =
204      keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0;
205
206    // Counters for the number of key/value pairs and the number of blocks
207    int entriesRead = 0;
208    int blocksRead = 0;
209    long memstoreTS = 0;
210
211    // Scan blocks the way the reader would scan them
212    fsdis.seek(0);
213    long curBlockPos = 0;
214    while (curBlockPos <= trailer.getLastDataBlockOffset()) {
215      HFileBlock block =
216        blockReader.readBlockData(curBlockPos, -1, false, false, true).unpack(context, blockReader);
217      assertEquals(BlockType.DATA, block.getBlockType());
218      ByteBuff buf = block.getBufferWithoutHeader();
219      int keyLen = -1;
220      while (buf.hasRemaining()) {
221
222        keyLen = buf.getInt();
223
224        int valueLen = buf.getInt();
225
226        byte[] key = new byte[keyLen];
227        buf.get(key);
228
229        byte[] value = new byte[valueLen];
230        buf.get(value);
231        byte[] tagValue = null;
232        if (useTags) {
233          int tagLen = ((buf.get() & 0xff) << 8) ^ (buf.get() & 0xff);
234          tagValue = new byte[tagLen];
235          buf.get(tagValue);
236        }
237
238        if (includeMemstoreTS) {
239          ByteArrayInputStream byte_input = new ByteArrayInputStream(buf.array(),
240            buf.arrayOffset() + buf.position(), buf.remaining());
241          DataInputStream data_input = new DataInputStream(byte_input);
242
243          memstoreTS = WritableUtils.readVLong(data_input);
244          buf.position(buf.position() + WritableUtils.getVIntSize(memstoreTS));
245        }
246
247        // A brute-force check to see that all keys and values are correct.
248        KeyValue kv = keyValues.get(entriesRead);
249        assertTrue(Bytes.compareTo(key, kv.getKey()) == 0);
250        assertTrue(Bytes.compareTo(value, 0, value.length, kv.getValueArray(), kv.getValueOffset(),
251          kv.getValueLength()) == 0);
252        if (useTags) {
253          assertNotNull(tagValue);
254          KeyValue tkv = kv;
255          assertEquals(tagValue.length, tkv.getTagsLength());
256          assertTrue(Bytes.compareTo(tagValue, 0, tagValue.length, tkv.getTagsArray(),
257            tkv.getTagsOffset(), tkv.getTagsLength()) == 0);
258        }
259        ++entriesRead;
260      }
261      ++blocksRead;
262      curBlockPos += block.getOnDiskSizeWithHeader();
263    }
264    LOG.info("Finished reading: entries=" + entriesRead + ", blocksRead=" + blocksRead);
265    assertEquals(entryCount, entriesRead);
266
267    // Meta blocks. We can scan until the load-on-open data offset (which is
268    // the root block index offset in version 2) because we are not testing
269    // intermediate-level index blocks here.
270
271    int metaCounter = 0;
272    while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
273      LOG.info("Current offset: " + fsdis.getPos() + ", scanning until "
274        + trailer.getLoadOnOpenDataOffset());
275      HFileBlock block =
276        blockReader.readBlockData(curBlockPos, -1, false, false, true).unpack(context, blockReader);
277      assertEquals(BlockType.META, block.getBlockType());
278      Text t = new Text();
279      ByteBuff buf = block.getBufferWithoutHeader();
280      if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
281        throw new IOException(
282          "Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
283      }
284      Text expectedText = (metaCounter == 0 ? new Text("Paris")
285        : metaCounter == 1 ? new Text("Moscow")
286        : new Text("Washington, D.C."));
287      assertEquals(expectedText, t);
288      LOG.info("Read meta block data: " + t);
289      ++metaCounter;
290      curBlockPos += block.getOnDiskSizeWithHeader();
291    }
292
293    fsdis.close();
294    reader.close();
295  }
296}