/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IOTests.class, SmallTests.class})
public class TestChecksum {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestChecksum.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

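  // A spread of checksum chunk sizes: very small values, odd unaligned values, and the larger
  // 16KB/64KB settings, to exercise chunk-boundary handling in the tests below.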
  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }

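  /**
   * Writes a block with a default {@link HFileContext} and verifies that reading it back
   * reports the default checksum type.
   */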
  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);
    }
    hbw.writeHeaderAndData(os);
    int totalSize = hbw.getOnDiskSizeWithHeader();
    os.close();

    // Use hbase checksums.
    assertTrue(hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
        is, totalSize, (HFileSystem) fs, path, meta);
    HFileBlock b = hbr.readBlockData(0, -1, false, false);
    assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
  }

  /**
   * Test all checksum types by writing and reading back blocks.
   */
  @Test
  public void testAllChecksumTypes() throws IOException {
    for (ChecksumType cktype : ChecksumType.values()) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
      FSDataOutputStream os = fs.create(path);
      HFileContext meta = new HFileContextBuilder()
          .withChecksumType(cktype)
          .build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < 1000; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      int totalSize = hbw.getOnDiskSizeWithHeader();
      os.close();

      // Use hbase checksums.
      assertTrue(hfs.useHBaseChecksum());

      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
          is, totalSize, (HFileSystem) fs, path, meta);
      HFileBlock b = hbr.readBlockData(0, -1, false, false);
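      // The block payload should decode back to the 1000 integers written above,
      // with nothing left over in the buffer.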
      ByteBuff data = b.getBufferWithoutHeader();
      for (int i = 0; i < 1000; i++) {
        assertEquals(i, data.getInt());
      }
      boolean exceptionThrown = false;
      try {
        data.getInt();
      } catch (BufferUnderflowException e) {
        exceptionThrown = true;
      }
      assertTrue(exceptionThrown);
      assertEquals(0, HFile.getAndResetChecksumFailuresCount());
    }
  }

  /**
   * Introduce checksum failures and check that we can still read the data.
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

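  /**
   * Writes two data blocks (with or without tags), then reads them back through a reader that
   * deliberately corrupts the on-disk bytes, verifying that hbase checksum failures are detected
   * and that the reader falls back to hdfs checksums to return correct data.
   */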
  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
                   ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
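        // Write two blocks so that reads which peek into the next block's header have a
        // following block to corrupt (see CorruptedFSReaderImpl.readAtOffset).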
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Use hbase checksums.
        assertTrue(hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(true)
              .withIncludesTags(useTags)
              .withHBaseCheckSum(true)
              .build();
        HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // Read the data back from the hfile, excluding header and checksums.
        ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader();
        DataInputStream in = new DataInputStream(
                               new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that we encountered hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
        validateData(in);

        // A single instance of hbase checksum failure causes the reader to
        // switch off hbase checksum verification for the next 100 read
        // requests. Verify that this is correct.
        for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, pread, false);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        }
        // The next read should have hbase checksum verification re-enabled;
        // we verify this by asserting that there was an hbase checksum failure.
        b = hbr.readBlockData(0, -1, pread, false);
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());

        // Since the above read encountered a checksum failure, we switch
        // back to not checking hbase checksums.
        b = hbr.readBlockData(0, -1, pread, false);
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        is.close();

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect any hbase
        // checksum failures.
        HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
        assertFalse(newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
        b = hbr.readBlockData(0, -1, pread, false);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // Read the data back from the hfile, excluding header and checksums.
        bb = b.getBufferWithoutHeader();
        in = new DataInputStream(new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(in);
      }
    }
  }

  /**
   * Test different values of bytesPerChecksum.
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

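  /**
   * Writes a single block whose payload spans several checksum chunks for the given
   * bytesPerChecksum, reads it back, and checks that the on-disk size accounts for exactly the
   * expected number of checksum chunks.
   */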
  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
                             algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withHBaseCheckSum(true)
                            .withBytesPerCheckSum(bytesPerChecksum)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);

        // Write one block whose data is at least six times the checksum chunk size.
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

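        // Each bytesPerChecksum-sized span of the on-disk block (header included) gets its own
        // checksum, so the chunk count determines how many extra checksum bytes the block carries.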
        long expectedChunks = ChecksumUtil.numChunks(
                               dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
                               bytesPerChecksum);
        LOG.info("testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
                + "dataSize={}, expectedChunks={}, compression={}", pread, bytesPerChecksum,
            totalSize, dataSize, expectedChunks, algo.toString());

        // Verify hbase checksums are in use.
        assertTrue(hfs.useHBaseChecksum());

        // Read data back from file.
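        // Open both a checksum-verifying stream and a raw stream; the wrapper can serve
        // hbase-checksummed reads from the raw stream so hdfs-level verification is skipped.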
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
               .withCompression(algo)
               .withIncludesMvcc(true)
               .withIncludesTags(useTags)
               .withHBaseCheckSum(true)
               .withBytesPerCheckSum(bytesPerChecksum)
               .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
            is, nochecksum), totalSize, hfs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // Verify that we have the expected number of checksum chunks.
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
                     expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // Assert that we did not encounter hbase checksum verification failures.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }

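  /**
   * Reads back the 1234 integers written per block by testChecksumCorruptionInternals and checks
   * each value.
   */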
  private void validateData(DataInputStream in) throws IOException {
    // validate data
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }

  /**
   * Tests checksum behavior when data is corrupted. It mimics the following behavior:
   *  - When fs checksum is disabled, hbase may get corrupted data from hdfs. If verifyChecksum
   *    is true, it means hbase checksum is on and fs checksum is off, so we corrupt the data.
   *  - When fs checksum is enabled, hdfs will get a different copy from another node and will
   *    always return correct data. So we do not corrupt the data when verifyChecksum for hbase
   *    is off.
   */
  private static class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
    /**
     * If set to true, corrupt reads using readAtOffset(...).
     */
    boolean corruptDataStream = false;

    public CorruptedFSReaderImpl(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
        Path path, HFileContext meta) throws IOException {
      super(istream, fileSize, (HFileSystem) fs, path, meta);
    }

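    // Corrupt only while hbase checksum verification is requested; once the reader has fallen
    // back to hdfs checksums, return clean data (see the class javadoc above).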
    @Override
    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
        throws IOException {
      if (verifyChecksum) {
        corruptDataStream = true;
      }
      HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
          verifyChecksum, updateMetrics);
      corruptDataStream = false;
      return b;
    }

    @Override
    protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
        boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
      int returnValue = super.readAtOffset(istream, dest, destOffset, size, peekIntoNextBlock,
          fileOffset, pread);
      if (!corruptDataStream) {
        return returnValue;
      }
      // Corrupt the 3rd character of the block magic in the next block's header.
      if (peekIntoNextBlock) {
        dest[destOffset + size + 3] = 0b00000000;
      }
      // We might be reading this block's header too; corrupt it.
      dest[destOffset + 1] = 0b00000000;
      // Corrupt non-header data.
      if (size > hdrSize) {
        dest[destOffset + hdrSize + 1] = 0b00000000;
      }
      return returnValue;
    }
  }
}