/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This test provides coverage for HFileHeader block fields that are read and interpreted before
 * HBase checksum validation can be applied. As of now, these are {@code onDiskSizeWithoutHeader}
 * and the {@code checksumType} code.
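 * <p>
 * Each test locates a target block using {@link HFileBlockChannelPositionIterator}, overwrites
 * individual header bytes in place through {@link Corrupter}, and then re-reads the file with a
 * {@link CountingConsumer}, asserting either that all blocks remain readable (valid values) or
 * that the reader fails with the expected exception after returning only the blocks that precede
 * the corruption. The shape of each case is roughly:
 *
 * <pre>
 * Corrupter c = new Corrupter(block);
 * c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { -1 }));
 * // re-open the file and expect the read to fail with "Unknown checksum type code"
 * c.restore();
 * </pre>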
 */
@Tag(IOTests.TAG)
@Tag(SmallTests.TAG)
public class TestHFileBlockHeaderCorruption {

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static HFileSystem HFS;
  private HFileContext hfileCtx;
  private Path path;

  @BeforeAll
  public static void setUpBeforeAll() throws IOException {
    HFS = (HFileSystem) HFileSystem.get(UTIL.getConfiguration());
  }

  @BeforeEach
  public void setUp(TestInfo testInfo) throws IOException {
    path = new Path(UTIL.getDataTestDirOnTestFS(), testInfo.getTestMethod().get().getName());
    hfileCtx = new HFileContextBuilder().withBlockSize(4 * 1024).withHBaseCheckSum(true).build();
    HFile.WriterFactory factory =
      HFile.getWriterFactory(UTIL.getConfiguration(), CacheConfig.DISABLED).withPath(HFS, path)
        .withFileContext(hfileCtx);

    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
    Random rand = new Random(Instant.now().toEpochMilli());
    byte[] family = Bytes.toBytes("f");
    try (HFile.Writer writer = factory.create()) {
      for (int i = 0; i < 40; i++) {
        byte[] row = RandomKeyValueUtil.randomOrderedFixedLengthKey(rand, i, 100);
        byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand);
        byte[] value = RandomKeyValueUtil.randomValue(rand);
        ExtendedCell cell = cellBuilder.setType(Cell.Type.Put).setRow(row).setFamily(family)
          .setQualifier(qualifier).setValue(value).build();
        writer.append(cell);
        cellBuilder.clear();
      }
    }
  }

  @Test
  public void testChecksumTypeCorruptionFirstBlock() throws Exception {
    HFileBlockChannelPosition firstBlock = null;
    try {
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        assertTrue(it.hasNext());
        firstBlock = it.next();
      }

      Corrupter c = new Corrupter(firstBlock);

      logHeader(firstBlock);

      // test corrupted HFileBlock with unknown checksumType code -1
      c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { -1 }));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Unknown checksum type code")));
        }
        assertEquals(0, consumer.getItemsRead());
      }

      // valid checksumType code test
      for (ChecksumType t : ChecksumType.values()) {
        testValidChecksumTypeReadBlock(t.getCode(), c, firstBlock);
      }

      c.restore();
      // test corrupted HFileBlock with unknown checksumType code 3
      c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { 3 }));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Unknown checksum type code")));
        }
        assertEquals(0, consumer.getItemsRead());
      }
    } finally {
      if (firstBlock != null) {
        firstBlock.close();
      }
    }
  }

  @Test
  public void testChecksumTypeCorruptionSecondBlock() throws Exception {
    HFileBlockChannelPosition secondBlock = null;
    try {
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        assertTrue(it.hasNext());
        it.next();
        assertTrue(it.hasNext());
        secondBlock = it.next();
      }

      Corrupter c = new Corrupter(secondBlock);

      logHeader(secondBlock);
      // test corrupted HFileBlock with unknown checksumType code -1
      c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { -1 }));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(RuntimeException.class)
            .withMessage(startsWith("Unknown checksum type code")));
        }
        assertEquals(1, consumer.getItemsRead());
      }

      // valid checksumType code test
      for (ChecksumType t : ChecksumType.values()) {
        testValidChecksumTypeReadBlock(t.getCode(), c, secondBlock);
      }

      c.restore();
      // test corrupted HFileBlock with unknown checksumType code 3
      c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX, ByteBuffer.wrap(new byte[] { 3 }));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(RuntimeException.class)
            .withMessage(startsWith("Unknown checksum type code")));
        }
        assertEquals(1, consumer.getItemsRead());
      }
    } finally {
      if (secondBlock != null) {
        secondBlock.close();
      }
    }
  }

  public void testValidChecksumTypeReadBlock(byte checksumTypeCode, Corrupter c,
    HFileBlockChannelPosition testBlock) throws IOException {
    c.restore();
    c.write(HFileBlock.Header.CHECKSUM_TYPE_INDEX,
      ByteBuffer.wrap(new byte[] { checksumTypeCode }));
    logHeader(testBlock);
    try (HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
      CountingConsumer consumer = new CountingConsumer(it);
      try {
        consumer.readFully();
      } catch (Exception e) {
        fail("blocks with a valid checksumType code should read back without error");
      }
      assertNotEquals(0, consumer.getItemsRead());
    }
  }

  @Test
  public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws Exception {
    HFileBlockChannelPosition firstBlock = null;
    try {
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        assertTrue(it.hasNext());
        firstBlock = it.next();
      }

      Corrupter c = new Corrupter(firstBlock);

      logHeader(firstBlock);
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(0, consumer.getItemsRead());
      }

      c.restore();
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(0)));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
        }
        assertEquals(0, consumer.getItemsRead());
      }

      c.restore();
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
      logHeader(firstBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(0, consumer.getItemsRead());
      }
    } finally {
      if (firstBlock != null) {
        firstBlock.close();
      }
    }
  }

  @Test
  public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws Exception {
    HFileBlockChannelPosition secondBlock = null;
    try {
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        assertTrue(it.hasNext());
        it.next();
        assertTrue(it.hasNext());
        secondBlock = it.next();
      }

      Corrupter c = new Corrupter(secondBlock);

      logHeader(secondBlock);
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(1, consumer.getItemsRead());
      }

      c.restore();
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(0)));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
        }
        assertEquals(1, consumer.getItemsRead());
      }

      c.restore();
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
      logHeader(secondBlock);
      try (
        HFileBlockChannelPositionIterator it = new HFileBlockChannelPositionIterator(HFS, path)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(1, consumer.getItemsRead());
      }
    } finally {
      if (secondBlock != null) {
        secondBlock.close();
      }
    }
  }
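
  /*
   * The two logHeader helpers below re-read and decode the raw block header so that each
   * corruption written by a test shows up in the debug log. The header fields, in on-disk order,
   * are: an 8-byte block magic, onDiskSizeWithoutHeader (int), uncompressedSizeWithoutHeader
   * (int), prevBlockOffset (long), checksumType (byte), bytesPerChecksum (int), and
   * onDiskDataSizeWithHeader (int); the HFileBlock.Header.*_INDEX constants used by the tests
   * above are byte offsets into this layout.
   */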
  private static void logHeader(HFileBlockChannelPosition hbcp) throws IOException {
    ByteBuff buf = ByteBuff.wrap(ByteBuffer.allocate(HFileBlock.headerSize(true)));
    hbcp.rewind();
    assertEquals(buf.capacity(), buf.read(hbcp.getChannel()));
    buf.rewind();
    hbcp.rewind();
    logHeader(buf);
  }

  private static void logHeader(ByteBuff buf) {
    byte[] blockMagic = new byte[8];
    buf.get(blockMagic);
    int onDiskSizeWithoutHeader = buf.getInt();
    int uncompressedSizeWithoutHeader = buf.getInt();
    long prevBlockOffset = buf.getLong();
    byte checksumType = buf.get();
    int bytesPerChecksum = buf.getInt();
    int onDiskDataSizeWithHeader = buf.getInt();
    LOG.debug(
      "blockMagic={}, onDiskSizeWithoutHeader={}, uncompressedSizeWithoutHeader={}, "
        + "prevBlockOffset={}, checksumType={}, bytesPerChecksum={}, onDiskDataSizeWithHeader={}",
      Bytes.toStringBinary(blockMagic), onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
      prevBlockOffset, checksumType, bytesPerChecksum, onDiskDataSizeWithHeader);
  }

  /**
   * Data class to enable messing with the bytes behind an {@link HFileBlock}.
   */
  public static class HFileBlockChannelPosition implements Closeable {
    private final SeekableByteChannel channel;
    private final long position;

    public HFileBlockChannelPosition(SeekableByteChannel channel, long position) {
      this.channel = channel;
      this.position = position;
    }

    public SeekableByteChannel getChannel() {
      return channel;
    }

    public long getPosition() {
      return position;
    }

    public void rewind() throws IOException {
      channel.position(position);
    }

    @Override
    public void close() throws IOException {
      channel.close();
    }
  }

  /**
   * Reads blocks off of an {@link HFileBlockChannelPositionIterator}, counting them as it does.
   */
  public static class CountingConsumer {
    private final HFileBlockChannelPositionIterator iterator;
    private int itemsRead = 0;

    public CountingConsumer(HFileBlockChannelPositionIterator iterator) {
      this.iterator = iterator;
    }

    public int getItemsRead() {
      return itemsRead;
    }

    public Object readFully() throws IOException {
      Object val = null;
      for (itemsRead = 0; iterator.hasNext(); itemsRead++) {
        val = iterator.next();
      }
      return val;
    }
  }

  /**
   * A simplified wrapper over an {@link HFileBlock.BlockIterator} that looks a lot like an
   * {@link java.util.Iterator}.
   */
  public static class HFileBlockChannelPositionIterator implements Closeable {

    private final Path hfsPath;
    private final HFile.Reader reader;
    private final HFileBlock.BlockIterator iter;
    private HFileBlockChannelPosition current = null;

    public HFileBlockChannelPositionIterator(HFileSystem hfs, Path hfsPath) throws IOException {
      this.hfsPath = hfsPath;
      this.reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, hfs.getConf());
      HFileBlock.FSReader fsreader = reader.getUncachedBlockReader();
      // Block offsets read by this iterator must fall within the range
      // [0, loadOnOpenDataOffset), i.e. skip the load-on-open section at the tail of the file.
      this.iter = fsreader.blockRange(0, reader.getTrailer().getLoadOnOpenDataOffset());
    }

    public boolean hasNext() throws IOException {
      HFileBlock next = iter.nextBlock();
      if (next != null) {
        java.nio.file.Path p = FileSystems.getDefault().getPath(hfsPath.toString());
        SeekableByteChannel channel = Files.newByteChannel(p, StandardOpenOption.READ,
          StandardOpenOption.WRITE, StandardOpenOption.DSYNC);
        current = new HFileBlockChannelPosition(channel, next.getOffset());
      }
      return next != null;
    }

    public HFileBlockChannelPosition next() {
      if (current == null) {
        throw new NoSuchElementException();
      }
      HFileBlockChannelPosition ret = current;
      current = null;
      return ret;
    }

    @Override
    public void close() throws IOException {
      if (current != null) {
        closeQuietly(current::close);
      }
      closeQuietly(reader::close);
    }

    @FunctionalInterface
    private interface CloseMethod {
      void run() throws IOException;
    }

    private static void closeQuietly(CloseMethod closeMethod) {
      try {
        closeMethod.run();
      } catch (Throwable e) {
        LOG.debug("Ignoring thrown exception.", e);
      }
    }
  }

  /**
   * Enables writing and rewriting portions of the file backing an {@link HFileBlock}.
   */
  public static class Corrupter {

    private final HFileBlockChannelPosition channelAndPosition;
    private final ByteBuffer originalHeader;

    public Corrupter(HFileBlockChannelPosition channelAndPosition) throws IOException {
      this.channelAndPosition = channelAndPosition;
      this.originalHeader = readHeaderData(channelAndPosition);
    }

    private static ByteBuffer readHeaderData(HFileBlockChannelPosition channelAndPosition)
      throws IOException {
      SeekableByteChannel channel = channelAndPosition.getChannel();
      ByteBuffer originalHeader = ByteBuffer.allocate(HFileBlock.headerSize(true));
      channelAndPosition.rewind();
      channel.read(originalHeader);
      return originalHeader;
    }

    public void write(int offset, ByteBuffer src) throws IOException {
      SeekableByteChannel channel = channelAndPosition.getChannel();
      long position = channelAndPosition.getPosition();
      channel.position(position + offset);
      channel.write(src);
    }

    public void restore() throws IOException {
      SeekableByteChannel channel = channelAndPosition.getChannel();
      originalHeader.rewind();
      channelAndPosition.rewind();
      assertEquals(originalHeader.capacity(), channel.write(originalHeader));
    }
  }

  /**
   * A Matcher implementation that can make basic assertions over a provided {@link Throwable}.
   * Assertion failures include the full stacktrace in their description.
   */
  private static final class IsThrowableMatching extends TypeSafeMatcher<Throwable> {

    private final List<Matcher<? super Throwable>> requirements = new LinkedList<>();

    public IsThrowableMatching withInstanceOf(Class<?> type) {
      requirements.add(instanceOf(type));
      return this;
    }

    public IsThrowableMatching withMessage(Matcher<String> matcher) {
      requirements.add(hasProperty("message", matcher));
      return this;
    }

    @Override
    protected boolean matchesSafely(Throwable throwable) {
      return allOf(requirements).matches(throwable);
    }

    @Override
    protected void describeMismatchSafely(Throwable item, Description mismatchDescription) {
      allOf(requirements).describeMismatch(item, mismatchDescription);
      // would be nice if `item` could be provided as the cause of the AssertionError instead.
      mismatchDescription.appendText(String.format("%nProvided: "));
      try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
        try (PrintStream ps = new PrintStream(baos, false, StandardCharsets.UTF_8.name())) {
          item.printStackTrace(ps);
          ps.flush();
        }
        mismatchDescription.appendText(baos.toString(StandardCharsets.UTF_8.name()));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public void describeTo(Description description) {
      description.appendDescriptionOf(allOf(requirements));
    }
  }
}