/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
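/**
 * Unit tests for {@link HFileBlock} writing and reading. The suite is parameterized over whether
 * memstore timestamps (mvcc) and tags are included, and covers uncompressed and gzip-compressed
 * blocks, all {@link DataBlockEncoding}s, sequential and positional (pread) reads, concurrent
 * readers, previous-block-offset bookkeeping, heap size accounting, and block serialization.
 */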
@Category({IOTests.class, MediumTests.class})
@RunWith(Parameterized.class)
public class TestHFileBlock {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileBlock.class);

  // change this value to activate more logs
  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;

  // Used to generate KeyValues
  private static int NUM_KEYVALUES = 50;
  private static int FIELD_LENGTH = 10;
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses really well.
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i / 100);
    }
  }

  /**
   * Generates NUM_KEYVALUES semi-random KeyValues (fields repeat with probability
   * CHANCE_TO_REPEAT so the data compresses well), sorts them, and writes them to the given
   * block writer. Returns the total serialized size, including the vint-encoded memstore
   * timestamps when those are requested.
   */
  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<>();
    Random randomizer = new Random(42L + seed); // just any fixed number

    // generate keyValues
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // generate it or repeat, it should compress well
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = CellUtil.cloneRow(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        family = CellUtil.cloneFamily(keyValues.get(0));
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = CellUtil.cloneQualifier(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = CellUtil.cloneValue(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // sort it and write to stream
    int totalSize = 0;
    Collections.sort(keyValues, CellComparatorImpl.COMPARATOR);

    for (KeyValue kv : keyValues) {
      totalSize += kv.getLength();
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        kv.setSequenceId(memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
      hbw.write(kv);
    }
    return totalSize;
  }
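  /**
   * Produces the raw compressed bytes of a version-1 style block for the given algorithm: the
   * META block magic followed by the standard test contents, run straight through a compression
   * stream. No v2 header or checksums are attached.
   */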
  public byte[] createTestV1Block(Compression.Algorithm algo) throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
        .withCompression(algo)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    hbw.ensureBlockReady();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
      // variations across operating systems.
      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
      // We only make this change when the compressed block length matches.
      // Otherwise, there are obviously other inconsistencies.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }
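  // On-disk layout of a v2 block with HBase checksums, as exercised by the tests below
  // (HConstants.HFILEBLOCK_HEADER_SIZE = 33 header bytes, all visible in the expected
  // string of testGzipCompression):
  //
  //   8 bytes  block type magic (e.g. "DATABLK*")
  //   4 bytes  onDiskSizeWithoutHeader
  //   4 bytes  uncompressedSizeWithoutHeader
  //   8 bytes  prevBlockOffset (-1 if none)
  //   1 byte   checksum type code
  //   4 bytes  bytesPerChecksum
  //   4 bytes  onDiskDataSizeWithHeader
  //   ...      (possibly compressed) data, followed by checksums
  //
  // The gzip OS byte that createTestBlockStr patches sits 9 bytes into the gzip stream
  // (magic 2 + method 1 + flags 1 + mtime 4 + XFL 1), hence osOffset = header size + 9.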
  @Test
  public void testNoCompression() throws IOException {
    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
    Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty());

    HFileBlock block =
        createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
    assertTrue(block.isUnpacked());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
            + "\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
            + "\\x1F\\x8B" // gzip magic signature
            + "\\x08" // Compression method: 8 = "deflate"
            + "\\x00" // Flags
            + "\\x00\\x00\\x00\\x00" // mtime
            + "\\x00" // XFL (extra flags)
            // OS (0 = FAT filesystems, 3 = Unix). However, this field
            // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
            // This appears to be a difference caused by the availability
            // (and use) of the native GZ codec.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00"; // 4 byte checksum (ignored)
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // We ignore the block checksum because createTestBlockStr can change the
    // gzip header after the block is produced
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
        testBlockStr.substring(0, correctGzipBlockLength - 4));
  }
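  /**
   * Writes two identical DATA blocks for each compression algorithm, then reads them back with
   * both sequential and positional (pread) reads, verifying sizes and checksums. Also verifies
   * that passing an incorrect onDiskSizeWithHeader to the reader fails with an IOException.
   */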
  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo + ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
            .withHBaseCheckSum(true)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo)
            .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
        is.close();
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          b = hbr.readBlockData(0,
              2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE,
                pread, false);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "Passed in onDiskSizeWithHeader=";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix + "'",
                ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

  /**
   * Test encoding/decoding data blocks.
   * @throws IOException in case of a bug or a problem with temporary files.
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }
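  /**
   * For every combination of compression algorithm, pread flag, and data block encoding: writes
   * five encoded DATA blocks (capturing the encoded bytes as they are written), reads them back,
   * verifies that packing/unpacking and heap sizes behave as the context demands, checks the
   * two-byte encoding id, compares block contents against the captured bytes, and round-trips
   * block serialization.
   */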
  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}",
              algo.toString(), pread, encoding);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE)
              ? new HFileDataBlockEncoderImpl(encoding)
              : NoOpDataBlockEncoder.INSTANCE;
          HFileContext meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            hbw.startWriting(BlockType.DATA);
            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
            hbw.writeHeaderAndData(os);
            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
            byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
            final int encodedSize = encodedResultWithHeader.length - headerLen;
            if (encoding != DataBlockEncoding.NONE) {
              // We need to account for the two-byte encoding algorithm ID that
              // comes after the 24-byte block header but before encoded KVs.
              headerLen += DataBlockEncoding.ID_SIZE;
            }
            byte[] encodedDataSection = new byte[encodedResultWithHeader.length - headerLen];
            System.arraycopy(encodedResultWithHeader, headerLen,
                encodedDataSection, 0, encodedDataSection.length);
            final ByteBuffer encodedBuf = ByteBuffer.wrap(encodedDataSection);
            encodedSizes.add(encodedSize);
            encodedBlocks.add(encodedBuf);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .build();
          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemStoreTS(includesMemstoreTS);
          HFileBlock blockFromHFile, blockUnpacked;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
            blockFromHFile.sanityCheck();
            pos += blockFromHFile.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
                blockFromHFile.getUncompressedSizeWithoutHeader());
            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
            long packedHeapsize = blockFromHFile.heapSize();
            blockUnpacked = blockFromHFile.unpack(meta, hbr);
            assertTrue(blockUnpacked.isUnpacked());
            if (meta.isCompressedOrEncrypted()) {
              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapsize="
                  + blockUnpacked.heapSize());
              assertFalse(packedHeapsize == blockUnpacked.heapSize());
              assertTrue("Packed heapSize should be < unpacked heapSize",
                  packedHeapsize < blockUnpacked.heapSize());
            }
            ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(
                  "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
              assertEquals(
                  "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // test if content matches, produce nice message
            assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
                pread);

            // test serialized blocks
            for (boolean reuseBuffer : new boolean[] { false, true }) {
              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
              blockFromHFile.serialize(serialized, true);
              HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer()
                  .deserialize(new SingleByteBuff(serialized), reuseBuffer, MemoryType.EXCLUSIVE);
              assertEquals("Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                  blockFromHFile, deserialized);
              // intentional reference comparison
              if (blockFromHFile != blockUnpacked) {
                assertEquals("Deserialized block cannot be unpacked correctly.",
                    blockUnpacked, deserialized.unpack(meta, hbr));
              }
            }
          }
          is.close();
        }
      }
    }
  }
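  // Helpers for producing readable failure messages when buffer contents differ.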
  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
      boolean pread) {
    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
  }

  static void assertBuffersEqual(ByteBuff expectedBuffer, ByteBuff actualBuffer,
      Compression.Algorithm compression, DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit && expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
          buildMessageDetails(compression, encoding, pread), prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Convert a few next bytes in the given buffer at the given position to
   * string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuff buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos, numBytes)
        + (numBytes < maxBytes ? "..." : "");
  }
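  /**
   * Writes NUM_TEST_BLOCKS blocks of random types and sizes, then reads them back sequentially,
   * checking each block's type, offset, and previous-block offset, and re-reading each block with
   * its known on-disk size to exercise the other loader branch. With cacheOnWrite, the unpacked
   * bytes are also compared against the uncompressed contents captured at write time.
   */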
  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

  protected void testPreviousOffsetInternals() throws IOException {
    // TODO: parameterize these nested loops.
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}",
              algo.toString(), pread, cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<>();
          List<Long> expectedPrevOffsets = new ArrayList<>();
          List<BlockType> expectedTypes = new ArrayList<>();
          List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withCompression(algo)
              .build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(),
                  curOffset + (i == 0 ? 0 : HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Now re-load this block knowing the on-disk size. This tests a
            // different branch in the loader.
            HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(), b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(), b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(), b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
              // verifies that the unpacked value read back off disk matches the unpacked value
              // generated before writing to disk.
              b = b.unpack(meta, hbr);
              // b's buffer has header + data + checksum while
              // expectedContents have header + data only
              ByteBuff bufRead = b.getBufferReadOnly();
              ByteBuffer bufExpected = expectedContents.get(i);
              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                  bufRead.arrayOffset(),
                  bufRead.limit() - b.totalChecksumBytes(),
                  bufExpected.array(), bufExpected.arrayOffset(),
                  bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Optimization: only construct an error message in case we
                // will need it.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo=" + algo
                    + ", pread=" + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                    bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                        bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                      HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) +
                      "\nfound header" +
                      HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                      " limit " + bufRead.limit() +
                      " expected offset " + bufExpected.arrayOffset() +
                      " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
            }
          }

          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }
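  /**
   * A worker used by testConcurrentReading: for ten seconds it repeatedly positional-reads
   * random blocks from the shared file, passing the expected on-disk size for roughly half of
   * the reads, and checks each block's type, size, and offset. Returns false on any read error
   * so the submitting test can fail the future.
   */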
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId, HFileBlock.FSReader hbr, List<Long> offsets,
        List<BlockType> types, long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        // now we only support concurrent read with pread = true
        boolean pread = true;
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize = (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
            : offsets.get(blockId + 1)) - offset;

        HFileBlock b;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at " + offset
              + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize, ex);
          return false;
        }

        assertEquals(types.get(blockId), b.getBlockType());
        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
        assertEquals(offset, b.getOffset());

        ++numBlocksRead;
        if (pread) {
          ++numPositionalRead;
        }
        if (withOnDiskSize) {
          ++numWithOnDiskSize;
        }
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead
          + " blocks (with pread: " + numPositionalRead + ", with onDiskSize specified: "
          + numWithOnDiskSize + ")");

      return true;
    }
  }
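  /**
   * Runs NUM_READER_THREADS BlockReaderThreads concurrently against a single shared
   * FSDataInputStream for each compression algorithm and requires every reader to succeed.
   */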
  @Test
  public void testConcurrentReading() throws Exception {
    testConcurrentReadingInternals();
  }

  protected void testConcurrentReadingInternals()
      throws IOException, InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<>();
      List<BlockType> types = new ArrayList<>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(true)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withCompression(compressAlgo)
          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, fileSize, meta);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
              + " reader threads finished successfully (algo=" + compressAlgo + ")");
        }
      }

      is.close();
    }
  }
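  /**
   * Writes NUM_TEST_BLOCKS blocks of random type and length to the given path, recording each
   * block's offset, the previous offset of the same type, and its type; when expectedContents is
   * non-null (cache-on-write), also captures the uncompressed bytes. ENCODED_DATA is mapped to
   * DATA since these blocks are written without an encoder. Returns the total on-disk size.
   */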
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(true)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withCompression(compressAlgo)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null) {
        expectedOffsets.add(os.getPos());
      }

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite) {
        expectedContents.add(hbw.cloneUncompressedBufferWithHeader());
      }

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " bytes, compression=" + compressAlgo);
    return totalSize;
  }
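  /**
   * Checks HFileBlock.heapSize() against ClassSize estimates of the block itself, its backing
   * buffer (header plus data), and its HFileContext, for several data sizes.
   */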
  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    } else {
      assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withHBaseCheckSum(false)
          .withCompression(Algorithm.NONE)
          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
          .withChecksumType(ChecksumType.NULL)
          .build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
          HFileBlock.FILL_HEADER, -1, 0, -1, meta);
      long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
          new MultiByteBuff(buf).getClass(), true)
          + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize =
          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + ";", expected, block.heapSize());
    }
  }

  /**
   * Serialized forms of two blocks that differ only in their next-block metadata must differ
   * when that metadata is included in serialization and must match when it is excluded.
   */
  @Test
  public void testSerializeWithoutNextBlockMetadata() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
        HFileBlock.FILL_HEADER, -1, 52, -1, meta);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
        HFileBlock.FILL_HEADER, -1, -1, -1, meta);
    ByteBuffer buff1 = ByteBuffer.allocate(length);
    ByteBuffer buff2 = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(buff1, true);
    blockWithoutNextBlockMetadata.serialize(buff2, true);
    assertNotEquals(buff1, buff2);
    buff1.clear();
    buff2.clear();
    blockWithNextBlockMetadata.serialize(buff1, false);
    blockWithoutNextBlockMetadata.serialize(buff2, false);
    assertEquals(buff1, buff2);
  }
}