/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

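/**
 * Tests that unpacking compressed {@link HFileBlock}s yields identical buffers across repeated
 * reads and correctly marks blocks that move off-heap as shared memory. See HBASE-27053 and
 * HBASE-27170.
 */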
@Tag(IOTests.TAG)
@Tag(MediumTests.TAG)
public class TestHFileBlockUnpack {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Repetition gives us some chance to get a good compression ratio.
  private static final float CHANCE_TO_REPEAT = 0.6f;

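  // Keep the min allocation size small so a compressed block can fall below it while its
  // uncompressed form exceeds it; see itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize.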
  private static final int MIN_ALLOCATION_SIZE = 10 * 1024;

  ByteBuffAllocator allocator;

  private FileSystem fs;

  @BeforeEach
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
    // Enable the buffer reservoir so allocations at or above the min size come from the pool.
    allocator = ByteBuffAllocator.create(conf, true);
  }

  /**
   * It's important that reading and unpacking the same HFileBlock twice results in an identical
   * buffer each time. Otherwise we end up with validation failures in the block cache, since
   * contents may not match if the same block is cached twice. See
   * https://issues.apache.org/jira/browse/HBASE-27053
   */
  @Test
  public void itUnpacksIdenticallyEachTime(TestInfo testInfo) throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), testInfo.getTestMethod().get().getName());
    int totalSize = createTestBlock(path);

    // Allocate a bunch of random buffers, so we can be sure that unpack will only have "dirty"
    // buffers to choose from when allocating itself.
    Random random = new Random();
    byte[] temp = new byte[HConstants.DEFAULT_BLOCKSIZE];
    List<ByteBuff> buffs = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      ByteBuff buff = allocator.allocate(HConstants.DEFAULT_BLOCKSIZE);
      random.nextBytes(temp);
      buff.put(temp);
      buffs.add(buff);
    }

    buffs.forEach(ByteBuff::release);

    // Read the same block twice; the underlying buffers should be identical each time.
    HFileBlockWrapper blockOne = readBlock(path, totalSize);
    HFileBlockWrapper blockTwo = readBlock(path, totalSize);

    // first check size fields
    assertEquals(blockOne.original.getOnDiskSizeWithHeader(),
      blockTwo.original.getOnDiskSizeWithHeader());
    assertEquals(blockOne.original.getUncompressedSizeWithoutHeader(),
      blockTwo.original.getUncompressedSizeWithoutHeader());

    // next check packed buffers
    assertBuffersEqual(blockOne.original.getBufferWithoutHeader(),
      blockTwo.original.getBufferWithoutHeader(),
      blockOne.original.getOnDiskDataSizeWithHeader() - blockOne.original.headerSize());

    // Now check unpacked buffers. Prior to HBASE-27053 this would fail, because the unpacked
    // buffer included extra space at the end for checksums that were never written, so that
    // space was filled with random junk when reusing pooled buffers.
    assertBuffersEqual(blockOne.unpacked.getBufferWithoutHeader(),
      blockTwo.unpacked.getBufferWithoutHeader(),
      blockOne.original.getUncompressedSizeWithoutHeader());
  }

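  /**
   * Asserts that both buffers have the expected limit and byte-for-byte identical contents.
   */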
  private void assertBuffersEqual(ByteBuff bufferOne, ByteBuff bufferTwo, int expectedSize) {
    assertEquals(expectedSize, bufferOne.limit());
    assertEquals(expectedSize, bufferTwo.limit());
    assertEquals(0,
      ByteBuff.compareTo(bufferOne, 0, bufferOne.limit(), bufferTwo, 0, bufferTwo.limit()));
  }

  /**
   * If a block's on-disk size is less than {@link ByteBuffAllocator}'s min allocation size, that
   * block will be allocated on heap regardless of whether off-heap was requested. After
   * decompressing the block, the new size may exceed the min allocation size. This test ensures
   * that such decompressed blocks, which will be allocated off-heap, are properly marked with
   * {@link HFileBlock#isSharedMem()} == true. See
   * https://issues.apache.org/jira/browse/HBASE-27170
   */
  @Test
  public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize(TestInfo testInfo)
    throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), testInfo.getTestMethod().get().getName());
    int totalSize = createTestBlock(path);
    HFileBlockWrapper blockFromHFile = readBlock(path, totalSize);

    assertFalse(blockFromHFile.original.isUnpacked(), "expected hfile block to NOT be unpacked");
    assertFalse(blockFromHFile.original.isSharedMem(),
      "expected hfile block to NOT use shared memory");

    assertTrue(blockFromHFile.original.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE,
      "expected generated block size " + blockFromHFile.original.getOnDiskSizeWithHeader()
        + " to be less than " + MIN_ALLOCATION_SIZE);
    assertTrue(blockFromHFile.original.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE,
      "expected generated block uncompressed size "
        + blockFromHFile.original.getUncompressedSizeWithoutHeader() + " to be more than "
        + MIN_ALLOCATION_SIZE);

    assertTrue(blockFromHFile.unpacked.isUnpacked(), "expected unpacked block to be unpacked");
    assertTrue(blockFromHFile.unpacked.isSharedMem(),
      "expected unpacked block to use shared memory");
  }

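  /** Pairs a block as read from disk with its unpacked (decompressed) counterpart. */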
  private static final class HFileBlockWrapper {
    private final HFileBlock original;
    private final HFileBlock unpacked;

    private HFileBlockWrapper(HFileBlock original, HFileBlock unpacked) {
      this.original = original;
      this.unpacked = unpacked;
    }
  }

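  /**
   * Reads the single block previously written to {@code path} and returns it both as read from
   * disk and after unpacking.
   */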
  private HFileBlockWrapper readBlock(Path path, int totalSize) throws IOException {
    try (FSDataInputStream is = fs.open(path)) {
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
          .withIncludesMvcc(false).withIncludesTags(false).build();
      ReaderContext context =
        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
      HFileBlock.FSReaderImpl hbr =
        new HFileBlock.FSReaderImpl(context, meta, allocator, TEST_UTIL.getConfiguration());
      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, TEST_UTIL.getConfiguration());
      hbr.setIncludesMemStoreTS(false);
      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
      blockFromHFile.sanityCheck();
      return new HFileBlockWrapper(blockFromHFile, blockFromHFile.unpack(meta, hbr));
    }
  }

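  /**
   * Writes a single GZ-compressed DATA block to {@code path} whose on-disk size (with header)
   * stays under {@link #MIN_ALLOCATION_SIZE} while its uncompressed size ends up exceeding it.
   * Returns the on-disk size with header.
   */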
  private int createTestBlock(Path path) throws IOException {
    HFileContext meta =
      new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
        .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();

    int totalSize;
    try (FSDataOutputStream os = fs.create(path)) {
      HFileBlock.Writer hbw =
        new HFileBlock.Writer(TEST_UTIL.getConfiguration(), NoOpDataBlockEncoder.INSTANCE, meta);
      hbw.startWriting(BlockType.DATA);
      writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
      hbw.writeHeaderAndData(os);
      totalSize = hbw.getOnDiskSizeWithHeader();
      assertTrue(totalSize < MIN_ALLOCATION_SIZE,
        "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE);
    }
    return totalSize;
  }

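  /**
   * Writes semi-repetitive KeyValues (see {@link #CHANCE_TO_REPEAT}) until the total serialized
   * length reaches {@code desiredSize}, then returns that length. The repetition keeps the data
   * compressible so the on-disk size stays well below the uncompressed size.
   */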
  static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
    Random random = new Random(42);

    byte[] family = new byte[] { 1 };
    int rowKey = 0;
    int qualifier = 0;
    int value = 0;
    long timestamp = 0;

    int totalSize = 0;

    // Write until the uncompressed total just reaches the limit; compression should bring the
    // on-disk size under it.
    while (totalSize < desiredSize) {
      rowKey = maybeIncrement(random, rowKey);
      qualifier = maybeIncrement(random, qualifier);
      value = maybeIncrement(random, value);
      timestamp = maybeIncrement(random, (int) timestamp);

      KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
        timestamp, Bytes.toBytes(value));
      hbw.write(keyValue);
      totalSize += keyValue.getLength();
    }

    return totalSize;
  }

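  /**
   * Returns {@code value} unchanged with probability {@link #CHANCE_TO_REPEAT}, otherwise
   * {@code value + 1}. Repeated values keep the written data compressible.
   */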
  private static int maybeIncrement(Random random, int value) {
    if (random.nextFloat() < CHANCE_TO_REPEAT) {
      return value;
    }
    return value + 1;
  }

}