/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

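/**
 * Tests for {@link HFileBlock#unpack(HFileContext, HFileBlock.FSReader)}, covering identical
 * buffer contents across repeated reads (HBASE-27053) and shared-memory allocation of unpacked
 * blocks (HBASE-27170).
 */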
@Category({ IOTests.class, MediumTests.class })
public class TestHFileBlockUnpack {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileBlockUnpack.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Repeating values between adjacent cells gives us some chance of a good compression ratio.
  private static final float CHANCE_TO_REPEAT = 0.6f;

  private static final int MIN_ALLOCATION_SIZE = 10 * 1024;

  ByteBuffAllocator allocator;

  @Rule
  public TestName name = new TestName();
  private FileSystem fs;
  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    // Pin the allocator's minimum allocation size so the tests can create blocks whose on-disk
    // size falls below it while their uncompressed size exceeds it.
    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
    allocator = ByteBuffAllocator.create(conf, true);
  }

  /**
   * It's important that reading and unpacking the same HFileBlock twice results in an identical
   * buffer each time. Otherwise we end up with validation failures in the block cache, since
   * contents may not match if the same block is cached twice. See
   * https://issues.apache.org/jira/browse/HBASE-27053.
   */
  @Test
  public void itUnpacksIdenticallyEachTime() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
    int totalSize = createTestBlock(path);

    // Allocate a bunch of buffers and fill them with random bytes, so we can be sure that unpack
    // will only have "dirty" buffers to choose from when allocating for itself.
    Random random = new Random();
    byte[] temp = new byte[HConstants.DEFAULT_BLOCKSIZE];
    List<ByteBuff> buffs = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      ByteBuff buff = allocator.allocate(HConstants.DEFAULT_BLOCKSIZE);
      random.nextBytes(temp);
      buff.put(temp);
      buffs.add(buff);
    }

    // Return the dirtied buffers to the pool so subsequent allocations reuse them.
    buffs.forEach(ByteBuff::release);

    // Read the same block twice; the underlying buffers should be identical each time.
    HFileBlockWrapper blockOne = readBlock(path, totalSize);
    HFileBlockWrapper blockTwo = readBlock(path, totalSize);

    // First check the size fields.
    assertEquals(blockOne.original.getOnDiskSizeWithHeader(),
      blockTwo.original.getOnDiskSizeWithHeader());
    assertEquals(blockOne.original.getUncompressedSizeWithoutHeader(),
      blockTwo.original.getUncompressedSizeWithoutHeader());

    // Next check the packed buffers.
    assertBuffersEqual(blockOne.original.getBufferWithoutHeader(),
      blockTwo.original.getBufferWithoutHeader(),
      blockOne.original.getOnDiskDataSizeWithHeader() - blockOne.original.headerSize());

    // Now check the unpacked buffers. Prior to HBASE-27053, this would fail because the unpacked
    // buffer included extra space at the end for checksums that was never written, so that space
    // was filled with random junk when pooled buffers were reused.
    assertBuffersEqual(blockOne.unpacked.getBufferWithoutHeader(),
      blockTwo.unpacked.getBufferWithoutHeader(),
      blockOne.original.getUncompressedSizeWithoutHeader());
  }

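  /**
   * Asserts that both buffers have {@code expectedSize} as their limit and contain identical
   * bytes.
   */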
  private void assertBuffersEqual(ByteBuff bufferOne, ByteBuff bufferTwo, int expectedSize) {
    assertEquals(expectedSize, bufferOne.limit());
    assertEquals(expectedSize, bufferTwo.limit());
    assertEquals(0,
      ByteBuff.compareTo(bufferOne, 0, bufferOne.limit(), bufferTwo, 0, bufferTwo.limit()));
  }

  /**
   * If a block's on-disk size is less than {@link ByteBuffAllocator}'s min allocation size, that
   * block will be allocated on heap regardless of the preference for off-heap. After decompressing
   * the block, the new size may exceed the min allocation size. This test ensures that those
   * decompressed blocks, which will be allocated off-heap, are properly marked with
   * {@link HFileBlock#isSharedMem()} == true. See
   * https://issues.apache.org/jira/browse/HBASE-27170.
   */
  @Test
  public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
    int totalSize = createTestBlock(path);
    HFileBlockWrapper blockFromHFile = readBlock(path, totalSize);

    assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.original.isUnpacked());
    assertFalse("expected hfile block to NOT use shared memory",
      blockFromHFile.original.isSharedMem());

    assertTrue(
      "expected generated block size " + blockFromHFile.original.getOnDiskSizeWithHeader()
        + " to be less than " + MIN_ALLOCATION_SIZE,
      blockFromHFile.original.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
    assertTrue(
      "expected generated block uncompressed size "
        + blockFromHFile.original.getUncompressedSizeWithoutHeader() + " to be more than "
        + MIN_ALLOCATION_SIZE,
      blockFromHFile.original.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);

    assertTrue("expected unpacked block to be unpacked", blockFromHFile.unpacked.isUnpacked());
    assertTrue("expected unpacked block to use shared memory",
      blockFromHFile.unpacked.isSharedMem());
  }

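  /**
   * Pairs a block as read from disk ({@code original}) with its unpacked form ({@code unpacked})
   * so tests can compare the two.
   */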
  private static final class HFileBlockWrapper {
    private final HFileBlock original;
    private final HFileBlock unpacked;

    private HFileBlockWrapper(HFileBlock original, HFileBlock unpacked) {
      this.original = original;
      this.unpacked = unpacked;
    }
  }

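  /**
   * Reads the single block at offset 0 of {@code path} using a reader configured for GZ
   * compression and HBase checksums, and returns both the block as read and its unpacked form.
   */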
  private HFileBlockWrapper readBlock(Path path, int totalSize) throws IOException {
    try (FSDataInputStream is = fs.open(path)) {
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
          .withIncludesMvcc(false).withIncludesTags(false).build();
      ReaderContext context =
        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
      HFileBlock.FSReaderImpl hbr =
        new HFileBlock.FSReaderImpl(context, meta, allocator, TEST_UTIL.getConfiguration());
      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, TEST_UTIL.getConfiguration());
      hbr.setIncludesMemStoreTS(false);
      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
      blockFromHFile.sanityCheck();
      return new HFileBlockWrapper(blockFromHFile, blockFromHFile.unpack(meta, hbr));
    }
  }

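  /**
   * Writes a single GZ-compressed DATA block to {@code path} whose on-disk size (including header)
   * is below {@link #MIN_ALLOCATION_SIZE}, and returns that on-disk size.
   */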
  private int createTestBlock(Path path) throws IOException {
    HFileContext meta =
      new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
        .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();

    int totalSize;
    try (FSDataOutputStream os = fs.create(path)) {
      HFileBlock.Writer hbw =
        new HFileBlock.Writer(TEST_UTIL.getConfiguration(), NoOpDataBlockEncoder.INSTANCE, meta);
      hbw.startWriting(BlockType.DATA);
      writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
      hbw.writeHeaderAndData(os);
      totalSize = hbw.getOnDiskSizeWithHeader();
      assertTrue(
        "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
        totalSize < MIN_ALLOCATION_SIZE);
    }
    return totalSize;
  }

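  /**
   * Writes semi-repetitive KeyValues to {@code hbw} until their total uncompressed length reaches
   * {@code desiredSize}. Values repeat with probability {@link #CHANCE_TO_REPEAT} so the block
   * stays compressible. Returns the total uncompressed bytes written.
   */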
  static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
    Random random = new Random(42);

    byte[] family = new byte[] { 1 };
    int rowKey = 0;
    int qualifier = 0;
    int value = 0;
    long timestamp = 0;

    int totalSize = 0;

    // Write until the uncompressed total reaches the desired size; compression should bring the
    // on-disk size back under the limit.
    while (totalSize < desiredSize) {
      rowKey = maybeIncrement(random, rowKey);
      qualifier = maybeIncrement(random, qualifier);
      value = maybeIncrement(random, value);
      timestamp = maybeIncrement(random, (int) timestamp);

      KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
        timestamp, Bytes.toBytes(value));
      hbw.write(keyValue);
      totalSize += keyValue.getLength();
    }

    return totalSize;
  }

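  /**
   * Returns {@code value} unchanged with probability {@link #CHANCE_TO_REPEAT}, otherwise
   * {@code value + 1}. Repeats keep adjacent cells similar and therefore compressible.
   */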
  private static int maybeIncrement(Random random, int value) {
    if (random.nextFloat() < CHANCE_TO_REPEAT) {
      return value;
    }
    return value + 1;
  }

}