001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.allOf;
022import static org.hamcrest.Matchers.hasProperty;
023import static org.hamcrest.Matchers.instanceOf;
024import static org.hamcrest.Matchers.startsWith;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.ByteArrayOutputStream;
030import java.io.Closeable;
031import java.io.IOException;
032import java.io.PrintStream;
033import java.nio.ByteBuffer;
034import java.nio.channels.SeekableByteChannel;
035import java.nio.charset.StandardCharsets;
036import java.nio.file.FileSystems;
037import java.nio.file.Files;
038import java.nio.file.StandardOpenOption;
039import java.time.Instant;
040import java.util.LinkedList;
041import java.util.List;
042import java.util.NoSuchElementException;
043import java.util.Random;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.hbase.Cell;
047import org.apache.hadoop.hbase.CellBuilder;
048import org.apache.hadoop.hbase.CellBuilderFactory;
049import org.apache.hadoop.hbase.CellBuilderType;
050import org.apache.hadoop.hbase.HBaseClassTestRule;
051import org.apache.hadoop.hbase.HBaseTestingUtil;
052import org.apache.hadoop.hbase.fs.HFileSystem;
053import org.apache.hadoop.hbase.nio.ByteBuff;
054import org.apache.hadoop.hbase.testclassification.IOTests;
055import org.apache.hadoop.hbase.testclassification.SmallTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.hamcrest.Description;
058import org.hamcrest.Matcher;
059import org.hamcrest.TypeSafeMatcher;
060import org.junit.ClassRule;
061import org.junit.Rule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.rules.ExternalResource;
065import org.junit.rules.RuleChain;
066import org.junit.rules.TestName;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070/**
 * This test provides coverage for the {@link HFileBlock} header fields that are read and
 * interpreted before HBase checksum validation can be applied. As of now, this is just
 * {@code onDiskSizeWithoutHeader}.
074 */
075@Category({ IOTests.class, SmallTests.class })
076public class TestHFileBlockHeaderCorruption {
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082    HBaseClassTestRule.forClass(TestHFileBlockHeaderCorruption.class);
083
084  private final HFileTestRule hFileTestRule;
085
086  @Rule
087  public final RuleChain ruleChain;
088
089  public TestHFileBlockHeaderCorruption() throws IOException {
090    TestName testName = new TestName();
091    hFileTestRule = new HFileTestRule(new HBaseTestingUtil(), testName);
092    ruleChain = RuleChain.outerRule(testName).around(hFileTestRule);
093  }
094
095  @Test
096  public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws Exception {
097    HFileBlockChannelPosition firstBlock = null;
098    try {
099      try (HFileBlockChannelPositionIterator it =
100        new HFileBlockChannelPositionIterator(hFileTestRule)) {
101        assertTrue(it.hasNext());
102        firstBlock = it.next();
103      }
104
105      Corrupter c = new Corrupter(firstBlock);
106
107      logHeader(firstBlock);
108      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
109        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
110      logHeader(firstBlock);
111      try (HFileBlockChannelPositionIterator it =
112        new HFileBlockChannelPositionIterator(hFileTestRule)) {
113        CountingConsumer consumer = new CountingConsumer(it);
114        try {
115          consumer.readFully();
116          fail();
117        } catch (Exception e) {
118          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
119            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
120        }
121        assertEquals(0, consumer.getItemsRead());
122      }
123
124      c.restore();
125      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
126        ByteBuffer.wrap(Bytes.toBytes(0)));
127      logHeader(firstBlock);
128      try (HFileBlockChannelPositionIterator it =
129        new HFileBlockChannelPositionIterator(hFileTestRule)) {
130        CountingConsumer consumer = new CountingConsumer(it);
131        try {
132          consumer.readFully();
133          fail();
134        } catch (Exception e) {
135          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
136        }
137        assertEquals(0, consumer.getItemsRead());
138      }
139
140      c.restore();
141      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
142        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
143      logHeader(firstBlock);
144      try (HFileBlockChannelPositionIterator it =
145        new HFileBlockChannelPositionIterator(hFileTestRule)) {
146        CountingConsumer consumer = new CountingConsumer(it);
147        try {
148          consumer.readFully();
149          fail();
150        } catch (Exception e) {
151          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
152            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
153        }
154        assertEquals(0, consumer.getItemsRead());
155      }
156    } finally {
157      if (firstBlock != null) {
158        firstBlock.close();
159      }
160    }
161  }
162
163  @Test
164  public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws Exception {
165    HFileBlockChannelPosition secondBlock = null;
166    try {
167      try (HFileBlockChannelPositionIterator it =
168        new HFileBlockChannelPositionIterator(hFileTestRule)) {
169        assertTrue(it.hasNext());
170        it.next();
171        assertTrue(it.hasNext());
172        secondBlock = it.next();
173      }
174
175      Corrupter c = new Corrupter(secondBlock);
176
177      logHeader(secondBlock);
178      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
179        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
180      logHeader(secondBlock);
181      try (HFileBlockChannelPositionIterator it =
182        new HFileBlockChannelPositionIterator(hFileTestRule)) {
183        CountingConsumer consumer = new CountingConsumer(it);
184        try {
185          consumer.readFully();
186          fail();
187        } catch (Exception e) {
188          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
189            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
190        }
191        assertEquals(1, consumer.getItemsRead());
192      }
193
194      c.restore();
195      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
196        ByteBuffer.wrap(Bytes.toBytes(0)));
197      logHeader(secondBlock);
198      try (HFileBlockChannelPositionIterator it =
199        new HFileBlockChannelPositionIterator(hFileTestRule)) {
200        CountingConsumer consumer = new CountingConsumer(it);
201        try {
202          consumer.readFully();
203          fail();
204        } catch (Exception e) {
205          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
206        }
207        assertEquals(1, consumer.getItemsRead());
208      }
209
210      c.restore();
211      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
212        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
213      logHeader(secondBlock);
214      try (HFileBlockChannelPositionIterator it =
215        new HFileBlockChannelPositionIterator(hFileTestRule)) {
216        CountingConsumer consumer = new CountingConsumer(it);
217        try {
218          consumer.readFully();
219          fail();
220        } catch (Exception e) {
221          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
222            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
223        }
224        assertEquals(1, consumer.getItemsRead());
225      }
226    } finally {
227      if (secondBlock != null) {
228        secondBlock.close();
229      }
230    }
231  }
232
233  private static void logHeader(HFileBlockChannelPosition hbcp) throws IOException {
234    ByteBuff buf = ByteBuff.wrap(ByteBuffer.allocate(HFileBlock.headerSize(true)));
235    hbcp.rewind();
236    assertEquals(buf.capacity(), buf.read(hbcp.getChannel()));
237    buf.rewind();
238    hbcp.rewind();
239    logHeader(buf);
240  }
241
242  private static void logHeader(ByteBuff buf) {
243    byte[] blockMagic = new byte[8];
244    buf.get(blockMagic);
245    int onDiskSizeWithoutHeader = buf.getInt();
246    int uncompressedSizeWithoutHeader = buf.getInt();
247    long prevBlockOffset = buf.getLong();
248    byte checksumType = buf.get();
249    int bytesPerChecksum = buf.getInt();
250    int onDiskDataSizeWithHeader = buf.getInt();
251    LOG.debug(
252      "blockMagic={}, onDiskSizeWithoutHeader={}, uncompressedSizeWithoutHeader={}, "
253        + "prevBlockOffset={}, checksumType={}, bytesPerChecksum={}, onDiskDataSizeWithHeader={}",
254      Bytes.toStringBinary(blockMagic), onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
255      prevBlockOffset, checksumType, bytesPerChecksum, onDiskDataSizeWithHeader);
256  }
257
258  /**
259   * Data class to enabled messing with the bytes behind an {@link HFileBlock}.
260   */
261  public static class HFileBlockChannelPosition implements Closeable {
262    private final SeekableByteChannel channel;
263    private final long position;
264
265    public HFileBlockChannelPosition(SeekableByteChannel channel, long position) {
266      this.channel = channel;
267      this.position = position;
268    }
269
270    public SeekableByteChannel getChannel() {
271      return channel;
272    }
273
274    public long getPosition() {
275      return position;
276    }
277
278    public void rewind() throws IOException {
279      channel.position(position);
280    }
281
282    @Override
283    public void close() throws IOException {
284      channel.close();
285    }
286  }
287
288  /**
289   * Reads blocks off of an {@link HFileBlockChannelPositionIterator}, counting them as it does.
290   */
291  public static class CountingConsumer {
292    private final HFileBlockChannelPositionIterator iterator;
293    private int itemsRead = 0;
294
295    public CountingConsumer(HFileBlockChannelPositionIterator iterator) {
296      this.iterator = iterator;
297    }
298
299    public int getItemsRead() {
300      return itemsRead;
301    }
302
303    public Object readFully() throws IOException {
304      Object val = null;
305      for (itemsRead = 0; iterator.hasNext(); itemsRead++) {
306        val = iterator.next();
307      }
308      return val;
309    }
310  }
311
312  /**
313   * A simplified wrapper over an {@link HFileBlock.BlockIterator} that looks a lot like an
314   * {@link java.util.Iterator}.
315   */
316  public static class HFileBlockChannelPositionIterator implements Closeable {
317
318    private final HFileTestRule hFileTestRule;
319    private final HFile.Reader reader;
320    private final HFileBlock.BlockIterator iter;
321    private HFileBlockChannelPosition current = null;
322
323    public HFileBlockChannelPositionIterator(HFileTestRule hFileTestRule) throws IOException {
324      Configuration conf = hFileTestRule.getConfiguration();
325      HFileSystem hfs = hFileTestRule.getHFileSystem();
326      Path hfsPath = hFileTestRule.getPath();
327
328      HFile.Reader reader = null;
329      HFileBlock.BlockIterator iter = null;
330      try {
331        reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, conf);
332        HFileBlock.FSReader fsreader = reader.getUncachedBlockReader();
333        iter = fsreader.blockRange(0, hfs.getFileStatus(hfsPath).getLen());
334      } catch (IOException e) {
335        if (reader != null) {
336          closeQuietly(reader::close);
337        }
338        throw e;
339      }
340
341      this.hFileTestRule = hFileTestRule;
342      this.reader = reader;
343      this.iter = iter;
344    }
345
346    public boolean hasNext() throws IOException {
347      HFileBlock next = iter.nextBlock();
348      SeekableByteChannel channel = hFileTestRule.getRWChannel();
349      if (next != null) {
350        current = new HFileBlockChannelPosition(channel, next.getOffset());
351      }
352      return next != null;
353    }
354
355    public HFileBlockChannelPosition next() {
356      if (current == null) {
357        throw new NoSuchElementException();
358      }
359      HFileBlockChannelPosition ret = current;
360      current = null;
361      return ret;
362    }
363
364    @Override
365    public void close() throws IOException {
366      if (current != null) {
367        closeQuietly(current::close);
368      }
369      closeQuietly(reader::close);
370    }
371
372    @FunctionalInterface
373    private interface CloseMethod {
374      void run() throws IOException;
375    }
376
377    private static void closeQuietly(CloseMethod closeMethod) {
378      try {
379        closeMethod.run();
380      } catch (Throwable e) {
381        LOG.debug("Ignoring thrown exception.", e);
382      }
383    }
384  }
385
386  /**
387   * Enables writing and rewriting portions of the file backing an {@link HFileBlock}.
388   */
389  public static class Corrupter {
390
391    private final HFileBlockChannelPosition channelAndPosition;
392    private final ByteBuffer originalHeader;
393
394    public Corrupter(HFileBlockChannelPosition channelAndPosition) throws IOException {
395      this.channelAndPosition = channelAndPosition;
396      this.originalHeader = readHeaderData(channelAndPosition);
397    }
398
399    private static ByteBuffer readHeaderData(HFileBlockChannelPosition channelAndPosition)
400      throws IOException {
401      SeekableByteChannel channel = channelAndPosition.getChannel();
402      ByteBuffer originalHeader = ByteBuffer.allocate(HFileBlock.headerSize(true));
403      channelAndPosition.rewind();
404      channel.read(originalHeader);
405      return originalHeader;
406    }
407
408    public void write(int offset, ByteBuffer src) throws IOException {
409      SeekableByteChannel channel = channelAndPosition.getChannel();
410      long position = channelAndPosition.getPosition();
411      channel.position(position + offset);
412      channel.write(src);
413    }
414
415    public void restore() throws IOException {
416      SeekableByteChannel channel = channelAndPosition.getChannel();
417      originalHeader.rewind();
418      channelAndPosition.rewind();
419      assertEquals(originalHeader.capacity(), channel.write(originalHeader));
420    }
421  }
422
423  public static class HFileTestRule extends ExternalResource {
424
425    private final HBaseTestingUtil testingUtility;
426    private final HFileSystem hfs;
427    private final HFileContext context;
428    private final TestName testName;
429    private Path path;
430
431    public HFileTestRule(HBaseTestingUtil testingUtility, TestName testName) throws IOException {
432      this.testingUtility = testingUtility;
433      this.testName = testName;
434      this.hfs = (HFileSystem) HFileSystem.get(testingUtility.getConfiguration());
435      this.context =
436        new HFileContextBuilder().withBlockSize(4 * 1024).withHBaseCheckSum(true).build();
437    }
438
439    public Configuration getConfiguration() {
440      return testingUtility.getConfiguration();
441    }
442
443    public HFileSystem getHFileSystem() {
444      return hfs;
445    }
446
447    public HFileContext getHFileContext() {
448      return context;
449    }
450
451    public Path getPath() {
452      return path;
453    }
454
455    public SeekableByteChannel getRWChannel() throws IOException {
456      java.nio.file.Path p = FileSystems.getDefault().getPath(path.toString());
457      return Files.newByteChannel(p, StandardOpenOption.READ, StandardOpenOption.WRITE,
458        StandardOpenOption.DSYNC);
459    }
460
461    @Override
462    protected void before() throws Throwable {
463      this.path = new Path(testingUtility.getDataTestDirOnTestFS(), testName.getMethodName());
464      HFile.WriterFactory factory =
465        HFile.getWriterFactory(testingUtility.getConfiguration(), CacheConfig.DISABLED)
466          .withPath(hfs, path).withFileContext(context);
467
468      CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
469      Random rand = new Random(Instant.now().toEpochMilli());
470      byte[] family = Bytes.toBytes("f");
471      try (HFile.Writer writer = factory.create()) {
472        for (int i = 0; i < 40; i++) {
473          byte[] row = RandomKeyValueUtil.randomOrderedFixedLengthKey(rand, i, 100);
474          byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand);
475          byte[] value = RandomKeyValueUtil.randomValue(rand);
476          Cell cell = cellBuilder.setType(Cell.Type.Put).setRow(row).setFamily(family)
477            .setQualifier(qualifier).setValue(value).build();
478          writer.append(cell);
479          cellBuilder.clear();
480        }
481      }
482    }
483  }
484
485  /**
486   * A Matcher implementation that can make basic assertions over a provided {@link Throwable}.
487   * Assertion failures include the full stacktrace in their description.
488   */
489  private static final class IsThrowableMatching extends TypeSafeMatcher<Throwable> {
490
491    private final List<Matcher<? super Throwable>> requirements = new LinkedList<>();
492
493    public IsThrowableMatching withInstanceOf(Class<?> type) {
494      requirements.add(instanceOf(type));
495      return this;
496    }
497
498    public IsThrowableMatching withMessage(Matcher<String> matcher) {
499      requirements.add(hasProperty("message", matcher));
500      return this;
501    }
502
503    @Override
504    protected boolean matchesSafely(Throwable throwable) {
505      return allOf(requirements).matches(throwable);
506    }
507
508    @Override
509    protected void describeMismatchSafely(Throwable item, Description mismatchDescription) {
510      allOf(requirements).describeMismatch(item, mismatchDescription);
511      // would be nice if `item` could be provided as the cause of the AssertionError instead.
512      mismatchDescription.appendText(String.format("%nProvided: "));
513      try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
514        try (PrintStream ps = new PrintStream(baos, false, StandardCharsets.UTF_8.name())) {
515          item.printStackTrace(ps);
516          ps.flush();
517        }
518        mismatchDescription.appendText(baos.toString(StandardCharsets.UTF_8.name()));
519      } catch (Exception e) {
520        throw new RuntimeException(e);
521      }
522    }
523
524    @Override
525    public void describeTo(Description description) {
526      description.appendDescriptionOf(allOf(requirements));
527    }
528  }
529}