/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for HFile block checksums: blocks are written with {@link HFileBlock.Writer}, read back
 * with {@link HFileBlock.FSReaderImpl} (or a corrupting subclass of it), and the checksum type,
 * checksum chunk count and checksum-failure counter are asserted.
 */
@Tag(IOTests.TAG)
@Tag(SmallTests.TAG)
public class TestChecksum {

  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM = { 50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private FileSystem fs;
  private HFileSystem hfs;

  @BeforeEach
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }

  /**
   * Writes one block with a default {@link HFileContext} and checks that, when read back, its
   * checksum type is the default checksum type.
   */
  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    HFileContext writeMeta = new HFileContextBuilder().build();
    int totalSize;
    // try-with-resources so the output stream is closed even if writing fails.
    try (FSDataOutputStream os = fs.create(path)) {
      HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, writeMeta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < 1000; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      totalSize = hbw.getOnDiskSizeWithHeader();
    }

    // Use hbase checksums.
    assertTrue(hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    try {
      HFileContext readMeta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
        .withFileSize(totalSize).withFileSystem((HFileSystem) fs).withFilePath(path).build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, readMeta,
        ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
      HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
      assertTrue(!b.isSharedMem());
      // expected value first, so a failure message reads correctly
      assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
    } finally {
      // the wrapper was previously never closed in this test
      is.close();
    }
  }

  /**
   * Copies the readable bytes of {@code buf} into a two-segment {@link MultiByteBuff} and validates
   * the checksum on that copy.
   */
  private void verifyMBBCheckSum(ByteBuff buf) throws IOException {
    int length = buf.remaining();
    // Each backing buffer holds just over half of the data, so the content spans both of them.
    int size = length / 2 + 1;
    ByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(size))
      .position(0).limit(length);
    // Absolute gets: the position of buf is left untouched.
    for (int i = buf.position(); i < buf.limit(); i++) {
      mbb.put(buf.get(i));
    }
    mbb.position(0).limit(length);
    assertEquals(buf.remaining(), mbb.remaining());
    assertTrue(mbb.remaining() > size);
    ChecksumUtil.validateChecksum(mbb, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

  /** Validates the checksum directly on the given buffer. */
  private void verifySBBCheckSum(ByteBuff buf) throws IOException {
    ChecksumUtil.validateChecksum(buf, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

  /**
   * For every {@link ChecksumType}: writes a block, reads it back, validates the checksum through
   * both helper paths and verifies the block content.
   */
  @Test
  public void testVerifyCheckSum() throws IOException {
    int intCount = 10000;
    for (ChecksumType ckt : ChecksumType.values()) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
      HFileContext writeMeta = new HFileContextBuilder().withChecksumType(ckt).build();
      int totalSize;
      try (FSDataOutputStream os = fs.create(path)) {
        HFileBlock.Writer hbw =
          new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, writeMeta);
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        for (int i = 0; i < intCount; ++i) {
          dos.writeInt(i);
        }
        hbw.writeHeaderAndData(os);
        totalSize = hbw.getOnDiskSizeWithHeader();
      }

      // Use hbase checksums.
      assertTrue(hfs.useHBaseChecksum());

      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
      try {
        HFileContext readMeta = new HFileContextBuilder().withHBaseCheckSum(true).build();
        ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
          .withFileSize(totalSize).withFileSystem((HFileSystem) fs).withFilePath(path).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, readMeta,
          ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
        HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
        assertTrue(!b.isSharedMem());

        ByteBuff bufferWithChecksum = getBufferWithChecksum(b);

        // verify SingleByteBuff checksum.
        verifySBBCheckSum(bufferWithChecksum);

        // verify MultiByteBuff checksum.
        verifyMBBCheckSum(bufferWithChecksum);

        ByteBuff data = b.getBufferWithoutHeader();
        for (int i = 0; i < intCount; i++) {
          assertEquals(i, data.getInt());
        }
        try {
          data.getInt();
          fail("Expected BufferUnderflowException when reading past the block data");
        } catch (BufferUnderflowException expected) {
          // expected failure: the block holds exactly intCount ints
        }
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      } finally {
        // the wrapper was previously never closed in this test
        is.close();
      }
    }
  }

  /**
   * HFileBlock buffer does not include checksum because it is discarded after verifying upon
   * reading from disk. We artificially add a checksum onto the buffer for use in testing that
   * ChecksumUtil.validateChecksum works for SingleByteBuff and MultiByteBuff in
   * {@link #verifySBBCheckSum(ByteBuff)} and {@link #verifyMBBCheckSum(ByteBuff)}
   */
  private ByteBuff getBufferWithChecksum(HFileBlock block) throws IOException {
    ByteBuff buf = block.getBufferReadOnly();

    int numBytes =
      (int) ChecksumUtil.numBytes(buf.remaining(), block.getHFileContext().getBytesPerChecksum());
    byte[] checksum = new byte[numBytes];
    ChecksumUtil.generateChecksums(buf.array(), 0, buf.limit(), checksum, 0,
      block.getHFileContext().getChecksumType(), block.getBytesPerChecksum());

    // Block bytes followed by the freshly generated checksum bytes.
    ByteBuff bufWithChecksum = ByteBuffAllocator.HEAP.allocate(buf.limit() + numBytes);
    bufWithChecksum.put(buf.array(), 0, buf.limit());
    bufWithChecksum.put(checksum);

    return bufWithChecksum.rewind();
  }

  /**
   * Introduce checksum failures and check that we can still read the data
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

  /**
   * Runs the corruption scenario for every compression algorithm and for both stream and
   * positional reads.
   * @param useTags whether the {@link HFileContext} is built with tags included
   */
  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: {}, pread={}", algo, pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        HFileContext writeMeta =
          new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
            .withIncludesTags(useTags).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        long totalSize = 0;
        try (FSDataOutputStream os = fs.create(path)) {
          HFileBlock.Writer hbw =
            new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, writeMeta);
          for (int blockId = 0; blockId < 2; ++blockId) {
            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
            for (int i = 0; i < 1234; ++i) {
              dos.writeInt(i);
            }
            hbw.writeHeaderAndData(os);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
        }

        // Use hbase checksums.
        assertTrue(hfs.useHBaseChecksum());

        HFileContext meta = new HFileContextBuilder().withCompression(algo)
          .withIncludesMvcc(true).withIncludesTags(useTags).withHBaseCheckSum(true).build();

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        try {
          ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
            .withFileSize(totalSize).withFileSystem(fs).withFilePath(path).build();
          HFileBlock.FSReader hbr =
            new CorruptedFSReaderImpl(context, meta, TEST_UTIL.getConfiguration());
          HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
          b.sanityCheck();
          assertEquals(4936, b.getUncompressedSizeWithoutHeader());
          assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
          // read data back from the hfile, exclude header and checksum
          ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
          DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bb.array(), bb.arrayOffset(), bb.limit()));

