/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This test provides coverage for HFileBlock header fields that are read and interpreted before
 * HBase checksum validation can be applied. As of now, these are {@code onDiskSizeWithoutHeader}
 * and {@code checksumType}.
 */
@Tag(IOTests.TAG)
@Tag(SmallTests.TAG)
public class TestHFileBlockHeaderCorruption {
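
  /*
   * For reference, the checksum-enabled HFileBlock header layout exercised by these tests, read
   * field by field in logHeader(ByteBuff) below. The byte offsets follow from those field widths
   * and are where the HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX (8) and
   * CHECKSUM_TYPE_INDEX (24) writes in the tests land:
   *
   *   offset  0, 8 bytes: blockMagic
   *   offset  8, 4 bytes: onDiskSizeWithoutHeader
   *   offset 12, 4 bytes: uncompressedSizeWithoutHeader
   *   offset 16, 8 bytes: prevBlockOffset
   *   offset 24, 1 byte : checksumType
   *   offset 25, 4 bytes: bytesPerChecksum
   *   offset 29, 4 bytes: onDiskDataSizeWithHeader
   */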

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static HFileSystem HFS;
  private HFileContext hfileCtx;
  private Path path;

  @BeforeAll
  public static void setUpBeforeAll() throws IOException {
    HFS = (HFileSystem) HFileSystem.get(UTIL.getConfiguration());
  }

  @BeforeEach
  public void setUp(TestInfo testInfo) throws IOException {
    path = new Path(UTIL.getDataTestDirOnTestFS(), testInfo.getTestMethod().get().getName());
    hfileCtx = new HFileContextBuilder().withBlockSize(4 * 1024).withHBaseCheckSum(true).build();
    HFile.WriterFactory factory =
      HFile.getWriterFactory(UTIL.getConfiguration(), CacheConfig.DISABLED).withPath(HFS, path)
        .withFileContext(hfileCtx);

    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
    Random rand = new Random(Instant.now().toEpochMilli());
    byte[] family = Bytes.toBytes("f");
    try (HFile.Writer writer = factory.create()) {
      for (int i = 0; i < 40; i++) {
        byte[] row = RandomKeyValueUtil.randomOrderedFixedLengthKey(rand, i, 100);
        byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand);
        byte[] value = RandomKeyValueUtil.randomValue(rand);
        ExtendedCell cell = cellBuilder.setType(Cell.Type.Put).setRow(row).setFamily(family)
          .setQualifier(qualifier).setValue(value).build();
        writer.append(cell);
        cellBuilder.clear();
      }
    }
  }

  @Test
  public void testChecksumTypeCorruptionFirstBlock() throws Exception {
    HFileBlockChannelPosition firstBlock = null;
    try {
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        assertTrue(it.hasNext());
        firstBlock = it.next();
      }

      Corrupter c = new Corrupter(firstBlock);

      logHeader(firstBlock);

      // test corrupted HFileBlock with unknown checksumType code -1
      c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { -1 }));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Unknown checksum type code")));
        }
        assertEquals(0, consumer.getItemsRead());
      }

      // valid checksumType code test
      for (ChecksumType t : ChecksumType.values()) {
        testValidChecksumTypeReadBlock(t.getCode(), c, firstBlock);
      }

      c.restore();
      // test corrupted HFileBlock with unknown checksumType code 3
      c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { 3 }));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Unknown checksum type code")));
        }
        assertEquals(0, consumer.getItemsRead());
      }
    } finally {
      if (firstBlock != null) {
        firstBlock.close();
      }
    }
  }

  @Test
  public void testChecksumTypeCorruptionSecondBlock() throws Exception {
    HFileBlockChannelPosition secondBlock = null;
    try {
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        assertTrue(it.hasNext());
        it.next();
        assertTrue(it.hasNext());
        secondBlock = it.next();
      }

      Corrupter c = new Corrupter(secondBlock);

      logHeader(secondBlock);
      // test corrupted HFileBlock with unknown checksumType code -1
      c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { -1 }));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(RuntimeException.class)
            .withMessage(startsWith("Unknown checksum type code")));
        }
        assertEquals(1, consumer.getItemsRead());
      }

      // valid checksumType code test
      for (ChecksumType t : ChecksumType.values()) {
        testValidChecksumTypeReadBlock(t.getCode(), c, secondBlock);
      }

      c.restore();
      // test corrupted HFileBlock with unknown checksumType code 3
      c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { 3 }));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(RuntimeException.class)
            .withMessage(startsWith("Unknown checksum type code")));
        }
        assertEquals(1, consumer.getItemsRead());
      }
    } finally {
      if (secondBlock != null) {
        secondBlock.close();
      }
    }
  }

  public void testValidChecksumTypeReadBlock(byte checksumTypeCode, Corrupter c,
    HFileBlockChannelPosition testBlock) throws IOException {
    c.restore();
    c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX,
      ByteBuffer.wrap(new byte[] { checksumTypeCode }));
    logHeader(testBlock);
    try (HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
      CountingConsumer consumer = new CountingConsumer(it);
      try {
        consumer.readFully();
      } catch (Exception e) {
        fail("block read should succeed for valid checksumType code " + checksumTypeCode
          + ", but threw: " + e);
      }
      assertNotEquals(0, consumer.getItemsRead());
    }
  }

  @Test
  public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws Exception {
    HFileBlockChannelPosition firstBlock = null;
    try {
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        assertTrue(it.hasNext());
        firstBlock = it.next();
      }

      Corrupter c = new Corrupter(firstBlock);

      logHeader(firstBlock);
      // test corrupted HFileBlock with onDiskSizeWithoutHeader = Integer.MIN_VALUE
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(0, consumer.getItemsRead());
      }

      c.restore();
      // test corrupted HFileBlock with onDiskSizeWithoutHeader = 0
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(0)));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
        }
        assertEquals(0, consumer.getItemsRead());
      }

      c.restore();
      // test corrupted HFileBlock with onDiskSizeWithoutHeader = Integer.MAX_VALUE
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(0, consumer.getItemsRead());
      }
    } finally {
      if (firstBlock != null) {
        firstBlock.close();
      }
    }
  }

  @Test
  public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws Exception {
    HFileBlockChannelPosition secondBlock = null;
    try {
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        assertTrue(it.hasNext());
        it.next();
        assertTrue(it.hasNext());
        secondBlock = it.next();
      }

      Corrupter c = new Corrupter(secondBlock);

      logHeader(secondBlock);
      // test corrupted HFileBlock with onDiskSizeWithoutHeader = Integer.MIN_VALUE
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(1, consumer.getItemsRead());
      }

      c.restore();
      // test corrupted HFileBlock with onDiskSizeWithoutHeader = 0
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(0)));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
        }
        assertEquals(1, consumer.getItemsRead());
      }

      c.restore();
      // test corrupted HFileBlock with onDiskSizeWithoutHeader = Integer.MAX_VALUE
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(1, consumer.getItemsRead());
      }
    } finally {
      if (secondBlock != null) {
        secondBlock.close();
      }
    }
  }

  private static void logHeader(HFileBlockChannelPosition hbcp) throws IOException {
    ByteBuff buf = ByteBuff.wrap(ByteBuffer.allocate(HFileBlock.headerSize(true)));
    hbcp.rewind();
    assertEquals(buf.capacity(), buf.read(hbcp.getChannel()));
    buf.rewind();
    hbcp.rewind();
    logHeader(buf);
  }

  private static void logHeader(ByteBuff buf) {
    byte[] blockMagic = new byte[8];
    buf.get(blockMagic);
    int onDiskSizeWithoutHeader = buf.getInt();
    int uncompressedSizeWithoutHeader = buf.getInt();
    long prevBlockOffset = buf.getLong();
    byte checksumType = buf.get();
    int bytesPerChecksum = buf.getInt();
    int onDiskDataSizeWithHeader = buf.getInt();
    LOG.debug(
      "blockMagic={}, onDiskSizeWithoutHeader={}, uncompressedSizeWithoutHeader={}, "
        + "prevBlockOffset={}, checksumType={}, bytesPerChecksum={}, onDiskDataSizeWithHeader={}",
      Bytes.toStringBinary(blockMagic), onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
      prevBlockOffset, checksumType, bytesPerChecksum, onDiskDataSizeWithHeader);
  }

  /**
   * Data class to enable messing with the bytes behind an {@link HFileBlock}.
   */
  public static class HFileBlockChannelPosition implements Closeable {
    private final SeekableByteChannel channel;
    private final long position;

    public HFileBlockChannelPosition(SeekableByteChannel channel, long position) {
      this.channel = channel;
      this.position = position;
    }

    public SeekableByteChannel getChannel() {
      return channel;
    }

    public long getPosition() {
      return position;
    }

    public void rewind() throws IOException {
      channel.position(position);
    }

    @Override
    public void close() throws IOException {
      channel.close();
    }
  }

  /**
   * Reads blocks off of an {@link HFileBlockChannelPositionIterator}, counting them as it does.
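   * <p>
   * A minimal usage sketch, mirroring how the tests above drive it:
   *
   * <pre>{@code
   * CountingConsumer consumer = new CountingConsumer(it);
   * consumer.readFully(); // drains the iterator, or throws on a corrupted block
   * assertEquals(0, consumer.getItemsRead());
   * }</pre>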
   */
  public static class CountingConsumer {
    private final HFileBlockChannelPositionIterator iterator;
    private int itemsRead = 0;

    public CountingConsumer(HFileBlockChannelPositionIterator iterator) {
      this.iterator = iterator;
    }

    public int getItemsRead() {
      return itemsRead;
    }

    public Object readFully() throws IOException {
      Object val = null;
      for (itemsRead = 0; iterator.hasNext(); itemsRead++) {
        val = iterator.next();
      }
      return val;
    }
  }

  /**
   * A simplified wrapper over an {@link HFileBlock.BlockIterator} that looks a lot like an
   * {@link java.util.Iterator}.
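   * <p>
   * A minimal usage sketch; this is essentially what {@link CountingConsumer#readFully()} and the
   * tests above do:
   *
   * <pre>{@code
   * try (HFileBlockChannelPositionIterator it =
   *   new HFileBlockChannelPositionIterator(HFS, path)) {
   *   while (it.hasNext()) {
   *     HFileBlockChannelPosition pos = it.next();
   *     // inspect or corrupt the block that starts at pos
   *   }
   * }
   * }</pre>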
   */
  public static class HFileBlockChannelPositionIterator implements Closeable {

    private final Path hfsPath;
    private final HFile.Reader reader;
    private final HFileBlock.BlockIterator iter;
    private HFileBlockChannelPosition current = null;

    public HFileBlockChannelPositionIterator(HFileSystem hfs, Path hfsPath) throws IOException {
      this.hfsPath = hfsPath;
      this.reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, hfs.getConf());
      HFileBlock.FSReader fsreader = reader.getUncachedBlockReader();
      // Block reads must stay within the range [0, loadOnOpenDataOffset), i.e. before the
      // load-on-open section of the file.
      this.iter = fsreader.blockRange(0, reader.getTrailer().getLoadOnOpenDataOffset());
    }

    public boolean hasNext() throws IOException {
      HFileBlock next = iter.nextBlock();
      if (next != null) {
        java.nio.file.Path p = FileSystems.getDefault().getPath(hfsPath.toString());
        SeekableByteChannel channel = Files.newByteChannel(p, StandardOpenOption.READ,
          StandardOpenOption.WRITE, StandardOpenOption.DSYNC);
        current = new HFileBlockChannelPosition(channel, next.getOffset());
      }
      return next != null;
    }

    public HFileBlockChannelPosition next() {
      if (current == null) {
        throw new NoSuchElementException();
      }
      HFileBlockChannelPosition ret = current;
      current = null;
      return ret;
    }

    @Override
    public void close() throws IOException {
      if (current != null) {
        closeQuietly(current::close);
      }
      closeQuietly(reader::close);
    }

    @FunctionalInterface
    private interface CloseMethod {
      void run() throws IOException;
    }

    private static void closeQuietly(CloseMethod closeMethod) {
      try {
        closeMethod.run();
      } catch (Throwable e) {
        LOG.debug("Ignoring thrown exception.", e);
      }
    }
  }

  /**
   * Enables writing and rewriting portions of the file backing an {@link HFileBlock}.
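   * <p>
   * A minimal usage sketch, mirroring the corrupt-then-restore cycle in the tests above:
   *
   * <pre>{@code
   * Corrupter c = new Corrupter(firstBlock);
   * c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { -1 }));
   * // ... read the file and assert on the failure ...
   * c.restore(); // put the original header bytes back
   * }</pre>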
   */
  public static class Corrupter {

    private final HFileBlockChannelPosition channelAndPosition;
    private final ByteBuffer originalHeader;

    public Corrupter(HFileBlockChannelPosition channelAndPosition) throws IOException {
      this.channelAndPosition = channelAndPosition;
      this.originalHeader = readHeaderData(channelAndPosition);
    }

    private static ByteBuffer readHeaderData(HFileBlockChannelPosition channelAndPosition)
      throws IOException {
      SeekableByteChannel channel = channelAndPosition.getChannel();
      ByteBuffer originalHeader = ByteBuffer.allocate(HFileBlock.headerSize(true));
      channelAndPosition.rewind();
      channel.read(originalHeader);
      return originalHeader;
    }

    public void write(int offset, ByteBuffer src) throws IOException {
      SeekableByteChannel channel = channelAndPosition.getChannel();
      long position = channelAndPosition.getPosition();
      channel.position(position + offset);
      channel.write(src);
    }

    public void restore() throws IOException {
      SeekableByteChannel channel = channelAndPosition.getChannel();
      originalHeader.rewind();
      channelAndPosition.rewind();
      assertEquals(originalHeader.capacity(), channel.write(originalHeader));
    }
  }

  /**
   * A Matcher implementation that can make basic assertions over a provided {@link Throwable}.
   * Assertion failures include the full stacktrace in their description.
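   * <p>
   * A minimal usage sketch, as used in the tests above:
   *
   * <pre>{@code
   * assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
   *   .withMessage(startsWith("Unknown checksum type code")));
   * }</pre>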
   */
  private static final class IsThrowableMatching extends TypeSafeMatcher<Throwable> {

    private final List<Matcher<? super Throwable>> requirements = new LinkedList<>();

    public IsThrowableMatching withInstanceOf(Class<?> type) {
      requirements.add(instanceOf(type));
      return this;
    }

    public IsThrowableMatching withMessage(Matcher<String> matcher) {
      requirements.add(hasProperty("message", matcher));
      return this;
    }

    @Override
    protected boolean matchesSafely(Throwable throwable) {
      return allOf(requirements).matches(throwable);
    }

    @Override
    protected void describeMismatchSafely(Throwable item, Description mismatchDescription) {
      allOf(requirements).describeMismatch(item, mismatchDescription);
      // would be nice if `item` could be provided as the cause of the AssertionError instead.
      mismatchDescription.appendText(String.format("%nProvided: "));
      try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
        try (PrintStream ps = new PrintStream(baos, false, StandardCharsets.UTF_8.name())) {
          item.printStackTrace(ps);
          ps.flush();
        }
        mismatchDescription.appendText(baos.toString(StandardCharsets.UTF_8.name()));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public void describeTo(Description description) {
      description.appendDescriptionOf(allOf(requirements));
    }
  }
}