/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IOTests.class, SmallTests.class})
public class TestChecksum {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestChecksum.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }

  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder().build();
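    // Note: a default HFileContext is expected to enable hbase checksums with the default
    // checksum type; the assertion at the end of this test checks that the block written
    // here carries that default.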
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);
    }
    hbw.writeHeaderAndData(os);
    int totalSize = hbw.getOnDiskSizeWithHeader();
    os.close();

    // Use hbase checksums.
    assertTrue(hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
    ReaderContext context = new ReaderContextBuilder()
        .withInputStreamWrapper(is)
        .withFileSize(totalSize)
        .withFileSystem((HFileSystem) fs)
        .withFilePath(path)
        .build();
    HFileBlock.FSReader hbr =
        new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP);
    HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
    assertFalse(b.isSharedMem());
    assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
  }

  private void verifyMBBCheckSum(ByteBuff buf) throws IOException {
    int size = buf.remaining() / 2 + 1;
    ByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(size))
        .position(0).limit(buf.remaining());
    for (int i = buf.position(); i < buf.limit(); i++) {
      mbb.put(buf.get(i));
    }
    mbb.position(0).limit(buf.remaining());
    assertEquals(mbb.remaining(), buf.remaining());
    // The copied data must spill into the second buffer so the multi-segment case is exercised.
    assertTrue(mbb.remaining() > size);
    ChecksumUtil.validateChecksum(mbb, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

  private void verifySBBCheckSum(ByteBuff buf) throws IOException {
    ChecksumUtil.validateChecksum(buf, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

  @Test
  public void testVerifyCheckSum() throws IOException {
    int intCount = 10000;
    for (ChecksumType ckt : ChecksumType.values()) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
      FSDataOutputStream os = fs.create(path);
      HFileContext meta = new HFileContextBuilder()
          .withChecksumType(ckt)
          .build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < intCount; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      int totalSize = hbw.getOnDiskSizeWithHeader();
      os.close();

      // Use hbase checksums.
      assertTrue(hfs.useHBaseChecksum());

      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      ReaderContext context = new ReaderContextBuilder()
          .withInputStreamWrapper(is)
          .withFileSize(totalSize)
          .withFileSystem((HFileSystem) fs)
          .withFilePath(path)
          .build();
      HFileBlock.FSReader hbr =
          new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP);
      HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
      assertFalse(b.isSharedMem());

      // verify SingleByteBuff checksum.
      verifySBBCheckSum(b.getBufferReadOnly());

      // verify MultiByteBuff checksum.
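      // The MultiByteBuff case is exercised separately because checksum validation may take a
      // different code path for multi-segment buffers than for a single contiguous buffer.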
      verifyMBBCheckSum(b.getBufferReadOnly());

      ByteBuff data = b.getBufferWithoutHeader();
      for (int i = 0; i < intCount; i++) {
        assertEquals(i, data.getInt());
      }
      try {
        data.getInt();
        fail();
      } catch (BufferUnderflowException e) {
        // expected failure
      }
      assertEquals(0, HFile.getAndResetChecksumFailuresCount());
    }
  }

  /**
   * Introduce checksum failures and check that we can still read the data.
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Use hbase checksums.
        assertTrue(hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .build();
        ReaderContext context = new ReaderContextBuilder()
            .withInputStreamWrapper(is)
            .withFileSize(totalSize)
            .withFileSystem(fs)
            .withFilePath(path)
            .build();
        HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(context, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // Read data back from the hfile, excluding header and checksums.
        ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader();
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that we encountered hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
        validateData(in);

        // A single hbase checksum failure causes the reader to switch off hbase checksum
        // verification for the next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD read requests.
        // Verify that this is correct.
        for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        }
        // The next read should have hbase checksum verification re-enabled; we verify this by
        // asserting that there was an hbase checksum failure.
        b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());

        // Since the above encountered a checksum failure, we switch
        // back to not checking hbase checksums.
        b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        is.close();

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
        assertFalse(newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        context = new ReaderContextBuilder()
            .withInputStreamWrapper(is)
            .withFileSize(totalSize)
            .withFileSystem(newfs)
            .withFilePath(path)
            .build();
        hbr = new CorruptedFSReaderImpl(context, meta);
        b = hbr.readBlockData(0, -1, pread, false, true);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // Read data back from the hfile, excluding header and checksums.
        bb = b.getBufferWithoutHeader();
        in = new DataInputStream(new ByteArrayInputStream(
            bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(in);
      }
    }
  }

  /**
   * Test different values of bytesPerChecksum.
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(),
            "checksumChunk_" + algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);

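        // Each bytesPerChecksum bytes of the on-disk block (header included) should get one
        // checksum of HFileBlock.CHECKSUM_SIZE bytes; the assertions at the end of this loop
        // verify the resulting chunk count and total on-disk size.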
        // Write one block. The block has data that is at least 6 times more
        // than the checksum chunk size.
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

        long expectedChunks = ChecksumUtil.numChunks(
            dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
            bytesPerChecksum);
        LOG.info("testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
            + "dataSize={}, expectedChunks={}, compression={}", pread, bytesPerChecksum,
            totalSize, dataSize, expectedChunks, algo.toString());

        // Verify hbase checksums.
        assertTrue(hfs.useHBaseChecksum());

        // Read data back from file.
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum)
            .build();
        ReaderContext context = new ReaderContextBuilder()
            .withInputStreamWrapper(new FSDataInputStreamWrapper(is, nochecksum))
            .withFileSize(totalSize)
            .withFileSystem(hfs)
            .withFilePath(path)
            .build();
        HFileBlock.FSReader hbr =
            new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // Verify that we have the expected number of checksum chunks.
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
            expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // Assert that we did not encounter hbase checksum verification failures.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }

  private void validateData(DataInputStream in) throws IOException {
    // Validate data.
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }

  /**
   * This class is to test checksum behavior when data is corrupted. It mimics the following
   * behavior:
   *  - When fs checksum is disabled, hbase may get corrupted data from hdfs. If verifyChecksum
   *    is true, it means hbase checksum is on and fs checksum is off, so we corrupt the data.
   *  - When fs checksum is enabled, hdfs will get a different copy from another node, and will
   *    always return correct data. So we don't corrupt the data when verifyChecksum for hbase
   *    is off.
   */
  private static class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
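    // Corruption is injected in readAtOffset() below, and only while readBlockDataInternal()
    // has set corruptDataStream, i.e. only when hbase checksum verification is requested.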
    /**
     * If set to true, corrupt reads using readAtOffset(...).
     */
    boolean corruptDataStream = false;

    public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta) throws IOException {
      super(context, meta, ByteBuffAllocator.HEAP);
    }

    @Override
    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
        boolean useHeap) throws IOException {
      if (verifyChecksum) {
        corruptDataStream = true;
      }
      HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
          verifyChecksum, updateMetrics, useHeap);
      corruptDataStream = false;
      return b;
    }

    @Override
    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
        boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
      int destOffset = dest.position();
      boolean returnValue =
          super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
      if (!corruptDataStream) {
        return returnValue;
      }
      // Corrupt the 3rd character of the block magic of the next block's header.
      if (peekIntoNextBlock) {
        dest.put(destOffset + size + 3, (byte) 0b00000000);
      }
      // We might be reading this block's header too, corrupt it.
      dest.put(destOffset + 1, (byte) 0b00000000);
      // Corrupt non-header data.
      if (size > hdrSize) {
        dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
      }
      return returnValue;
    }
  }
}