          // assert that we encountered hbase checksum verification failures
          // but still used hdfs checksums and read data successfully.
          assertEquals(1, HFile.getAndResetChecksumFailuresCount());
          validateData(in);

          // A single instance of hbase checksum failure causes the reader to
          // switch off hbase checksum verification for the next 100 read
          // requests. Verify that this is correct.
          for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
            b = hbr.readBlockData(0, -1, pread, false, true);
            assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
          }
          // The next read should have hbase checksum verification re-enabled,
          // we verify this by asserting that there was a hbase-checksum failure.
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
          assertEquals(1, HFile.getAndResetChecksumFailuresCount());

          // Since the above encountered a checksum failure, we switch
          // back to not checking hbase checksums.
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        } finally {
          // close even when an assertion above fails
          is.close();
        }

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        Configuration conf = TEST_UTIL.getConfiguration();
        HFileSystem newfs = new HFileSystem(conf, false);
        assertEquals(false, newfs.useHBaseChecksum());
        HFileBlock.FSReader noChecksumReader;
        HFileBlock block;
        FSDataInputStreamWrapper noChecksumIs = new FSDataInputStreamWrapper(newfs, path);
        try {
          ReaderContext context =
            new ReaderContextBuilder().withInputStreamWrapper(noChecksumIs).withFileSize(totalSize)
              .withFileSystem(newfs).withFilePath(path).build();
          noChecksumReader = new CorruptedFSReaderImpl(context, meta, conf);
          block = noChecksumReader.readBlockData(0, -1, pread, false, true);
        } finally {
          noChecksumIs.close();
        }
        block.sanityCheck();
        block = block.unpack(meta, noChecksumReader);
        assertEquals(4936, block.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
          block.getOnDiskSizeWithoutHeader() - block.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        ByteBuff data = block.getBufferWithoutHeader(); // read back data
        DataInputStream dataIn = new DataInputStream(
          new ByteArrayInputStream(data.array(), data.arrayOffset(), data.limit()));

        // assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(dataIn);
      }
    }
  }

  /**
   * Test different values of bytesPerChecksum
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

  /**
   * Writes one uncompressed block per entry of {@link #BYTES_PER_CHECKSUM}, reads it back and checks
   * that the on-disk size matches header + data + the expected number of checksum chunks.
   * @param useTags whether the {@link HFileContext} is built with tags included
   */
  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path =
          new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" + algo + bytesPerChecksum);
        HFileContext writeMeta =
          new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
            .withIncludesTags(useTags).withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum).build();

        // write one block. The block has data
        // that is at least 6 times more than the checksum chunk size
        long dataSize = 0;
        long totalSize;
        try (FSDataOutputStream os = fs.create(path)) {
          HFileBlock.Writer hbw =
            new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, writeMeta);
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          while (dataSize < 6 * bytesPerChecksum) {
            for (int i = 0; i < 1234; ++i) {
              dos.writeInt(i);
              dataSize += 4;
            }
          }
          hbw.writeHeaderAndData(os);
          totalSize = hbw.getOnDiskSizeWithHeader();
        }

        long expectedChunks =
          ChecksumUtil.numChunks(dataSize + HConstants.HFILEBLOCK_HEADER_SIZE, bytesPerChecksum);
        LOG.info(
          "testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
            + "dataSize={}, expectedChunks={}, compression={}",
          pread, bytesPerChecksum, totalSize, dataSize, expectedChunks, algo);

        // Verify hbase checksums.
        assertTrue(hfs.useHBaseChecksum());

        // Read data back from file.
        HFileContext readMeta =
          new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
            .withIncludesTags(useTags).withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum).build();
        HFileBlock b;
        // Both streams are opened here, so both are closed here; previously the
        // no-checksum stream was never closed.
        try (FSDataInputStream is = fs.open(path);
          FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path)) {
          ReaderContext context = new ReaderContextBuilder()
            .withInputStreamWrapper(new FSDataInputStreamWrapper(is, nochecksum))
            .withFileSize(totalSize).withFileSystem(hfs).withFilePath(path).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, readMeta,
            ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        }
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // verify that we have the expected number of checksum chunks
        assertEquals(
          HConstants.HFILEBLOCK_HEADER_SIZE + dataSize + expectedChunks * HFileBlock.CHECKSUM_SIZE,
          totalSize);

        // assert that we did not encounter hbase checksum verification failures
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }

  /**
   * Reads 1234 ints from the stream and asserts they are the sequence 0..1233, which is what the
   * corruption test writes into each block.
   */
  private void validateData(DataInputStream in) throws IOException {
    // validate data
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals(i, val, "testChecksumCorruption: data mismatch at index " + i);
    }
  }

  /**
   * This class is to test checksum behavior when data is corrupted. It mimics the following
   * behavior: - When fs checksum is disabled, hbase may get corrupted data from hdfs. If
   * verifyChecksum is true, it means hbase checksum is on and fs checksum is off, so we corrupt the
   * data. - When fs checksum is enabled, hdfs will get a different copy from another node, and will
   * always return correct data. So we don't corrupt the data when verifyChecksum for hbase is off.
   */
  private static class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
    /**
     * If set to true, corrupt reads using readAtOffset(...).
     */
    boolean corruptDataStream = false;

    public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta, Configuration conf)
      throws IOException {
      super(context, meta, ByteBuffAllocator.HEAP, conf);
    }

    /**
     * Turns corruption on for the duration of the delegated read when {@code verifyChecksum} is
     * true, and always turns it off again afterwards.
     */
    @Override
    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
      long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
      boolean useHeap) throws IOException {
      if (verifyChecksum) {
        corruptDataStream = true;
      }
      try {
        return super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
          verifyChecksum, updateMetrics, useHeap);
      } finally {
        // Reset in finally so an exception from the delegated read cannot leave corruption
        // switched on for a later read that does not verify checksums.
        corruptDataStream = false;
      }
    }

    /**
     * Delegates the read and then, if corruption is switched on, zeroes selected bytes of what was
     * read into {@code dest}.
     */
    @Override
    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
      boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
      // Remember where this read starts; the corrupted offsets below are relative to it.
      int destOffset = dest.position();
      boolean returnValue =
        super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
      if (!corruptDataStream) {
        return returnValue;
      }
      // Corrupt 3rd character of block magic of next block's header.
      if (peekIntoNextBlock) {
        dest.put(destOffset + size + 3, (byte) 0b00000000);
      }
      // We might be reading this block's header too, corrupt it.
      dest.put(destOffset + 1, (byte) 0b00000000);
      // Corrupt non header data
      if (size > hdrSize) {
        dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
      }
      return returnValue;
    }
  }
}