/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IOTests.class, SmallTests.class})
public class TestChecksum {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestChecksum.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }

  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);
    }
    hbw.writeHeaderAndData(os);
    int totalSize = hbw.getOnDiskSizeWithHeader();
    os.close();

    // Use hbase checksums.
    assertEquals(true, hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
        is, totalSize, (HFileSystem) fs, path, meta);
    HFileBlock b = hbr.readBlockData(0, -1, false, false);
    assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
  }

  /**
   * Test all checksum types by writing and reading back blocks.
   */
  @Test
  public void testAllChecksumTypes() throws IOException {
    List<ChecksumType> cktypes = new ArrayList<>(Arrays.asList(ChecksumType.values()));
    for (Iterator<ChecksumType> itr = cktypes.iterator(); itr.hasNext(); ) {
      ChecksumType cktype = itr.next();
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
      FSDataOutputStream os = fs.create(path);
      HFileContext meta = new HFileContextBuilder()
          .withChecksumType(cktype)
          .build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < 1000; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      int totalSize = hbw.getOnDiskSizeWithHeader();
      os.close();

      // Use hbase checksums.
      assertEquals(true, hfs.useHBaseChecksum());

      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
          is, totalSize, (HFileSystem) fs, path, meta);
      HFileBlock b = hbr.readBlockData(0, -1, false, false);
      ByteBuff data = b.getBufferWithoutHeader();
      for (int i = 0; i < 1000; i++) {
        assertEquals(i, data.getInt());
      }
      boolean exceptionThrown = false;
      try {
        data.getInt();
      } catch (BufferUnderflowException e) {
        exceptionThrown = true;
      }
      assertTrue(exceptionThrown);
      assertEquals(0, HFile.getAndResetChecksumFailuresCount());
    }
  }

  /**
   * Introduce checksum failures and check that we can still read
   * the data.
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Use hbase checksums.
        assertEquals(true, hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .build();
        HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, excluding header and checksum
        ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(
                bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we encountered hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
        validateData(in);

        // A single instance of hbase checksum failure causes the reader to
        // switch off hbase checksum verification for the next 100 read
        // requests. Verify that this is correct.
        for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, pread, false);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        }
        // The next read should have hbase checksum verification re-enabled;
        // we verify this by asserting that there was an hbase-checksum failure.
        b = hbr.readBlockData(0, -1, pread, false);
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());

        // Since the above encountered a checksum failure, we switch
        // back to not checking hbase checksums.
        b = hbr.readBlockData(0, -1, pread, false);
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        is.close();

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
        assertEquals(false, newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
        b = hbr.readBlockData(0, -1, pread, false);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, excluding header and checksum
        bb = b.getBufferWithoutHeader(); // read back data
        in = new DataInputStream(new ByteArrayInputStream(
            bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(in);
      }
    }
  }

  /**
   * Test different values of bytesPerChecksum.
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
            algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);

        // Write one block. The block has data that is at least
        // six times the checksum chunk size.
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

        long expectedChunks = ChecksumUtil.numChunks(
            dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
            bytesPerChecksum);
        LOG.info("testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
            + "dataSize={}, expectedChunks={}, compression={}", pread, bytesPerChecksum,
            totalSize, dataSize, expectedChunks, algo.toString());

        // Verify hbase checksums.
        assertEquals(true, hfs.useHBaseChecksum());

        // Read data back from file.
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum)
            .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
            is, nochecksum), totalSize, hfs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // verify that we have the expected number of checksum chunks
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
            expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // assert that we did not encounter hbase checksum verification failures
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }

  private void validateData(DataInputStream in) throws IOException {
    // validate data
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }

  /**
   * This class tests checksum behavior when data is corrupted. It mimics the following
   * behavior:
   *  - When fs checksum is disabled, hbase may get corrupted data from hdfs. If verifyChecksum
   *    is true, it means hbase checksum is on and fs checksum is off, so we corrupt the data.
   *  - When fs checksum is enabled, hdfs will get a different copy from another node, and will
   *    always return correct data. So we don't corrupt the data when verifyChecksum for hbase
   *    is off.
   */
  private static class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
    /**
     * If set to true, corrupt reads using readAtOffset(...).
     */
    boolean corruptDataStream = false;

    public CorruptedFSReaderImpl(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
        Path path, HFileContext meta) throws IOException {
      super(istream, fileSize, (HFileSystem) fs, path, meta);
    }

    @Override
    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
        throws IOException {
      if (verifyChecksum) {
        corruptDataStream = true;
      }
      HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
          verifyChecksum, updateMetrics);
      corruptDataStream = false;
      return b;
    }

    @Override
    protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
        boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
      int returnValue = super.readAtOffset(istream, dest, destOffset, size, peekIntoNextBlock,
          fileOffset, pread);
      if (!corruptDataStream) {
        return returnValue;
      }
      // Corrupt 3rd character of block magic of next block's header.
      if (peekIntoNextBlock) {
        dest[destOffset + size + 3] = 0b00000000;
      }
      // We might be reading this block's header too, corrupt it.
      dest[destOffset + 1] = 0b00000000;
      // Corrupt non header data
      if (size > hdrSize) {
        dest[destOffset + hdrSize + 1] = 0b00000000;
      }
      return returnValue;
    }
  }
}