/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IOTests.class, SmallTests.class})
public class TestChecksum {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestChecksum.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16*1024, (16*1024+980), 64 * 1024};

  private static final HBaseTestingUtility TEST_UTIL =
    new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem)fs;
  }

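  /**
   * Writes a single data block with a default HFileContext and verifies that the block read
   * back advertises the cluster-default checksum type.
   */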
  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);
    }
    hbw.writeHeaderAndData(os);
    int totalSize = hbw.getOnDiskSizeWithHeader();
    os.close();

    // Verify that hbase checksums are enabled by default.
    assertTrue(hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
    ReaderContext context = new ReaderContextBuilder()
        .withInputStreamWrapper(is)
        .withFileSize(totalSize)
        .withFileSystem((HFileSystem) fs)
        .withFilePath(path)
        .build();
    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
        meta, ByteBuffAllocator.HEAP);
    HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
    assertFalse(b.isSharedMem());
    assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
  }

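  /**
   * Copies the block bytes into a two-segment MultiByteBuff and validates the checksum there,
   * exercising the multi-buffer path of ChecksumUtil.validateChecksum.
   */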
  private void verifyMBBCheckSum(ByteBuff buf) throws IOException {
    int size = buf.remaining() / 2 + 1;
    ByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(size))
          .position(0).limit(buf.remaining());
    for (int i = buf.position(); i < buf.limit(); i++) {
      mbb.put(buf.get(i));
    }
    mbb.position(0).limit(buf.remaining());
    assertEquals(mbb.remaining(), buf.remaining());
    assertTrue(mbb.remaining() > size);
    ChecksumUtil.validateChecksum(mbb, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

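  /**
   * Validates the checksum directly on the buffer returned by the reader (the single-buffer
   * path).
   */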
  private void verifySBBCheckSum(ByteBuff buf) throws IOException {
    ChecksumUtil.validateChecksum(buf, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

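  /**
   * For every supported ChecksumType, writes a block, reads it back with hbase checksums
   * enabled, validates the checksum via both the single-buffer and multi-buffer paths, and
   * verifies that the payload round-trips intact.
   */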
  @Test
  public void testVerifyCheckSum() throws IOException {
    int intCount = 10000;
    for (ChecksumType ckt : ChecksumType.values()) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
      FSDataOutputStream os = fs.create(path);
      HFileContext meta = new HFileContextBuilder()
            .withChecksumType(ckt)
            .build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < intCount; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      int totalSize = hbw.getOnDiskSizeWithHeader();
      os.close();

      // Verify that hbase checksums are enabled.
      assertTrue(hfs.useHBaseChecksum());

      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      ReaderContext context = new ReaderContextBuilder()
          .withInputStreamWrapper(is)
          .withFileSize(totalSize)
          .withFileSystem((HFileSystem) fs)
          .withFilePath(path)
          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
          meta, ByteBuffAllocator.HEAP);
      HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
      assertFalse(b.isSharedMem());

      // verify SingleByteBuff checksum.
      verifySBBCheckSum(b.getBufferReadOnly());

      // verify MultiByteBuff checksum.
      verifyMBBCheckSum(b.getBufferReadOnly());

      ByteBuff data = b.getBufferWithoutHeader();
      for (int i = 0; i < intCount; i++) {
        assertEquals(i, data.getInt());
      }
      try {
        data.getInt();
        fail();
      } catch (BufferUnderflowException e) {
        // expected failure
      }
      assertEquals(0, HFile.getAndResetChecksumFailuresCount());
    }
  }

  /**
   * Introduces checksum failures and checks that the data can still be read correctly because
   * the reader falls back to hdfs checksums.
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

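  /**
   * Writes two blocks, reads them back through a reader that deliberately corrupts the bytes it
   * returns, and verifies that hbase checksum failures are detected, that verification is
   * temporarily switched off after a failure, and that the data is still read correctly. Then
   * repeats the read with hbase checksums disabled in the configuration and expects no failures
   * to be recorded.
   */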
  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: {}, pread={}", algo, pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Verify that hbase checksums are enabled.
        assertTrue(hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(true)
              .withIncludesTags(useTags)
              .withHBaseCheckSum(true)
              .build();
        ReaderContext context = new ReaderContextBuilder()
           .withInputStreamWrapper(is)
           .withFileSize(totalSize)
           .withFileSystem(fs)
           .withFilePath(path)
           .build();
        HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(context, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
        DataInputStream in = new DataInputStream(
                               new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we encountered hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
        validateData(in);

        // A single instance of hbase checksum failure causes the reader to
        // switch off hbase checksum verification for the next 100 read
        // requests. Verify that this is correct.
        for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        }
        // The next read should have hbase checksum verification re-enabled;
        // we verify this by asserting that there was an hbase checksum failure.
        b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());

        // Since the above encountered a checksum failure, we switch
        // back to not checking hbase checksums.
        b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        is.close();

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
        assertFalse(newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        context = new ReaderContextBuilder()
            .withInputStreamWrapper(is)
            .withFileSize(totalSize)
            .withFileSystem(newfs)
            .withFilePath(path)
            .build();
        hbr = new CorruptedFSReaderImpl(context, meta);
        b = hbr.readBlockData(0, -1, pread, false, true);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        bb = b.getBufferWithoutHeader(); // read back data
        in = new DataInputStream(new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(in);
      }
    }
  }

  /**
   * Tests different values of bytesPerChecksum.
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

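  /**
   * Writes a single uncompressed block whose data spans several checksum chunks, then reads it
   * back and verifies that the on-disk size accounts for exactly the expected number of
   * checksum chunks and that no hbase checksum failures occur.
   */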
  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
                             algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withHBaseCheckSum(true)
                            .withBytesPerCheckSum(bytesPerChecksum)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);

        // Write one block whose data is at least six times the checksum chunk size.
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

        long expectedChunks = ChecksumUtil.numChunks(
                               dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
                               bytesPerChecksum);
        LOG.info("testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
                + "dataSize={}, expectedChunks={}, compression={}", pread, bytesPerChecksum,
            totalSize, dataSize, expectedChunks, algo.toString());

        // Verify that hbase checksums are enabled.
        assertTrue(hfs.useHBaseChecksum());

        // Read data back from file.
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
               .withCompression(algo)
               .withIncludesMvcc(true)
               .withIncludesTags(useTags)
               .withHBaseCheckSum(true)
               .withBytesPerCheckSum(bytesPerChecksum)
               .build();
        ReaderContext context = new ReaderContextBuilder()
            .withInputStreamWrapper(new FSDataInputStreamWrapper(is, nochecksum))
            .withFileSize(totalSize)
            .withFileSystem(hfs)
            .withFilePath(path)
            .build();
        HFileBlock.FSReader hbr =
            new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // verify that we have the expected number of checksum chunks
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
                     expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // assert that we did not encounter hbase checksum verification failures
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }

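  /**
   * Asserts that the stream contains the 1234 sequential ints written into each block by
   * testChecksumCorruptionInternals.
   */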
  private void validateData(DataInputStream in) throws IOException {
    // validate data
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }

  /**
   * Tests checksum behavior when data is corrupted. It mimics the following behavior:
   *  - When fs checksum is disabled, hbase may get corrupted data from hdfs. If verifyChecksum
   *    is true, hbase checksum is on and fs checksum is off, so we corrupt the data.
   *  - When fs checksum is enabled, hdfs will fetch a different copy from another node and will
   *    always return correct data. So we do not corrupt the data when verifyChecksum for hbase
   *    is off.
   */
  private static class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
    /**
     * If set to true, corrupt reads using readAtOffset(...).
     */
    boolean corruptDataStream = false;

    public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta) throws IOException {
      super(context, meta, ByteBuffAllocator.HEAP);
    }

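    /**
     * Turns corruption on only for reads that are expected to verify hbase checksums; reads
     * with verification disabled pass through unmodified.
     */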
    @Override
    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
        boolean useHeap) throws IOException {
      if (verifyChecksum) {
        corruptDataStream = true;
      }
      HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
        verifyChecksum, updateMetrics, useHeap);
      corruptDataStream = false;
      return b;
    }

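    /**
     * Performs the real read and then, when corruption is enabled, zeroes out a byte in this
     * block's header, in the data section when the read covers more than the header, and, if
     * the read peeked ahead, in the next block's magic, so hbase checksum verification fails.
     */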
    @Override
    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
        boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
      int destOffset = dest.position();
      boolean returnValue =
          super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
      if (!corruptDataStream) {
        return returnValue;
      }
      // Corrupt 3rd character of block magic of next block's header.
      if (peekIntoNextBlock) {
        dest.put(destOffset + size + 3, (byte) 0b00000000);
      }
      // We might be reading this block's header too; corrupt it.
      dest.put(destOffset + 1, (byte) 0b00000000);
      // Corrupt non-header data.
      if (size > hdrSize) {
        dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
      }
      return returnValue;
    }
  }
}