/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
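/**
 * Tests {@link HFileBlock} writing and reading: uncompressed and gzip-compressed blocks,
 * data block encoding, previous-block offsets, concurrent reads, heap size estimation, and
 * serialization with and without next-block metadata.
 */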
@Category({IOTests.class, MediumTests.class})
@RunWith(Parameterized.class)
public class TestHFileBlock {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileBlock.class);

  // change this value to activate more logs
  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;

  // Used to generate KeyValues
  private static int NUM_KEYVALUES = 50;
  private static int FIELD_LENGTH = 10;
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses really well.
    for (int i = 0; i < 1000; ++i)
      dos.writeInt(i / 100);
  }

  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<>();
    Random randomizer = new Random(42L + seed); // just any fixed number

    // generate keyValues
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // generate it or repeat, it should compress well
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = CellUtil.cloneRow(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        family = CellUtil.cloneFamily(keyValues.get(0));
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = CellUtil.cloneQualifier(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = CellUtil.cloneValue(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // sort it and write to stream
    int totalSize = 0;
    Collections.sort(keyValues, CellComparatorImpl.COMPARATOR);

    for (KeyValue kv : keyValues) {
      totalSize += kv.getLength();
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        kv.setSequenceId(memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
      hbw.write(kv);
    }
    return totalSize;
  }

  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
        .withCompression(algo)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    hbw.ensureBlockReady();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
      // variations across operating systems.
      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
      // We only make this change when the compressed block length matches.
      // Otherwise, there are obviously other inconsistencies.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
    Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);

    HFileBlock block =
        createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
    assertTrue(block.isUnpacked());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
            + "\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
            + "\\x1F\\x8B" // gzip magic signature
            + "\\x08" // Compression method: 8 = "deflate"
            + "\\x00" // Flags
            + "\\x00\\x00\\x00\\x00" // mtime
            + "\\x00" // XFL (extra flags)
            // OS (0 = FAT filesystems, 3 = Unix). However, this field
            // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
            // This appears to be a difference caused by the availability
            // (and use) of the native GZ codec.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00"; // 4 byte checksum (ignored)
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // We ignore the block checksum because createTestBlockStr can change the
    // gzip header after the block is produced
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
        testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo + ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i)
            dos.writeInt(i);
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
            .withHBaseCheckSum(true)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
        is.close();
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
              b.totalChecksumBytes(), pread, false);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE, pread, false);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "Passed in onDiskSizeWithHeader=";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

  /**
   * Test encoding/decoding data blocks.
   * @throws IOException a bug or a problem with temporary files.
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}",
              algo.toString(), pread, encoding);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
          HFileContext meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            hbw.startWriting(BlockType.DATA);
            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
            hbw.writeHeaderAndData(os);
            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
            byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
            final int encodedSize = encodedResultWithHeader.length - headerLen;
            if (encoding != DataBlockEncoding.NONE) {
              // We need to account for the two-byte encoding algorithm ID that
              // comes after the 24-byte block header but before encoded KVs.
              headerLen += DataBlockEncoding.ID_SIZE;
            }
            byte[] encodedDataSection =
                new byte[encodedResultWithHeader.length - headerLen];
            System.arraycopy(encodedResultWithHeader, headerLen,
                encodedDataSection, 0, encodedDataSection.length);
            final ByteBuffer encodedBuf = ByteBuffer.wrap(encodedDataSection);
            encodedSizes.add(encodedSize);
            encodedBlocks.add(encodedBuf);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withCompression(algo)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .build();
          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemStoreTS(includesMemstoreTS);
          HFileBlock blockFromHFile, blockUnpacked;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
            blockFromHFile.sanityCheck();
            pos += blockFromHFile.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
                blockFromHFile.getUncompressedSizeWithoutHeader());
            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
            long packedHeapsize = blockFromHFile.heapSize();
            blockUnpacked = blockFromHFile.unpack(meta, hbr);
            assertTrue(blockUnpacked.isUnpacked());
            if (meta.isCompressedOrEncrypted()) {
              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapsize="
                  + blockUnpacked.heapSize());
              assertFalse(packedHeapsize == blockUnpacked.heapSize());
              assertTrue("Packed heapSize should be < unpacked heapSize",
                  packedHeapsize < blockUnpacked.heapSize());
            }
            ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(
                  "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
              assertEquals(
                  "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
                  Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // test if content matches, produce nice message
            assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
                pread);

            // test serialized blocks
            for (boolean reuseBuffer : new boolean[] { false, true }) {
              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
              blockFromHFile.serialize(serialized, true);
              HFileBlock deserialized =
                  (HFileBlock) blockFromHFile.getDeserializer().deserialize(
                      new SingleByteBuff(serialized), reuseBuffer, MemoryType.EXCLUSIVE);
              assertEquals(
                  "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                  blockFromHFile, deserialized);
              // intentional reference comparison
              if (blockFromHFile != blockUnpacked) {
                assertEquals("Deserialized block cannot be unpacked correctly.",
                    blockUnpacked, deserialized.unpack(meta, hbr));
              }
            }
          }
          is.close();
        }
      }
    }
  }

  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
      boolean pread) {
    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
  }

  static void assertBuffersEqual(ByteBuff expectedBuffer,
      ByteBuff actualBuffer, Compression.Algorithm compression,
      DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit &&
          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
          buildMessageDetails(compression, encoding, pread), prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Convert a few next bytes in the given buffer at the given position to
   * string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuff buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
        numBytes) + (numBytes < maxBytes ? "..." : "");
  }

  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

  protected void testPreviousOffsetInternals() throws IOException {
    // TODO: parameterize these nested loops.
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}",
              algo.toString(), pread, cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<>();
          List<Long> expectedPrevOffsets = new ArrayList<>();
          List<BlockType> expectedTypes = new ArrayList<>();
          List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
              .withHBaseCheckSum(true)
              .withIncludesMvcc(includesMemstoreTS)
              .withIncludesTags(includesTag)
              .withCompression(algo).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                  HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Now re-load this block knowing the on-disk size. This tests a
            // different branch in the loader.
            HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(), b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(), b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(), b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
              // verifies that the unpacked value read back off disk matches the unpacked value
              // generated before writing to disk.
              b = b.unpack(meta, hbr);
              // b's buffer has header + data + checksum while
              // expectedContents have header + data only
              ByteBuff bufRead = b.getBufferReadOnly();
              ByteBuffer bufExpected = expectedContents.get(i);
              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                  bufRead.arrayOffset(),
                  bufRead.limit() - b.totalChecksumBytes(),
                  bufExpected.array(), bufExpected.arrayOffset(),
                  bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Optimization: only construct an error message in case we
                // will need it.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                    bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                        bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                      HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) +
                      "\nfound header" +
                      HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                      " limit " + bufRead.limit() +
                      " expected offset " + bufExpected.arrayOffset() +
                      " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
            }
          }

          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        // now we only support concurrent read with pread = true
        boolean pread = true;
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
                : offsets.get(blockId + 1)) - offset;

        HFileBlock b;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at "
              + offset + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize, ex);
          return false;
        }

        assertEquals(types.get(blockId), b.getBlockType());
        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
        assertEquals(offset, b.getOffset());

        ++numBlocksRead;
        if (pread)
          ++numPositionalRead;
        if (withOnDiskSize)
          ++numWithOnDiskSize;
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
          " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
          "specified: " + numWithOnDiskSize + ")");

      return true;
    }

  }

  @Test
  public void testConcurrentReading() throws Exception {
    testConcurrentReadingInternals();
  }

  protected void testConcurrentReadingInternals() throws IOException,
      InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<>();
      List<BlockType> types = new ArrayList<>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
          .withHBaseCheckSum(true)
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withCompression(compressAlgo)
          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, fileSize, meta);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
              + " reader threads finished successfully (algo=" + compressAlgo + ")");
        }
      }

      is.close();
    }
  }

  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
      ) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
        .withHBaseCheckSum(true)
        .withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTag)
        .withCompression(compressAlgo)
        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null)
        expectedOffsets.add(os.getPos());

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite)
        expectedContents.add(hbw.cloneUncompressedBufferWithHeader());

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " byte, compression=" + compressAlgo);
    return totalSize;
  }

  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    } else {
      assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
          .withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTag)
          .withHBaseCheckSum(false)
          .withCompression(Algorithm.NONE)
          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
          HFileBlock.FILL_HEADER, -1, 0, -1, meta);
      long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
          new MultiByteBuff(buf).getClass(), true)
          + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize =
          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + ";", expected,
          block.heapSize());
    }
  }

  @Test
  public void testSerializeWithoutNextBlockMetadata() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
        HFileBlock.FILL_HEADER, -1, 52, -1, meta);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
        HFileBlock.FILL_HEADER, -1, -1, -1, meta);
    ByteBuffer buff1 = ByteBuffer.allocate(length);
    ByteBuffer buff2 = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(buff1, true);
    blockWithoutNextBlockMetadata.serialize(buff2, true);
    assertNotEquals(buff1, buff2);
    buff1.clear();
    buff2.clear();
    blockWithNextBlockMetadata.serialize(buff1, false);
    blockWithoutNextBlockMetadata.serialize(buff2, false);
    assertEquals(buff1, buff2);
  }
}