/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for HBase-level checksums on HFile blocks: that freshly written blocks carry the default
 * checksum type, that {@link ChecksumUtil#validateChecksum} accepts valid data, that a reader hit
 * by corrupted bytes still returns correct data and records the failure, and that the on-disk
 * size matches the expected number of checksum chunks.
 */
@Category({ IOTests.class, SmallTests.class })
public class TestChecksum {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestChecksum.class);

  // Must name this class, not a sibling test, so log lines are attributed correctly.
  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM =
    { 50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  /** Obtains the file system under test; {@code hfs} is the same instance viewed as HFileSystem. */
  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }

  /**
   * Writes one block with a default {@link HFileContext} and checks that, when read back, it
   * reports the default checksum type.
   */
  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    int totalSize;
    try (FSDataOutputStream os = fs.create(path)) {
      HFileContext writeMeta = new HFileContextBuilder().build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, writeMeta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < 1000; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      totalSize = hbw.getOnDiskSizeWithHeader();
    }

    // Use hbase checksums.
    assertTrue(hfs.useHBaseChecksum());

    try (FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path)) {
      HFileContext readMeta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
        .withFileSize(totalSize).withFileSystem(hfs).withFilePath(path).build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, readMeta,
        ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
      HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
      assertFalse(b.isSharedMem());
      assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
    }
  }

  /**
   * Copies {@code buf} into a two-segment {@link MultiByteBuff} and validates its checksum, so
   * that validation is exercised on data that spans more than one backing buffer.
   * @param buf block bytes followed by their checksum bytes
   */
  private void verifyMBBCheckSum(ByteBuff buf) throws IOException {
    // Each segment holds just over half of the data, so the content must straddle both.
    int size = buf.remaining() / 2 + 1;
    ByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(size))
      .position(0).limit(buf.remaining());
    for (int i = buf.position(); i < buf.limit(); i++) {
      mbb.put(buf.get(i));
    }
    mbb.position(0).limit(buf.remaining());
    assertEquals(mbb.remaining(), buf.remaining());
    assertTrue(mbb.remaining() > size);
    ChecksumUtil.validateChecksum(mbb, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

  /**
   * Validates the checksum of {@code buf} directly, without copying it.
   * @param buf block bytes followed by their checksum bytes
   */
  private void verifySBBCheckSum(ByteBuff buf) throws IOException {
    ChecksumUtil.validateChecksum(buf, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

  /**
   * For every {@link ChecksumType}: writes a block of ints, reads it back, validates the checksum
   * through both buffer layouts, and checks the payload round-trips with no checksum failures.
   */
  @Test
  public void testVerifyCheckSum() throws IOException {
    int intCount = 10000;
    for (ChecksumType ckt : ChecksumType.values()) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
      int totalSize;
      try (FSDataOutputStream os = fs.create(path)) {
        HFileContext writeMeta = new HFileContextBuilder().withChecksumType(ckt).build();
        HFileBlock.Writer hbw =
          new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, writeMeta);
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        for (int i = 0; i < intCount; ++i) {
          dos.writeInt(i);
        }
        hbw.writeHeaderAndData(os);
        totalSize = hbw.getOnDiskSizeWithHeader();
      }

      // Use hbase checksums.
      assertTrue(hfs.useHBaseChecksum());

      try (FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path)) {
        HFileContext readMeta = new HFileContextBuilder().withHBaseCheckSum(true).build();
        ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
          .withFileSize(totalSize).withFileSystem(hfs).withFilePath(path).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, readMeta,
          ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
        HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
        assertFalse(b.isSharedMem());

        ByteBuff bufferWithChecksum = getBufferWithChecksum(b);

        // verify SingleByteBuff checksum.
        verifySBBCheckSum(bufferWithChecksum);

        // verify MultiByteBuff checksum.
        verifyMBBCheckSum(bufferWithChecksum);

        ByteBuff data = b.getBufferWithoutHeader();
        for (int i = 0; i < intCount; i++) {
          assertEquals(i, data.getInt());
        }
        try {
          data.getInt();
          fail();
        } catch (BufferUnderflowException expected) {
          // expected failure: the buffer holds exactly intCount ints and nothing more
        }
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }

  /**
   * HFileBlock buffer does not include checksum because it is discarded after verifying upon
   * reading from disk. We artificially add a checksum onto the buffer for use in testing that
   * ChecksumUtil.validateChecksum works for SingleByteBuff and MultiByteBuff in
   * {@link #verifySBBCheckSum(ByteBuff)} and {@link #verifyMBBCheckSum(ByteBuff)}
   * @param block the block whose bytes should be checksummed
   * @return a new heap buffer, rewound, holding the block bytes followed by the checksum bytes
   */
  private ByteBuff getBufferWithChecksum(HFileBlock block) throws IOException {
    ByteBuff buf = block.getBufferReadOnly();

    int numBytes =
      (int) ChecksumUtil.numBytes(buf.remaining(), block.getHFileContext().getBytesPerChecksum());
    byte[] checksum = new byte[numBytes];
    ChecksumUtil.generateChecksums(buf.array(), 0, buf.limit(), checksum, 0,
      block.getHFileContext().getChecksumType(), block.getBytesPerChecksum());

    ByteBuff bufWithChecksum = ByteBuffAllocator.HEAP.allocate(buf.limit() + numBytes);
    bufWithChecksum.put(buf.array(), 0, buf.limit());
    bufWithChecksum.put(checksum);

    return bufWithChecksum.rewind();
  }

  /**
   * Introduce checksum failures and check that we can still read the data
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

  /**
   * Runs the corruption scenario for every compression algorithm and both read modes. Blocks are
   * read through {@link CorruptedFSReaderImpl}, and the failure counter is checked after each
   * read.
   * @param useTags whether the file context declares that cells include tags
   */
  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: {}, pread={}", algo, pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        long totalSize = 0;
        try (FSDataOutputStream os = fs.create(path)) {
          HFileContext writeMeta = new HFileContextBuilder().withCompression(algo)
            .withIncludesMvcc(true).withIncludesTags(useTags)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
          HFileBlock.Writer hbw =
            new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, writeMeta);
          for (int blockId = 0; blockId < 2; ++blockId) {
            DataOutputStream dos = hbw.startWriting(BlockType.DATA);
            for (int i = 0; i < 1234; ++i) {
              dos.writeInt(i);
            }
            hbw.writeHeaderAndData(os);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
        }

        // Use hbase checksums.
        assertTrue(hfs.useHBaseChecksum());

        HFileContext meta = new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
          .withIncludesTags(useTags).withHBaseCheckSum(true).build();

        // Do a read that purposely introduces checksum verification failures.
        try (FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path)) {
          ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
            .withFileSize(totalSize).withFileSystem(fs).withFilePath(path).build();
          HFileBlock.FSReader hbr =
            new CorruptedFSReaderImpl(context, meta, TEST_UTIL.getConfiguration());
          HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
          b.sanityCheck();
          assertEquals(4936, b.getUncompressedSizeWithoutHeader());
          assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
          // read data back from the hfile, exclude header and checksum
          ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
          DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bb.array(), bb.arrayOffset(), bb.limit()));

          // assert that we encountered hbase checksum verification failures
          // but still used hdfs checksums and read data successfully.
          assertEquals(1, HFile.getAndResetChecksumFailuresCount());
          validateData(in);

          // A single instance of hbase checksum failure causes the reader to
          // switch off hbase checksum verification for the next 100 read
          // requests. Verify that this is correct.
          for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
            b = hbr.readBlockData(0, -1, pread, false, true);
            assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
          }
          // The next read should have hbase checksum verification re-enabled,
          // we verify this by asserting that there was a hbase-checksum failure.
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
          assertEquals(1, HFile.getAndResetChecksumFailuresCount());

          // Since the above encountered a checksum failure, we switch
          // back to not checking hbase checksums.
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        }

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        Configuration conf = TEST_UTIL.getConfiguration();
        HFileSystem newfs = new HFileSystem(conf, false);
        assertFalse(newfs.useHBaseChecksum());
        HFileBlock.FSReader noHBaseChecksumReader;
        HFileBlock block;
        try (FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(newfs, path)) {
          ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
            .withFileSize(totalSize).withFileSystem(newfs).withFilePath(path).build();
          noHBaseChecksumReader = new CorruptedFSReaderImpl(context, meta, conf);
          block = noHBaseChecksumReader.readBlockData(0, -1, pread, false, true);
        }
        block.sanityCheck();
        block = block.unpack(meta, noHBaseChecksumReader);
        assertEquals(4936, block.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
          block.getOnDiskSizeWithoutHeader() - block.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        ByteBuff payload = block.getBufferWithoutHeader(); // read back data
        DataInputStream in = new DataInputStream(
          new ByteArrayInputStream(payload.array(), payload.arrayOffset(), payload.limit()));

        // assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(in);
      }
    }
  }

  /**
   * Test different values of bytesPerChecksum
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

  /**
   * For each read mode and each value in {@link #BYTES_PER_CHECKSUM}, writes one uncompressed
   * block and checks that its on-disk size equals header + data + one checksum per expected chunk.
   * @param useTags whether the file context declares that cells include tags
   */
  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path =
          new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" + algo + bytesPerChecksum);

        // write one block. The block has data
        // that is at least 6 times more than the checksum chunk size
        long dataSize = 0;
        long totalSize;
        try (FSDataOutputStream os = fs.create(path)) {
          HFileContext writeMeta = new HFileContextBuilder().withCompression(algo)
            .withIncludesMvcc(true).withIncludesTags(useTags).withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum).build();
          HFileBlock.Writer hbw =
            new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, writeMeta);
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          while (dataSize < 6 * bytesPerChecksum) {
            for (int i = 0; i < 1234; ++i) {
              dos.writeInt(i);
              dataSize += 4;
            }
          }
          hbw.writeHeaderAndData(os);
          totalSize = hbw.getOnDiskSizeWithHeader();
        }

        long expectedChunks =
          ChecksumUtil.numChunks(dataSize + HConstants.HFILEBLOCK_HEADER_SIZE, bytesPerChecksum);
        LOG.info(
          "testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
            + "dataSize={}, expectedChunks={}, compression={}",
          pread, bytesPerChecksum, totalSize, dataSize, expectedChunks, algo);

        // Verify hbase checksums.
        assertTrue(hfs.useHBaseChecksum());

        // Read data back from file.
        HFileBlock b;
        try (FSDataInputStream is = fs.open(path);
          FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path)) {
          HFileContext readMeta = new HFileContextBuilder().withCompression(algo)
            .withIncludesMvcc(true).withIncludesTags(useTags).withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum).build();
          ReaderContext context = new ReaderContextBuilder()
            .withInputStreamWrapper(new FSDataInputStreamWrapper(is, nochecksum))
            .withFileSize(totalSize).withFileSystem(hfs).withFilePath(path).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, readMeta,
            ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        }
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // verify that we have the expected number of checksum chunks
        assertEquals(
          HConstants.HFILEBLOCK_HEADER_SIZE + dataSize + expectedChunks * HFileBlock.CHECKSUM_SIZE,
          totalSize);

        // assert that we did not encounter hbase checksum verification failures
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }

  /**
   * Reads 1234 ints from {@code in} and asserts they are the sequence 0..1233, which is what the
   * corruption test writes into each block.
   */
  private void validateData(DataInputStream in) throws IOException {
    // validate data
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }

  /**
   * This class is to test checksum behavior when data is corrupted. It mimics the following
   * behavior:
   * <ul>
   * <li>When fs checksum is disabled, hbase may get corrupted data from hdfs. If verifyChecksum is
   * true, it means hbase checksum is on and fs checksum is off, so we corrupt the data.</li>
   * <li>When fs checksum is enabled, hdfs will get a different copy from another node, and will
   * always return correct data. So we don't corrupt the data when verifyChecksum for hbase is
   * off.</li>
   * </ul>
   */
  private static class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
    /**
     * If set to true, corrupt reads using readAtOffset(...).
     */
    private boolean corruptDataStream = false;

    public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta, Configuration conf)
      throws IOException {
      super(context, meta, ByteBuffAllocator.HEAP, conf);
    }

    /**
     * Delegates to the superclass, turning corruption on for the duration of the call when
     * {@code verifyChecksum} is true.
     */
    @Override
    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
      long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
      boolean useHeap) throws IOException {
      if (verifyChecksum) {
        corruptDataStream = true;
      }
      try {
        return super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
          verifyChecksum, updateMetrics, useHeap);
      } finally {
        // Always reset, even if the read throws, so a later read is not corrupted by accident.
        corruptDataStream = false;
      }
    }

    /**
     * Performs the real read, then zeroes selected bytes in {@code dest} when corruption is on.
     */
    @Override
    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
      boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
      // Remember where this read starts; the corruption offsets below are relative to it.
      int destOffset = dest.position();
      boolean returnValue =
        super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
      if (!corruptDataStream) {
        return returnValue;
      }
      // Corrupt 3rd character of block magic of next block's header.
      if (peekIntoNextBlock) {
        dest.put(destOffset + size + 3, (byte) 0b00000000);
      }
      // We might be reading this block's header too, corrupt it.
      dest.put(destOffset + 1, (byte) 0b00000000);
      // Corrupt non header data
      if (size > hdrSize) {
        dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
      }
      return returnValue;
    }
  }
}