001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
021import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.ByteArrayInputStream;
027import java.io.DataInputStream;
028import java.io.DataOutputStream;
029import java.io.IOException;
030import java.nio.BufferUnderflowException;
031import java.nio.ByteBuffer;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.fs.HFileSystem;
040import org.apache.hadoop.hbase.io.ByteBuffAllocator;
041import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
042import org.apache.hadoop.hbase.io.compress.Compression;
043import org.apache.hadoop.hbase.nio.ByteBuff;
044import org.apache.hadoop.hbase.nio.MultiByteBuff;
045import org.apache.hadoop.hbase.nio.SingleByteBuff;
046import org.apache.hadoop.hbase.testclassification.IOTests;
047import org.apache.hadoop.hbase.testclassification.SmallTests;
048import org.apache.hadoop.hbase.util.ChecksumType;
049import org.junit.jupiter.api.BeforeEach;
050import org.junit.jupiter.api.Tag;
051import org.junit.jupiter.api.Test;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055@Tag(IOTests.TAG)
056@Tag(SmallTests.TAG)
057public class TestChecksum {
058
059  private static final Logger LOG = LoggerFactory.getLogger(TestChecksum.class);
060
061  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
062
063  static final int[] BYTES_PER_CHECKSUM = { 50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };
064
065  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
066  private FileSystem fs;
067  private HFileSystem hfs;
068
069  @BeforeEach
070  public void setUp() throws Exception {
071    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
072    hfs = (HFileSystem) fs;
073  }
074
075  @Test
076  public void testNewBlocksHaveDefaultChecksum() throws IOException {
077    Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
078    FSDataOutputStream os = fs.create(path);
079    HFileContext meta = new HFileContextBuilder().build();
080    HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
081    DataOutputStream dos = hbw.startWriting(BlockType.DATA);
082    for (int i = 0; i < 1000; ++i)
083      dos.writeInt(i);
084    hbw.writeHeaderAndData(os);
085    int totalSize = hbw.getOnDiskSizeWithHeader();
086    os.close();
087
088    // Use hbase checksums.
089    assertEquals(true, hfs.useHBaseChecksum());
090
091    FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
092    meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
093    ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
094      .withFileSize(totalSize).withFileSystem((HFileSystem) fs).withFilePath(path).build();
095    HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP,
096      TEST_UTIL.getConfiguration());
097    HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
098    assertTrue(!b.isSharedMem());
099    assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
100  }
101
102  private void verifyMBBCheckSum(ByteBuff buf) throws IOException {
103    int size = buf.remaining() / 2 + 1;
104    ByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(size))
105      .position(0).limit(buf.remaining());
106    for (int i = buf.position(); i < buf.limit(); i++) {
107      mbb.put(buf.get(i));
108    }
109    mbb.position(0).limit(buf.remaining());
110    assertEquals(mbb.remaining(), buf.remaining());
111    assertTrue(mbb.remaining() > size);
112    ChecksumUtil.validateChecksum(mbb, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
113  }
114
115  private void verifySBBCheckSum(ByteBuff buf) throws IOException {
116    ChecksumUtil.validateChecksum(buf, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
117  }
118
119  @Test
120  public void testVerifyCheckSum() throws IOException {
121    int intCount = 10000;
122    for (ChecksumType ckt : ChecksumType.values()) {
123      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
124      FSDataOutputStream os = fs.create(path);
125      HFileContext meta = new HFileContextBuilder().withChecksumType(ckt).build();
126      HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
127      DataOutputStream dos = hbw.startWriting(BlockType.DATA);
128      for (int i = 0; i < intCount; ++i) {
129        dos.writeInt(i);
130      }
131      hbw.writeHeaderAndData(os);
132      int totalSize = hbw.getOnDiskSizeWithHeader();
133      os.close();
134
135      // Use hbase checksums.
136      assertEquals(true, hfs.useHBaseChecksum());
137
138      FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
139      meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
140      ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
141        .withFileSize(totalSize).withFileSystem((HFileSystem) fs).withFilePath(path).build();
142      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP,
143        TEST_UTIL.getConfiguration());
144      HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
145      assertTrue(!b.isSharedMem());
146
147      ByteBuff bufferWithChecksum = getBufferWithChecksum(b);
148
149      // verify SingleByteBuff checksum.
150      verifySBBCheckSum(bufferWithChecksum);
151
152      // verify MultiByteBuff checksum.
153      verifyMBBCheckSum(bufferWithChecksum);
154
155      ByteBuff data = b.getBufferWithoutHeader();
156      for (int i = 0; i < intCount; i++) {
157        assertEquals(i, data.getInt());
158      }
159      try {
160        data.getInt();
161        fail();
162      } catch (BufferUnderflowException e) {
163        // expected failure
164      }
165      assertEquals(0, HFile.getAndResetChecksumFailuresCount());
166    }
167  }
168
169  /**
170   * HFileBlock buffer does not include checksum because it is discarded after verifying upon
171   * reading from disk. We artificially add a checksum onto the buffer for use in testing that
172   * ChecksumUtil.validateChecksum works for SingleByteBuff and MultiByteBuff in
173   * {@link #verifySBBCheckSum(ByteBuff)} and {@link #verifyMBBCheckSum(ByteBuff)}
174   */
175  private ByteBuff getBufferWithChecksum(HFileBlock block) throws IOException {
176    ByteBuff buf = block.getBufferReadOnly();
177
178    int numBytes =
179      (int) ChecksumUtil.numBytes(buf.remaining(), block.getHFileContext().getBytesPerChecksum());
180    byte[] checksum = new byte[numBytes];
181    ChecksumUtil.generateChecksums(buf.array(), 0, buf.limit(), checksum, 0,
182      block.getHFileContext().getChecksumType(), block.getBytesPerChecksum());
183
184    ByteBuff bufWithChecksum = ByteBuffAllocator.HEAP.allocate(buf.limit() + numBytes);
185    bufWithChecksum.put(buf.array(), 0, buf.limit());
186    bufWithChecksum.put(checksum);
187
188    return bufWithChecksum.rewind();
189  }
190
191  /**
192   * Introduce checksum failures and check that we can still read the data
193   */
194  @Test
195  public void testChecksumCorruption() throws IOException {
196    testChecksumCorruptionInternals(false);
197    testChecksumCorruptionInternals(true);
198  }
199
200  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
201    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
202      for (boolean pread : new boolean[] { false, true }) {
203        LOG.info("testChecksumCorruption: Compression algorithm: " + algo + ", pread=" + pread);
204        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
205        FSDataOutputStream os = fs.create(path);
206        HFileContext meta = new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
207          .withIncludesTags(useTags).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
208        HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
209        long totalSize = 0;
210        for (int blockId = 0; blockId < 2; ++blockId) {
211          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
212          for (int i = 0; i < 1234; ++i)
213            dos.writeInt(i);
214          hbw.writeHeaderAndData(os);
215          totalSize += hbw.getOnDiskSizeWithHeader();
216        }
217        os.close();
218
219        // Use hbase checksums.
220        assertEquals(true, hfs.useHBaseChecksum());
221
222        // Do a read that purposely introduces checksum verification failures.
223        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
224        meta = new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
225          .withIncludesTags(useTags).withHBaseCheckSum(true).build();
226        ReaderContext context = new ReaderContextBuilder().withInputStreamWrapper(is)
227          .withFileSize(totalSize).withFileSystem(fs).withFilePath(path).build();
228        HFileBlock.FSReader hbr =
229          new CorruptedFSReaderImpl(context, meta, TEST_UTIL.getConfiguration());
230        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
231        b.sanityCheck();
232        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
233        assertEquals(algo == GZ ? 2173 : 4936,
234          b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
235        // read data back from the hfile, exclude header and checksum
236        ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
237        DataInputStream in =
238          new DataInputStream(new ByteArrayInputStream(bb.array(), bb.arrayOffset(), bb.limit()));
239
240        // assert that we encountered hbase checksum verification failures
241        // but still used hdfs checksums and read data successfully.
242        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
243        validateData(in);
244
245        // A single instance of hbase checksum failure causes the reader to
246        // switch off hbase checksum verification for the next 100 read
247        // requests. Verify that this is correct.
248        for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
249          b = hbr.readBlockData(0, -1, pread, false, true);
250          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
251          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
252        }
253        // The next read should have hbase checksum verification reanabled,
254        // we verify this by assertng that there was a hbase-checksum failure.
255        b = hbr.readBlockData(0, -1, pread, false, true);
256        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
257        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
258
259        // Since the above encountered a checksum failure, we switch
260        // back to not checking hbase checksums.
261        b = hbr.readBlockData(0, -1, pread, false, true);
262        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
263        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
264        is.close();
265
266        // Now, use a completely new reader. Switch off hbase checksums in
267        // the configuration. In this case, we should not detect
268        // any retries within hbase.
269        Configuration conf = TEST_UTIL.getConfiguration();
270        HFileSystem newfs = new HFileSystem(conf, false);
271        assertEquals(false, newfs.useHBaseChecksum());
272        is = new FSDataInputStreamWrapper(newfs, path);
273        context = new ReaderContextBuilder().withInputStreamWrapper(is).withFileSize(totalSize)
274          .withFileSystem(newfs).withFilePath(path).build();
275        hbr = new CorruptedFSReaderImpl(context, meta, conf);
276        b = hbr.readBlockData(0, -1, pread, false, true);
277        is.close();
278        b.sanityCheck();
279        b = b.unpack(meta, hbr);
280        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
281        assertEquals(algo == GZ ? 2173 : 4936,
282          b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
283        // read data back from the hfile, exclude header and checksum
284        bb = b.getBufferWithoutHeader(); // read back data
285        in =
286          new DataInputStream(new ByteArrayInputStream(bb.array(), bb.arrayOffset(), bb.limit()));
287
288        // assert that we did not encounter hbase checksum verification failures
289        // but still used hdfs checksums and read data successfully.
290        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
291        validateData(in);
292      }
293    }
294  }
295
296  /**
297   * Test different values of bytesPerChecksum
298   */
299  @Test
300  public void testChecksumChunks() throws IOException {
301    testChecksumInternals(false);
302    testChecksumInternals(true);
303  }
304
305  protected void testChecksumInternals(boolean useTags) throws IOException {
306    Compression.Algorithm algo = NONE;
307    for (boolean pread : new boolean[] { false, true }) {
308      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
309        Path path =
310          new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" + algo + bytesPerChecksum);
311        FSDataOutputStream os = fs.create(path);
312        HFileContext meta = new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
313          .withIncludesTags(useTags).withHBaseCheckSum(true).withBytesPerCheckSum(bytesPerChecksum)
314          .build();
315        HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
316
317        // write one block. The block has data
318        // that is at least 6 times more than the checksum chunk size
319        long dataSize = 0;
320        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
321        for (; dataSize < 6 * bytesPerChecksum;) {
322          for (int i = 0; i < 1234; ++i) {
323            dos.writeInt(i);
324            dataSize += 4;
325          }
326        }
327        hbw.writeHeaderAndData(os);
328        long totalSize = hbw.getOnDiskSizeWithHeader();
329        os.close();
330
331        long expectedChunks =
332          ChecksumUtil.numChunks(dataSize + HConstants.HFILEBLOCK_HEADER_SIZE, bytesPerChecksum);
333        LOG.info(
334          "testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
335            + "dataSize={}, expectedChunks={}, compression={}",
336          pread, bytesPerChecksum, totalSize, dataSize, expectedChunks, algo.toString());
337
338        // Verify hbase checksums.
339        assertEquals(true, hfs.useHBaseChecksum());
340
341        // Read data back from file.
342        FSDataInputStream is = fs.open(path);
343        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
344        meta = new HFileContextBuilder().withCompression(algo).withIncludesMvcc(true)
345          .withIncludesTags(useTags).withHBaseCheckSum(true).withBytesPerCheckSum(bytesPerChecksum)
346          .build();
347        ReaderContext context = new ReaderContextBuilder()
348          .withInputStreamWrapper(new FSDataInputStreamWrapper(is, nochecksum))
349          .withFileSize(totalSize).withFileSystem(hfs).withFilePath(path).build();
350        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP,
351          TEST_UTIL.getConfiguration());
352        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
353        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
354        is.close();
355        b.sanityCheck();
356        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
357
358        // verify that we have the expected number of checksum chunks
359        assertEquals(totalSize,
360          HConstants.HFILEBLOCK_HEADER_SIZE + dataSize + expectedChunks * HFileBlock.CHECKSUM_SIZE);
361
362        // assert that we did not encounter hbase checksum verification failures
363        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
364      }
365    }
366  }
367
368  private void validateData(DataInputStream in) throws IOException {
369    // validate data
370    for (int i = 0; i < 1234; i++) {
371      int val = in.readInt();
372      assertEquals(i, val, "testChecksumCorruption: data mismatch at index " + i);
373    }
374  }
375
376  /**
377   * This class is to test checksum behavior when data is corrupted. It mimics the following
378   * behavior: - When fs checksum is disabled, hbase may get corrupted data from hdfs. If
379   * verifyChecksum is true, it means hbase checksum is on and fs checksum is off, so we corrupt the
380   * data. - When fs checksum is enabled, hdfs will get a different copy from another node, and will
381   * always return correct data. So we don't corrupt the data when verifyChecksum for hbase is off.
382   */
383  static private class CorruptedFSReaderImpl extends HFileBlock.FSReaderImpl {
384    /**
385     * If set to true, corrupt reads using readAtOffset(...).
386     */
387    boolean corruptDataStream = false;
388
389    public CorruptedFSReaderImpl(ReaderContext context, HFileContext meta, Configuration conf)
390      throws IOException {
391      super(context, meta, ByteBuffAllocator.HEAP, conf);
392    }
393
394    @Override
395    protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
396      long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
397      boolean useHeap) throws IOException {
398      if (verifyChecksum) {
399        corruptDataStream = true;
400      }
401      HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
402        verifyChecksum, updateMetrics, useHeap);
403      corruptDataStream = false;
404      return b;
405    }
406
407    @Override
408    protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size,
409      boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
410      int destOffset = dest.position();
411      boolean returnValue =
412        super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread);
413      if (!corruptDataStream) {
414        return returnValue;
415      }
416      // Corrupt 3rd character of block magic of next block's header.
417      if (peekIntoNextBlock) {
418        dest.put(destOffset + size + 3, (byte) 0b00000000);
419      }
420      // We might be reading this block's header too, corrupt it.
421      dest.put(destOffset + 1, (byte) 0b00000000);
422      // Corrupt non header data
423      if (size > hdrSize) {
424        dest.put(destOffset + hdrSize + 1, (byte) 0b00000000);
425      }
426      return returnValue;
427    }
428  }
429}