/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

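/**
 * Verifies that HFile block checksums are written and validated correctly: newly written blocks
 * carry the default checksum type, every supported checksum algorithm and chunk size round-trips,
 * and checksum failures are detected and handled by falling back to HDFS checksums.
 */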
@Category({IOTests.class, SmallTests.class})
public class TestChecksum {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestChecksum.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }

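  /**
   * A block written with a default {@link HFileContext} should use the default checksum type.
   */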
  @Test
  public void testNewBlocksHaveDefaultChecksum() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i);
    }
    hbw.writeHeaderAndData(os);
    int totalSize = hbw.getOnDiskSizeWithHeader();
    os.close();

    // HBase checksums should be in use by default.
    assertTrue(hfs.useHBaseChecksum());

    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
    ReaderContext context = new ReaderContextBuilder()
        .withInputStreamWrapper(is)
        .withFileSize(totalSize)
        .withFileSystem((HFileSystem) fs)
        .withFilePath(path)
        .build();
    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
        meta, ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
    HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
    assertFalse(b.isSharedMem());
    assertEquals(ChecksumType.getDefaultChecksumType().getCode(), b.getChecksumType());
  }

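  /**
   * Copies the block's bytes into a {@link MultiByteBuff} backed by two ByteBuffers so that
   * checksum validation is exercised across a buffer boundary.
   */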
  private void verifyMBBCheckSum(ByteBuff buf) throws IOException {
    int size = buf.remaining() / 2 + 1;
    ByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(size))
          .position(0).limit(buf.remaining());
    for (int i = buf.position(); i < buf.limit(); i++) {
      mbb.put(buf.get(i));
    }
    mbb.position(0).limit(buf.remaining());
    assertEquals(buf.remaining(), mbb.remaining());
    assertTrue(mbb.remaining() > size);
    ChecksumUtil.validateChecksum(mbb, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

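  /**
   * Validates the checksum directly on the block's single backing buffer.
   */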
  private void verifySBBCheckSum(ByteBuff buf) throws IOException {
    ChecksumUtil.validateChecksum(buf, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
  }

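  /**
   * Writes a block with every supported {@link ChecksumType} and verifies that validation
   * succeeds on both the single- and multi-buffer paths and that the data reads back intact.
   */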
  @Test
  public void testVerifyCheckSum() throws IOException {
    int intCount = 10000;
    for (ChecksumType ckt : ChecksumType.values()) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
      FSDataOutputStream os = fs.create(path);
      HFileContext meta = new HFileContextBuilder()
            .withChecksumType(ckt)
            .build();
      HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
      for (int i = 0; i < intCount; ++i) {
        dos.writeInt(i);
      }
      hbw.writeHeaderAndData(os);
      int totalSize = hbw.getOnDiskSizeWithHeader();
      os.close();

      // HBase checksums should be in use.
      assertTrue(hfs.useHBaseChecksum());

      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
      ReaderContext context = new ReaderContextBuilder()
          .withInputStreamWrapper(is)
          .withFileSize(totalSize)
          .withFileSystem((HFileSystem) fs)
          .withFilePath(path)
          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
          meta, ByteBuffAllocator.HEAP, TEST_UTIL.getConfiguration());
      HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
      assertFalse(b.isSharedMem());

      // Verify SingleByteBuff checksum.
      verifySBBCheckSum(b.getBufferReadOnly());

      // Verify MultiByteBuff checksum.
      verifyMBBCheckSum(b.getBufferReadOnly());

      ByteBuff data = b.getBufferWithoutHeader();
      for (int i = 0; i < intCount; i++) {
        assertEquals(i, data.getInt());
      }
      try {
        data.getInt();
        fail();
      } catch (BufferUnderflowException e) {
        // expected failure
      }
      assertEquals(0, HFile.getAndResetChecksumFailuresCount());
    }
  }

  /**
   * Introduce checksum failures and check that we can still read the data.
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

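  /**
   * For each compression algorithm and read mode (pread or not), writes two blocks and reads the
   * first back through {@link CorruptedFSReaderImpl}, which deliberately corrupts the bytes when
   * HBase checksum verification is enabled. Verifies that the corruption is detected, that the
   * data is still read correctly via the HDFS checksum fallback, and that HBase checksum
   * verification is temporarily switched off and later re-enabled.
   */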
  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: {}, pread={}", algo, pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // HBase checksums should be in use.
        assertTrue(hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(true)
              .withIncludesTags(useTags)
              .withHBaseCheckSum(true)
              .build();
        ReaderContext context = new ReaderContextBuilder()
           .withInputStreamWrapper(is)
           .withFileSize(totalSize)
           .withFileSystem(fs)
           .withFilePath(path)
           .build();
        HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(context, meta,
          TEST_UTIL.getConfiguration());
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
        DataInputStream in = new DataInputStream(
                               new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we encountered hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
        validateData(in);

        // A single instance of hbase checksum failure causes the reader to
        // switch off hbase checksum verification for the next few read
        // requests (CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD). Verify that this is correct.
        for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, pread, false, true);
          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        }
        // The next read should have hbase checksum verification re-enabled;
        // we verify this by asserting that there was an hbase checksum failure.
        b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());

        // Since the above encountered a checksum failure, we switch
        // back to not checking hbase checksums.
        b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        is.close();

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        Configuration conf = TEST_UTIL.getConfiguration();
        HFileSystem newfs = new HFileSystem(conf, false);
        assertFalse(newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        context = new ReaderContextBuilder()
            .withInputStreamWrapper(is)
            .withFileSize(totalSize)
            .withFileSystem(newfs)
            .withFilePath(path)
            .build();
        hbr = new CorruptedFSReaderImpl(context, meta, conf);
        b = hbr.readBlockData(0, -1, pread, false, true);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // read data back from the hfile, exclude header and checksum
        bb = b.getBufferWithoutHeader(); // read back data
        in = new DataInputStream(new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(in);
      }
    }
  }

  /**
   * Test different values of bytesPerChecksum.
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

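  /**
   * For each bytesPerChecksum value, writes a block spanning several checksum chunks and verifies
   * that the on-disk size equals header + data + the expected number of checksum chunks, with no
   * checksum failures reported on read.
   */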
  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
                             algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withHBaseCheckSum(true)
                            .withBytesPerCheckSum(bytesPerChecksum)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);

        // Write a single block whose data section is at least six times the checksum chunk
        // size, so that multiple checksum chunks are generated.
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

        long expectedChunks = ChecksumUtil.numChunks(
                               dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
                               bytesPerChecksum);
        LOG.info("testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
                + "dataSize={}, expectedChunks={}, compression={}", pread, bytesPerChecksum,
            totalSize, dataSize, expectedChunks, algo.toString());

        // HBase checksums should be in use.
        assertTrue(hfs.useHBaseChecksum());

        // Read data back from file.
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
               .withCompression(algo)
               .withIncludesMvcc(true)
               .withIncludesTags(useTags)
               .withHBaseCheckSum(true)
               .withBytesPerCheckSum(bytesPerChecksum)
               .build();
        ReaderContext context = new ReaderContextBuilder()
            .withInputStreamWrapper(new FSDataInputStreamWrapper(is, nochecksum))
            .withFileSize(totalSize)
            .withFileSystem(hfs)
            .withFilePath(path)
            .build();
        HFileBlock.FSReader hbr =
            new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP,
              TEST_UTIL.getConfiguration());
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // verify that we have the expected number of checksum chunks
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
                     expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // assert that we did not encounter hbase checksum verification failures
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }

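  /**
   * Asserts that the stream contains the sequence of ints 0..1233 that the test wrote into each
   * block.
   */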
  private void validateData(DataInputStream in) throws IOException {
    // validate data
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }

  /**
   * This class tests checksum behavior when data is corrupted. It mimics the following behavior:
   *  - When fs checksum is disabled, hbase may get corrupted data from hdfs. If verifyChecksum
   *    is true, hbase checksum is on and fs checksum is off, so we corrupt the data.
   *  - When fs checksum is enabled, hdfs will fetch a different replica from another node and
   *    will always return correct data, so we do not corrupt the data when verifyChecksum for
   *    hbase is off.
   */
  private static class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
    /**
     * If set to true, corrupt reads using readAtOffset(...).
     */
    boolean corruptDataStream = false;

    public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta, Configuration conf)
        throws IOException {
      super(context, meta, ByteBuffAllocator.HEAP, conf);
    }

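    /**
     * Turns on corruption only while a read with HBase checksum verification is in flight,
     * mimicking the case where fs checksums are off and the data coming from hdfs is bad.
     */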
    @Override
    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
        boolean useHeap) throws IOException {
      if (verifyChecksum) {
        corruptDataStream = true;
      }
      HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
        verifyChecksum, updateMetrics, useHeap);
      corruptDataStream = false;
      return b;
    }

    @Override
    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
        boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
      int destOffset = dest.position();
      boolean returnValue =
          super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
      if (!corruptDataStream) {
        return returnValue;
      }
      // Corrupt 3rd character of block magic of next block's header.
      if (peekIntoNextBlock) {
        dest.put(destOffset + size + 3, (byte) 0b00000000);
      }
      // We might be reading this block's header too, corrupt it.
      dest.put(destOffset + 1, (byte) 0b00000000);
      // Corrupt non-header data.
      if (size > hdrSize) {
        dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
      }
      return returnValue;
    }
  }
}