/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ IOTests.class, SmallTests.class })
public class TestChecksum {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestChecksum.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

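  // Checksum chunk sizes to exercise: small and odd sizes that do not divide a block evenly, as
  // well as larger power-of-two sizes, so that chunk-boundary handling is covered.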
  static final int[] BYTES_PER_CHECKSUM = { 50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }

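  /**
   * Writes a single data block with a default {@link HFileContext} and verifies that the block
   * read back reports the default checksum type.
   */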
  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);
    }
    hbw.writeHeaderAndData(os);
    int totalSize = hbw.getOnDiskSizeWithHeader();
    os.close();

    // Use hbase checksums.
    assertTrue(hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
    ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
      .withFileSize(totalSize).withFileSystem((HFileSystem) fs).withFilePath(path).build();
    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP,
      TEST_UTIL.getConfiguration());
    HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
    assertFalse(b.isSharedMem());
    assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
  }

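  /**
   * Copies {@code buf} into a two-segment {@link MultiByteBuff} and validates its checksums, so
   * the non-contiguous buffer code path of ChecksumUtil.validateChecksum is exercised.
   */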
  private void verifyMBBCheckSum(ByteBuff buf) throws IOException {
    int size = buf.remaining() / 2 + 1;
    ByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(size))
      .position(0).limit(buf.remaining());
    for (int i = buf.position(); i < buf.limit(); i++) {
      mbb.put(buf.get(i));
    }
    mbb.position(0).limit(buf.remaining());
    assertEquals(buf.remaining(), mbb.remaining());
    assertTrue(mbb.remaining() > size);
    ChecksumUtil.validateChecksum(mbb, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

  private void verifySBBCheckSum(ByteBuff buf) throws IOException {
    ChecksumUtil.validateChecksum(buf, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

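  /**
   * For every {@link ChecksumType}, writes a block, reads it back, re-appends checksums via
   * {@link #getBufferWithChecksum(HFileBlock)} and validates them through both the single- and
   * multi-ByteBuff paths, then confirms the payload round-trips without checksum failures.
   */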
  @Test
  public void testVerifyCheckSum() throws IOException {
    int intCount = 10000;
    for (ChecksumType ckt : ChecksumType.values()) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
      FSDataOutputStream os = fs.create(path);
      HFileContext meta = new HFileContextBuilder().withChecksumType(ckt).build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < intCount; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      int totalSize = hbw.getOnDiskSizeWithHeader();
      os.close();

      // Use hbase checksums.
      assertTrue(hfs.useHBaseChecksum());

      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
        .withFileSize(totalSize).withFileSystem((HFileSystem) fs).withFilePath(path).build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP,
        TEST_UTIL.getConfiguration());
      HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
      assertFalse(b.isSharedMem());

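      // The block's buffer no longer carries checksums (they are dropped after verification on
      // read), so rebuild the on-disk layout of data followed by checksums for validation below.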
      ByteBuff bufferWithChecksum = getBufferWithChecksum(b);

      // verify SingleByteBuff checksum.
      verifySBBCheckSum(bufferWithChecksum);

      // verify MultiByteBuff checksum.
      verifyMBBCheckSum(bufferWithChecksum);

      ByteBuff data = b.getBufferWithoutHeader();
      for (int i = 0; i < intCount; i++) {
        assertEquals(i, data.getInt());
      }
      try {
        data.getInt();
        fail();
      } catch (BufferUnderflowException e) {
        // expected failure
      }
      assertEquals(0, HFile.getAndResetChecksumFailuresCount());
    }
  }

  /**
   * The HFileBlock buffer does not include checksums, because they are discarded after being
   * verified when the block is read from disk. Here we artificially append checksums to the
   * buffer so that {@link #verifySBBCheckSum(ByteBuff)} and {@link #verifyMBBCheckSum(ByteBuff)}
   * can test that ChecksumUtil.validateChecksum works for both SingleByteBuff and MultiByteBuff.
   */
  private ByteBuff getBufferWithChecksum(HFileBlock block) throws IOException {
    ByteBuff buf = block.getBufferReadOnly();

    int numBytes =
      (int) ChecksumUtil.numBytes(buf.remaining(), block.getHFileContext().getBytesPerChecksum());
    byte[] checksum = new byte[numBytes];
    ChecksumUtil.generateChecksums(buf.array(), 0, buf.limit(), checksum, 0,
      block.getHFileContext().getChecksumType(), block.getBytesPerChecksum());

    ByteBuff bufWithChecksum = ByteBuffAllocator.HEAP.allocate(buf.limit() + numBytes);
    bufWithChecksum.put(buf.array(), 0, buf.limit());
    bufWithChecksum.put(checksum);

    return bufWithChecksum.rewind();
  }

  /**
   * Introduce checksum failures and check that we can still read the data.
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

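  /**
   * Writes two blocks, then reads them back through {@link CorruptedFSReaderImpl}, which corrupts
   * the stream only while hbase checksum verification is on. This checks that a checksum failure
   * falls back to hdfs checksums, that hbase verification stays off for a number of subsequent
   * reads, and that it is re-enabled afterwards.
   */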
  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: " + algo + ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
          .withIncludesTags(useTags).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Use hbase checksums.
        assertTrue(hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
          .withIncludesTags(useTags).withHBaseCheckSum(true).build();
        ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
          .withFileSize(totalSize).withFileSystem(fs).withFilePath(path).build();
        HFileBlock.FSReader hbr =
          new CorruptedFSReaderImpl(context, meta, TEST_UTIL.getConfiguration());
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
          b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
        DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we encountered hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
        validateData(in);


        // A single hbase checksum failure causes the reader to switch off hbase checksum
        // verification for the next few reads (governed by
        // HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD). Verify that this is the case.
        for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        }
        // The next read should have hbase checksum verification re-enabled; we verify this by
        // asserting that there was an hbase checksum failure.
        b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());

        // Since the read above encountered a checksum failure, the reader switches back to not
        // checking hbase checksums.
        b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        is.close();

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        Configuration conf = TEST_UTIL.getConfiguration();
        HFileSystem newfs = new HFileSystem(conf, false);
        assertFalse(newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        context = new ReaderContextBuilder().withInputStreamWrapper(is).withFileSize(totalSize)
          .withFileSystem(newfs).withFilePath(path).build();
        hbr = new CorruptedFSReaderImpl(context, meta, conf);
        b = hbr.readBlockData(0, -1, pread, false, true);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
          b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        bb = b.getBufferWithoutHeader(); // read back data
        in =
          new DataInputStream(new ByteArrayInputStream(bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(in);
      }
    }
  }


  /**
   * Test different values of bytesPerChecksum.
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

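  /**
   * For each bytesPerChecksum value, writes one block whose payload spans at least six checksum
   * chunks, reads it back, and verifies that the on-disk size equals header + data + the expected
   * number of checksum chunks, with no hbase checksum failures.
   */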
  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path =
          new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" + algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
          .withIncludesTags(useTags).withHBaseCheckSum(true).withBytesPerCheckSum(bytesPerChecksum)
          .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);

        // write one block. The block has data
        // that is at least 6 times more than the checksum chunk size
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

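        // Checksums are computed over the block header as well as the data, so the expected chunk
        // count is taken over header + data.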
        long expectedChunks =
          ChecksumUtil.numChunks(dataSize + HConstants.HFILEBLOCK_HEADER_SIZE, bytesPerChecksum);
        LOG.info(
          "testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
            + "dataSize={}, expectedChunks={}, compression={}",
          pread, bytesPerChecksum, totalSize, dataSize, expectedChunks, algo.toString());

        // Verify hbase checksums.
        assertTrue(hfs.useHBaseChecksum());

        // Read data back from file.
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
          .withIncludesTags(useTags).withHBaseCheckSum(true).withBytesPerCheckSum(bytesPerChecksum)
          .build();
        ReaderContext context = new ReaderContextBuilder()
          .withInputStreamWrapper(new FSDataInputStreamWrapper(is, nochecksum))
          .withFileSize(totalSize).withFileSystem(hfs).withFilePath(path).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP,
          TEST_UTIL.getConfiguration());
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // verify that we have the expected number of checksum chunks
        assertEquals(totalSize,
          HConstants.HFILEBLOCK_HEADER_SIZE + dataSize + expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // assert that we did not encounter hbase checksum verification failures
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }


  private void validateData(DataInputStream in) throws IOException {
    // validate data
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }

  /**
   * This class is to test checksum behavior when data is corrupted. It mimics the following
   * behavior:
   * <ul>
   * <li>When fs checksum is disabled, hbase may get corrupted data from hdfs. If verifyChecksum
   * is true, it means hbase checksum is on and fs checksum is off, so we corrupt the data.</li>
   * <li>When fs checksum is enabled, hdfs will get a different copy from another node and will
   * always return correct data. So we don't corrupt the data when verifyChecksum for hbase is
   * off.</li>
   * </ul>
   */
  private static class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
    /**
     * If set to true, corrupt reads using readAtOffset(...).
     */
    boolean corruptDataStream = false;

    public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta, Configuration conf)
      throws IOException {
      super(context, meta, ByteBuffAllocator.HEAP, conf);
    }

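    // Arm corruption only while hbase checksum verification is requested, so the retry that falls
    // back to hdfs checksums (verifyChecksum == false) sees clean data again.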
    @Override
    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
      long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
      boolean useHeap) throws IOException {
      if (verifyChecksum) {
        corruptDataStream = true;
      }
      HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
        verifyChecksum, updateMetrics, useHeap);
      corruptDataStream = false;
      return b;
    }

    @Override
    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
      boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
      int destOffset = dest.position();
      boolean returnValue =
        super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
      if (!corruptDataStream) {
        return returnValue;
      }
      // Corrupt 3rd character of block magic of next block's header.
      if (peekIntoNextBlock) {
        dest.put(destOffset + size + 3, (byte) 0b00000000);
      }
      // We might be reading this block's header too, corrupt it.
      dest.put(destOffset + 1, (byte) 0b00000000);
      // Corrupt non header data
      if (size > hdrSize) {
        dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
      }
      return returnValue;
    }
  }
}