/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

@Tag(IOTests.TAG)
@Tag(MediumTests.TAG)
public class TestHFileBlockUnpack {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Repetition gives us some chance to get a good compression ratio.
  private static final float CHANCE_TO_REPEAT = 0.6f;

  private static final int MIN_ALLOCATION_SIZE = 10 * 1024;

  ByteBuffAllocator allocator;

  private FileSystem fs;

  @BeforeEach
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    // Lower the minimum allocation size so that the single test block written below falls
    // under it on disk while its uncompressed form exceeds it.
    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
    allocator = ByteBuffAllocator.create(conf, true);
  }

  /**
   * It's important that reading and unpacking the same HFileBlock twice results in an identical
   * buffer each time. Otherwise we end up with validation failures in the block cache, since
   * contents may not match if the same block is cached twice. See
   * https://issues.apache.org/jira/browse/HBASE-27053
   */
  @Test
  public void itUnpacksIdenticallyEachTime(TestInfo testInfo) throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), testInfo.getTestMethod().get().getName());
    int totalSize = createTestBlock(path);

    // Allocate a bunch of random buffers, so we can be sure that unpack will only have "dirty"
    // buffers to choose from when allocating itself.
    Random random = new Random();
    byte[] temp = new byte[HConstants.DEFAULT_BLOCKSIZE];
    List<ByteBuff> buffs = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      ByteBuff buff = allocator.allocate(HConstants.DEFAULT_BLOCKSIZE);
      random.nextBytes(temp);
      buff.put(temp);
      buffs.add(buff);
    }

    // Release the dirtied buffers back to the pool so unpack can reuse them.
    buffs.forEach(ByteBuff::release);

    // Read the same block twice. We expect the underlying buffers to be identical each time.
    HFileBlockWrapper blockOne = readBlock(path, totalSize);
    HFileBlockWrapper blockTwo = readBlock(path, totalSize);

    // First, check the size fields.
    assertEquals(blockOne.original.getOnDiskSizeWithHeader(),
      blockTwo.original.getOnDiskSizeWithHeader());
    assertEquals(blockOne.original.getUncompressedSizeWithoutHeader(),
      blockTwo.original.getUncompressedSizeWithoutHeader());

    // Next, check the packed buffers.
    assertBuffersEqual(blockOne.original.getBufferWithoutHeader(),
      blockTwo.original.getBufferWithoutHeader(),
      blockOne.original.getOnDiskDataSizeWithHeader() - blockOne.original.headerSize());

    // Now check the unpacked buffers. Prior to HBASE-27053, this would fail because the unpacked
    // buffer included extra space at the end for checksums that was never written, so that
    // checksum space was filled with random junk when pooled buffers were reused.
    assertBuffersEqual(blockOne.unpacked.getBufferWithoutHeader(),
      blockTwo.unpacked.getBufferWithoutHeader(),
      blockOne.original.getUncompressedSizeWithoutHeader());
  }

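  /**
   * Asserts that both buffers have a limit of exactly {@code expectedSize} and byte-for-byte
   * identical contents over that range.
   */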
  private void assertBuffersEqual(ByteBuff bufferOne, ByteBuff bufferTwo, int expectedSize) {
    assertEquals(expectedSize, bufferOne.limit());
    assertEquals(expectedSize, bufferTwo.limit());
    assertEquals(0,
      ByteBuff.compareTo(bufferOne, 0, bufferOne.limit(), bufferTwo, 0, bufferTwo.limit()));
  }

  /**
   * If the block's on-disk size is less than {@link ByteBuffAllocator}'s minimum allocation size,
   * the block will be allocated on heap even if off-heap memory was requested. After
   * decompressing the block, the new size may exceed the minimum allocation size. This test
   * ensures that such decompressed blocks, which will be allocated off-heap, are properly marked
   * with {@link HFileBlock#isSharedMem()} == true. See
   * https://issues.apache.org/jira/browse/HBASE-27170
   */
  @Test
  public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize(TestInfo testInfo)
    throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), testInfo.getTestMethod().get().getName());
    int totalSize = createTestBlock(path);
    HFileBlockWrapper blockFromHFile = readBlock(path, totalSize);

    assertFalse(blockFromHFile.original.isUnpacked(), "expected hfile block to NOT be unpacked");
    assertFalse(blockFromHFile.original.isSharedMem(),
      "expected hfile block to NOT use shared memory");

    assertTrue(blockFromHFile.original.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE,
      "expected generated block size " + blockFromHFile.original.getOnDiskSizeWithHeader()
        + " to be less than " + MIN_ALLOCATION_SIZE);
    assertTrue(blockFromHFile.original.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE,
      "expected generated block uncompressed size "
        + blockFromHFile.original.getUncompressedSizeWithoutHeader() + " to be more than "
        + MIN_ALLOCATION_SIZE);

    assertTrue(blockFromHFile.unpacked.isUnpacked(), "expected unpacked block to be unpacked");
    assertTrue(blockFromHFile.unpacked.isSharedMem(),
      "expected unpacked block to use shared memory");
  }

  /** Pairs a block as read from disk with its unpacked (decompressed) form. */
  private final static class HFileBlockWrapper {
    private final HFileBlock original;
    private final HFileBlock unpacked;

    private HFileBlockWrapper(HFileBlock original, HFileBlock unpacked) {
      this.original = original;
      this.unpacked = unpacked;
    }
  }

  /** Reads the block at offset 0 of the given file and returns it along with its unpacked form. */
  private HFileBlockWrapper readBlock(Path path, int totalSize) throws IOException {
    try (FSDataInputStream is = fs.open(path)) {
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
          .withIncludesMvcc(false).withIncludesTags(false).build();
      ReaderContext context =
        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
      HFileBlock.FSReaderImpl hbr =
        new HFileBlock.FSReaderImpl(context, meta, allocator, TEST_UTIL.getConfiguration());
      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, TEST_UTIL.getConfiguration());
      hbr.setIncludesMemStoreTS(false);
      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
      blockFromHFile.sanityCheck();
      return new HFileBlockWrapper(blockFromHFile, blockFromHFile.unpack(meta, hbr));
    }
  }

  /**
   * Writes a single GZ-compressed data block to {@code path} whose on-disk size stays below
   * MIN_ALLOCATION_SIZE. Returns the block's on-disk size with header.
   */
  private int createTestBlock(Path path) throws IOException {
    HFileContext meta =
      new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
        .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();

    int totalSize;
    try (FSDataOutputStream os = fs.create(path)) {
      HFileBlock.Writer hbw =
        new HFileBlock.Writer(TEST_UTIL.getConfiguration(), NoOpDataBlockEncoder.INSTANCE, meta);
      hbw.startWriting(BlockType.DATA);
      writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
      hbw.writeHeaderAndData(os);
      totalSize = hbw.getOnDiskSizeWithHeader();
      assertTrue(totalSize < MIN_ALLOCATION_SIZE,
        "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE);
    }
    return totalSize;
  }

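  /**
   * Writes randomly generated KeyValues to the given block writer until their cumulative
   * serialized length reaches {@code desiredSize}. Consecutive cells repeat their row, qualifier,
   * value, and timestamp with probability CHANCE_TO_REPEAT so the block compresses well. Returns
   * the total uncompressed bytes written.
   */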
  static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
    Random random = new Random(42);

    byte[] family = new byte[] { 1 };
    int rowKey = 0;
    int qualifier = 0;
    int value = 0;
    long timestamp = 0;

    int totalSize = 0;

    // Go until just up to the limit. Compression should bring the total on-disk size under it.
    while (totalSize < desiredSize) {
      rowKey = maybeIncrement(random, rowKey);
      qualifier = maybeIncrement(random, qualifier);
      value = maybeIncrement(random, value);
      timestamp = maybeIncrement(random, (int) timestamp);

      KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
        timestamp, Bytes.toBytes(value));
      hbw.write(keyValue);
      totalSize += keyValue.getLength();
    }

    return totalSize;
  }

  // Returns the value unchanged with probability CHANCE_TO_REPEAT, otherwise value + 1.
  private static int maybeIncrement(Random random, int value) {
    if (random.nextFloat() < CHANCE_TO_REPEAT) {
      return value;
    }
    return value + 1;
  }

}